Channels

Note

Added in 1.14

Lona channels facilitate soft real-time communication between different components of the server side of your application by implementing a straightforward publish/subscribe system. They provide a means to exchange messages among multiple views and middlewares, enabling the implementation of multi-user features and bidirectional signaling within your application.

Note

soft real-time means "no timing guarantees" in this case. Messages are guaranteed to arrive at every subscribed channel exactly once, in the correct order, but not at a specific time.

Each channel is associated with a topic which is represented as a string and can be a wildcard. This allows channels to send and receive messages related to a specific topic or a group of topics that match a particular pattern.

from lona import Channel


def handle_message(message):
    topic = message.topic  # contains the topic of the sender as string
    data = message.data    # contains a dictionary of data

    print(f'{topic}: {data!r}')


# subscribe to specific topics
Channel('chat.rooms.lona', handle_message)
Channel('chat.rooms.lona-releases', handle_message)


# subscribe to wildcard topics
Channel('chat.rooms.lona-*', handle_message)
Channel('chat.rooms.*', handle_message)
Channel('*', handle_message)  # subscribes to all topic, including internal topics


# send messages to specific topics
Channel('chat.rooms.releases').send({'lona-version': '2.0'})
Channel('chat.rooms.lona').send()  # empty message


# send messages to wildcard topics
Channel('chat.rooms.lona-*').send({'lona-version': '2.0'})
Channel('*').send({'lona-version': '2.0'})


# explicit subscribing and unsubscribing
channel = Channel('chat.rooms.lona')

channel.subscribe(handle_message)
channel.unsubscribe()

Subscribing to Topics

To subscribe to a channel, you can either set a message handler in the constructor of lona.Channel or use the lona.Channel.subscribe() method. Each channel can have only one topic and one handler. Once you are subscribed, your handler will be called with lona.channels.Message objects whenever a matching message is sent, whether it's from your channel or another.

When using channels in conjunction with views, it is highly recommended to use View.subscribe instead of directly using the channel API. By utilizing View.subscribe, errors are automatically handled as 500 errors, and channel gets automatically unsubscribed when the view closes.

Sending Messages

Messages can be sent using Channel.send. Channel.send creates a shallow copy of your message data, using copy.copy, and then puts the message into the global channel message queue, using Queue.no_wait, reducing the cost of sending messages to a minimum.

lona.Channel.send(
    self,
    message_data: 'dict | None' = None,
    expiry: 'datetime | timedelta | None' = None,
    local: 'bool' = False,
    droppable: 'bool' = False,
) -> Message
    Send a message to all channels subscribed to a matching topic.

    :message_data:  Optional user-defined message payload. When given a
                    dict it gets shallow copied using `copy.copy` to ensure
                    message integrity between multiple scopes.

    :expiry:        When set to a datetime- or timedelta object, the
                    message gets dropped if it expires before being
                    handled.

    :local:         Tells the broker if a message should be handled process
                    local or not. Has no effect when using the
                    default broker.

    :droppable:     Tells the broker and the task worker if a message may
                    be dropped for faster shutdown.

Internal Topics

Lona provides a list of internal topics that can be subscribed to in order to receive internal events.

lona.channels.subscribe

Gets sent locally whenever a channel subscribes to a topic

from lona import Channel

def handle_message(message):
    topic = message.date['topic']  # subscriber topic as string

    print(f'one user subscribed to {topic}')

Channel('lona.channels.subscribe', handle_message)
lona.channels.unsubscribe

Gets sent locally whenever a channel unsubscribes from a topic

from lona import Channel

def handle_message(message):
    topic = message.date['topic']  # subscriber topic as string

    print(f'one user unsubscribed from {topic}')

Channel('lona.channels.unsubscribe', handle_message)

Message Broker and Task Worker

Lona channels follow the broker pattern, where messages sent via Channels.send are added to a global queue. The message broker threads schedule a task for each message and for each subscribed channel to another global queue, for the task worker threads to execute.

The separation of these two stages and queues allows for the possibility of implementing a custom message broker with network capabilities. This flexibility enables the integration of external systems or protocols to handle the distribution and routing of messages across a network, expanding the capabilities of the Lona channel system.

During startup, Lona initializes a set of message broker threads and task worker threads. The number of threads can be configured using settings.MAX_CHANNEL_MESSAGE_BROKER_THREADS and settings.MAX_CHANNEL_TASK_WORKER_THREADS. You can specify the message broker class and the task worker class that Lona should use for these threads using settings.CHANNEL_MESSAGE_BROKER_CLASS and settings.CHANNEL_TASK_WORKER_CLASS respectively.

Both the message broker class and the task worker class need to implement a run method, which is periodically called by the base class lona.channels.Worker with a timeout. The timeout value can be configured using settings.CHANNEL_WORKER_TIMEOUT.

Settings: Threads, Channels

Default Message Broker

class MessageBroker(Worker):
    # derived from lona.channels.Worker

    def run(self):

        # get message from global message queue
        # this queue gets filled by lona.Channel.send()
        message = self.get_message()

        # get all channels that are subscribed to message.topic
        channels = Channel.get_channels(topic=message.topic)

        # schedule a task for each channel
        for channel in channels:
            self.schedule_task(
                channel=channel,
                message=message,
            )

        logger.debug(
            '%s tasks scheduled for %s',
            len(channels),
            message,
        )

Default Task Worker

class TaskWorker(Worker):
    # derived from lona.channels.Worker

    def run(self):

        # get task from global task queue
        # this queue gets filled by lona.Worker.schedule_task()
        task = self.get_task()

        logger.debug(
            'running %s with message %s',
            task.channel.handler,
            task.message,
        )

        # run handler
        handler = task.channel.handler

        try:
            if not handler:  # pragma: no cover
                logger.debug('channel unsubscribed while running its handler')

                return

            handler(task.message)

        except Exception:
            logger.exception(
                'exception was raised while running %s',
                handler,
            )