Multi User Chat
This demo showcases a multi-user chat application using channels, server state, and sessions.
Each user can choose a username, which is then associated with their session and persists across browser reloads. Users can also create chat rooms and join existing ones. The landing page of the demo displays a list of previously created chat rooms, as well as the number of users currently engaged in conversation.
Source code
from datetime import datetime
from uuid import uuid1
from time import time
import re
from lona_picocss.html import (
InlineButton,
ScrollerDiv,
TextInput,
TextArea,
Strong,
THead,
TBody,
Table,
Span,
HTML,
Div,
Tr,
Th,
Td,
H1,
Br,
P,
A,
)
from lona_picocss import install_picocss
from lona import RedirectResponse, Channel, View, App
NAME = re.compile(r'^([a-zA-Z0-9-_]{1,})$')
MESSAGE_BACK_LOG = 10
app = App(__file__)
install_picocss(app)
app.settings.PICOCSS_BRAND = 'Multi-User Chat'
app.settings.PICOCSS_TITLE = 'Multi-User Chat'
app.settings.INITIAL_SERVER_STATE = {
'user': {
# session_key: user_name
},
'rooms': {
# name: {
# user: [user_name, ]
# log: [
# [uuid, unix_timestamp, type, user_name, text, ]
# ]
# }
},
}
@app.route('/<room>(/)', name='room')
class ChatView(View):
def show_message(self, message, index=None):
message_id, unix_timestamp, type, user_name, message = message
span = Span(style='margin-left: 0.5em')
line = Div(
Div(
Strong(user_name),
Span(
str(datetime.fromtimestamp(unix_timestamp)),
style={
'color': 'gray',
'font-size': '75%',
'margin-left': '0.5em',
},
),
),
span,
data_message_id=message_id,
)
if type == 'message':
span.set_text(message)
else:
span.set_text(f'*{message}*')
if type == 'join':
span.style['color'] = 'lime'
elif type == 'leave':
span.style['color'] = 'red'
with self.html.lock:
if self.messages.query_selector(f'[data-message-id={message_id}]'):
return
if index is None:
self.messages.append(line)
else:
self.messages.insert(index, line)
def send_message(self, type, text):
# create message
message = [
uuid1().hex,
time(),
type,
self.user_name,
text,
]
# add message to data
self.room_state['log'].append(message)
# send message to all clients
self.channel.send({
'message': message,
})
# trim messages
while len(self.room_state['log']) > MESSAGE_BACK_LOG:
self.room_state['log'].pop(0)
def handle_send_button_click(self, input_event):
message = self.message_text_area.value.strip()
self.message_text_area.value = ''
# nothing to send
if not message:
return
self.send_message('message', message)
def handle_request(self, request):
self.room_name = request.match_info['room']
self.session_key = request.user.session_key
self.user_name = self.server.state['user'].get(self.session_key, '')
self.joined = False
# redirect to lobby if the user has no user name set
if not self.user_name:
return RedirectResponse(self.server.reverse('lobby'))
# check if room exists
if self.room_name not in self.server.state['rooms']:
return HTML(
H1('Room not found'),
P(f'No room named "{self.room_name}" found'),
)
# setup html
self.room_state = self.server.state['rooms'][self.room_name]
self.messages = ScrollerDiv(lines=MESSAGE_BACK_LOG, height='50vh')
self.message_text_area = TextArea(placeholder='Say something nice')
self.send_button = InlineButton(
'Send',
handle_click=self.handle_send_button_click,
)
self.html = HTML(
H1(f'{self.room_name}'),
self.messages,
self.message_text_area,
self.send_button,
)
# subscribe to channel
self.channel = self.subscribe(
f'chat.room.{self.room_name}',
lambda message: self.show_message(message.data['message']),
)
self.room_state['user'].append(self.user_name)
self.send_message('join', 'Joined')
# load history
for index, message in enumerate(self.room_state['log'].copy()):
self.show_message(message, index=index)
self.joined = True
return self.html
def on_cleanup(self) -> None:
if not self.joined:
return
self.room_state['user'].remove(self.user_name)
self.send_message('leave', 'Left')
@app.route('/', name='lobby')
class LobbyView(View):
# alerts
def show_error_alert(self, *message):
with self.html.lock:
self.alerts.style['color'] = 'red'
self.alerts.nodes = list(message)
def show_success_alert(self, *message):
with self.html.lock:
self.alerts.style['color'] = 'lime'
self.alerts.nodes = list(message)
# user name
def set_user_name(self, input_event):
name = self.user_name.value
if not NAME.match(name):
self.show_error_alert(f'"{name}" is no valid name')
return
if name in self.server.state['user']:
self.show_error_alert(f'"{name}" is already taken')
return
self.server.state['user'][self.session_key] = name
return RedirectResponse('.')
# rooms
def list_rooms(self, *args, **kwargs):
with self.html.lock:
self.room_table[-1].clear()
for name in self.server.state['rooms'].keys():
user_count = len(self.server.state['rooms'][name]['user'])
self.room_table[-1].append(
Tr(
Td(
A(
name,
href=self.server.reverse('room', room=name),
),
),
Td(str(user_count)),
),
)
def create_room(self, input_event):
name = self.room_name.value
if not NAME.match(name):
self.show_error_alert(f'"{name}" is no valid name')
return
if name in self.server.state['rooms']:
self.show_error_alert(f'"{name}" is already taken')
return
self.server.state['rooms'][name] = {
'user': [],
'log': [],
}
self.room_name.value = ''
self.show_success_alert(f'"{name}" was created')
Channel('chat.room.open').send()
# request handling
def handle_request(self, request):
self.session_key = request.user.session_key
self.alerts = P()
# set name
if self.session_key not in self.server.state['user']:
self.user_name = TextInput(placeholder='User Name')
self.set_user_name_button = InlineButton(
'Set',
handle_click=self.set_user_name,
)
self.html = HTML(
H1('Set User Name'),
self.alerts,
self.user_name,
self.set_user_name_button,
)
return self.html
# select / create room
self.room_name = TextInput(placeholder='Room Name')
self.create_room_button = InlineButton(
'Create Room',
handle_click=self.create_room,
)
self.room_table = Table(
THead(
Tr(
Th('Room Name'),
Th('User Chatting'),
),
),
TBody(),
)
self.html = HTML(
H1('Chat Rooms'),
self.alerts,
self.room_name,
self.create_room_button,
Br(),
Br(),
self.room_table,
)
self.list_rooms()
self.channel = self.subscribe(
'chat.room.*',
self.list_rooms,
)
return self.html
if __name__ == '__main__':
app.run()