March 15, 2022
WebSockets in Rust Actix Framework
- 15 Comments
- 35970 Views
- Categories: socket

WHAT is Web-Socket
The Web Socket API is a cutting-edge technology, providing full duplex communication between the user’s browser and a server. With this API, you can send a message to a server and receive a message over a single TCP connection. It provides easier real-time data transfer because server can send content to you without any request from you!
Advantages of Web Socket
Bidirectional
In HTTP, request is initiated by the client,
then the response is operated and returned to the client by the server. In
web-socket client and server do not need pre-defined request/response scheme –
except handshake. After handshake, either the client or server can send a
message.
Full
Duplex
In HTTP, at the same time, either client can
send a request, or the server can send a response to the client. Full duplex
communication allows that server and client can talk to each other separately.
Single TCP Connection
In general, a new TCP connection is created for an HTTP request, after getting response this connection terminated. In Web Socket, the connection is kept alive over the same TCP for the lifecycle of either the client or server.
HOW is Web-Socket
Using Rust's Actix framework, we can work with a
web socket server. Moreover, we can handle requests and response using Actix’s
actor system. At the end of the project, we are going to create an HTTP
endpoint for clients to connect to the web-socket within the topic they want to
join.
I will explain step by step how to implement in Actix.
First step:
Define a struct for Web Socket object.
pub struct WebSocketSession {
room: String,
lobby_addr: Addr<Lobby>,
hb: Instant,
id: Uuid,
}
Let me explain meaning of fields
Room: This uuid shows room id. I will implement a ‘rooms’. The rooms will be just a HashMap that maps each room Uuid to List of Socket Ids. Because each socket exists in a room.- Lobby_addr: This address shows where the lobby is. We need this id because each socket’s room exist in a lobby. We need this property because it is used to send data to lobby.
- Hb: This time shows when we receive the last heartbeat. Sometimes connection lost without any warning, so we should handle this situation. Having the actor forever is unnecessary. So, we send a heartbeat to socket every specified N second, if we do not get any response, we terminate the actor.
- Id: this is the id of socket. Assigning an id to each socket is helpful in many cases. Especially, if we want to send private message to specific socket.
Step Two: The Web-Socket Actor
So far, WebSocketSession is just a simple struct. We need to convert this struct into an actor. Let me implement the Actor trait on it.
Here is the implementation:
impl Actor for WebSocketSession {type Context = ws::WebsocketContext<Self>;fn started(&mut self, ctx: &mut Self::Context) {self.hb(ctx);let addr = ctx.address();self.lobby_addr.send(Connect {addr: addr.recipient(),lobby_id: self.room.clone(),self_id: self.id,}).into_actor(self).then(|res, _, ctx| {match res {Ok(_res) => (),_ => ctx.stop(),}fut::ready(())}).wait(ctx);}fn stopping(&mut self, _: &mut Self::Context) -> Running {self.lobby_addr.do_send(Disconnect { id: self.id, room_id: self.room.clone() });Running::Stop}}
Since we are creating WebSocket actor we need a WebSocket context. So context is declared as ws::WebsocketContext. This context allowed us to do WebSocket works. To create and close the Actor we should have started and stopping methods.
Started function is called when the actor starts
up. In started function, we run the heartbeat function first. We will see this
function in a bit, but basically it will automatically close the socket if
the heartbeat is not echo’d.
Then we take the addres of the lobby because we send a Connect message to that lobby. This message also includes id and address of the socket. I will show how lobby handle this connect message. If anything goes in wrong way, we just stop the whole Actor with ctx.stop.
In stopping function, we send a Disconnect
message to the lobby. If you noticed, do_send is used here. This allows us to
send message synchronously. We do not care if message has been sent or read. As
you seen in started method, send needs to be awaited.
WebSocketSession is now an Actor!
Here’s the helper heartbeat method
impl WebSocketSession {fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {println!("Disconnecting failed heartbeat");act.lobby_addr.do_send(Disconnect { id: act.id, room_id: act.room.clone() });ctx.stop();return;}ctx.ping(b"hi");});}}
This method pings the client and wait for a response on a specified interval. If it does not get response, send a Disconnect message to the lobby. Then lobby clears this session and socket died.
How to handle WS messages
We need to implement the StreamHandler trait
that will allow us to process an events stream coming into the actor. This
pattern matches on all possible WebSocket events.
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketSession {fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {match msg {Ok(ws::Message::Ping(msg)) => {self.hb = Instant::now();ctx.pong(&msg);}Ok(ws::Message::Pong(_)) => {self.hb = Instant::now();}Ok(ws::Message::Binary(bin)) => ctx.binary(bin),Ok(ws::Message::Close(reason)) => {ctx.close(reason);ctx.stop();}Ok(ws::Message::Continuation(_)) => {ctx.stop();}Ok(ws::Message::Nop) => (),Ok(Text(s)) => self.lobby_addr.do_send(BroadcastMessage {id: self.id,msg: serde_json::Value::String(s),room_id: self.room.clone()}),Err(e) => panic!("{}", e),}}}
The ping and pong events are basic sending of data to validate the connection is still alive. Since we know they are alive, reset the heartbeat to the current timestamp. If it is a close request, just close and stop the context. On a text message, send it to the lobby. The lobby will figure out to where it needs to go.
Responding to text messages
impl Handler<WsMessage> for WebSocketSession {type Result = ();fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {ctx.text(msg.0);}}
If the server puts a WsMessage into our mailbox, we just pass the message String to the actor context as a text message. We forward the messages from the server to the client in this way.
This is all about WebSocketSession for this project.
Step Three: Defining Messages for our Mailboxes
Here is the message structs we used in this project
use actix::prelude::{Message, Recipient};use uuid::Uuid;use serde_json::Value;use serde::{Deserialize, Serialize};#[derive(Message)]#[rtype(result = "()")]pub struct WsMessage(pub String);#[derive(Message)]#[rtype(result = "()")]pub struct Connect {pub addr: Recipient<WsMessage>,pub lobby_id: String,pub self_id: Uuid,}#[derive(Message)]#[rtype(result = "()")]pub struct Disconnect {pub id: Uuid,pub room_id: String,}#[derive(Message, Deserialize, Serialize, Clone)]#[rtype(result = "()")]pub struct BroadcastMessage {pub id: Uuid,pub msg: Value,pub room_id: String}impl BroadcastMessage {pub fn new(id: Uuid, data: Value, r_id: String) -> Self {Self {id,msg :data,room_id: r_id}}}
It is important to describe return type here.
Because it must be the same as the type that get returned after the message is
handled.
The msg field is one of Value from serde_json. So any type that can be formed into a Value type.
Step Four: Defining the Lobby
We need to define Lobby struct. Here is the plain simple Lobby struct.
type Socket = Recipient<WsMessage>;pub struct Lobby {sessions: HashMap<Uuid, Socket>, //self id to selfrooms: HashMap<String, HashSet<Uuid>>, //room id to list of users id}
The socket here is a recipient of WsMessage. So,
when a client connects, we can receive the Recipient< WsMessage
> from the HTTP request and store it in the Server.
Lobby will store the the current sessions using
an Uuid as a key. It will also store the chat rooms using a room Uuid as a key.
We will implement a default method for the server.
impl Default for Lobby {fn default() -> Lobby {Lobby {sessions: HashMap::new(),rooms: HashMap::new(),}}}
And here is the helper that sends a message to a specified client actor.
impl Lobby {fn send_message(&self, message: &str, id_to: &Uuid) {if let Some(socket_recipient) = self.sessions.get(id_to) {let _ = socket_recipient.do_send(WsMessage(message.to_owned()));} else {println!("attempting to send message but couldn't find user id.");}}}
This method takes a string message and an id, if the id is existing send that message to a client with that id. If it does not exist just give an error or print something says that given id does not exist.
Making the lobby an actor
We need to make our lobby an actor because
Server needs to receive messages as an Actor. Now we can pull it in to our
route handlers and send messages to it. Actix will send these messages
asynchronously.
impl Actor for Lobby {type Context = Context<Self>;}
If you noticed, we do not have to look out any
lifecycle of the Lobby. The only launch when the app starts and remove it when
the app closes.
Handling Messages
Our server will get 3 types of messages: Connects, Disconnects and WsMessage from the client actor. All of them come from the WsConn lifecycle methods from the actor trait.
impl Handler<Disconnect> for Lobby {type Result = ();fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {if self.sessions.remove(&msg.id).is_some() {self.rooms.get(&msg.room_id).unwrap().iter().filter(|conn_id| *conn_id.to_owned() != msg.id).for_each(|user_id| self.send_message(&format!("{} disconnected.", &msg.id), user_id));if let Some(lobby) = self.rooms.get_mut(&msg.room_id) {if lobby.len() > 1 {lobby.remove(&msg.id);} else {//only one in the lobby, remove it entirelyself.rooms.remove(&msg.room_id);}}}}}
Here
we are responding Disconnect message in a two different way:
1.remove a single client from a room – remove
its identifier. You can inform other clients. send everyone else X
disconnected!
2.If that client was the last one in the room,
remove the room completely to prevent blocked HashMap
impl Handler<Connect> for Lobby {type Result = ();fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {self.rooms.entry(msg.lobby_id.clone()).or_insert_with(HashSet::new).insert(msg.self_id);self.rooms.get(&msg.lobby_id).unwrap().iter().filter(|conn_id| *conn_id.to_owned() != msg.self_id).for_each(|conn_id| self.send_message(&format!("{} just joined!", msg.self_id), conn_id));self.sessions.insert(msg.self_id,msg.addr,);self.send_message(&format!("your id is {}", msg.self_id), &msg.self_id);}}
We are responding Connect message in this way:
If we cannot found room id,create
new one then add the id to it. Otherwise, inform everyone in the room that new
client just joined then add new socket id to sessions.
impl Handler<BroadcastMessage> for Lobby {type Result = ();fn handle(&mut self, msg: BroadcastMessage, _ctx: &mut Context<Self>) -> Self::Result {if let Some(_socket_recipient) = self.sessions.get(&msg.id) {self.rooms.get(&msg.room_id).unwrap().iter().for_each(|_client| self.send_message(&to_string(&msg).unwrap(), _client));} else {println!("attempting to send message but couldn't find admin id.");}}}
Finally, the lobby listen BroadcastMessage. Clients can send messages to lobby for the lobby to forward to clients.
Final Step: Setting up the Route / Running the Server
I define a route that just has a topic name as a path param. Then, I create a new WebSocketSession with a refrence to the Lobby address. Finally, I upgrade the request to a WebSocket request then I have an open persistant connection.
#[get("/{topic_name}")]pub async fn start_connection(req: HttpRequest,stream: Payload,topic_name: web::Path<String>,srv: Data<Addr<Lobby>>,) -> Result<HttpResponse, Error> {println!("client");let topic_name = topic_name.into_inner();let ws = WebSocketSession::new(topic_name,srv.get_ref().clone(),);let resp = ws::start(ws, &req, stream)?;Ok(resp)}
Then I define second route to broadcast some statistics. I call the web-socket server, converting the statistics into a value, then send it as a BroadcastMessage. Using this route, you can send statistics as an admin to everyone who connect to "dailyNews " room.
#[post("/dailyDashBoard")]pub async fn send_statistics(websocket_srv: Data<Addr<Lobby>>,params: Json<Vec<StatisticRecord>>,) -> Result<HttpResponse, Error> {let _msg =params.into_inner();let msg = BroadcastMessage{id:Uuid::parse_str("470bb217-ffa7-43d8-a0cc-b3d30421d1werfw").unwrap(),msg:json!(_msg) ,room_id: "dailyNews".to_string()};websocket_srv.do_send(msg);return Ok(HttpResponse::Ok().json(()));}
The last step is to register the Lobby as shared data so we can get address of the server. Here is the main class;
To test the socket client, you can use web-socket extension for chrome.
Have fun with web-sockets!
check out repo for the completed tutorial: edayardim/web-socket (github.com)
15 Comments
diannhh9
Women s erotica 17 best erotic novels of all time cosmopolitan https://camcomsex.instasexyblog.com/?jana-macie free porn swinging threesomes orgies gay ghetto white porn kitty costume porn super video porn madison wi free russian loita porn vids
billns69
Porn hd free mobile porn every day porn videos in hd download https://cutebigirls.bestsexyblog.com/?jasmine-elle phote gallery for porn private internet porn image collections rfee young asain porn christina ernst porn ebony porn porn tube
kittykh6
All about idina menzel and taye diggs son walker nathaniel diggs https://hindiadult-freehairyporn.a4ktube.com/?nayeli-kennedi ron jeremy porn tube vintage jizz long sample free teen porn videos amature wife porn tubes free celeb porn clips free teen porn for ipod
janelltu3
Milk anime hentai sexy milk movies with taboo breastmilk https://patients-leipzig.celebrityamateur.com/?tierra-brandi turd fuck porn tube celebrity males on porn porn fidelity jamie 1966 uk porn 2010 hottest porn star
robinot3
Craigslist buffalo jobs apartments for sale servicesmunity https://fayetteville-belotti.tiktok-pornhub.com/?erika-kathleen moblie nasty ebony porn little orphan annie porn free lesbian porn knitting free halo reach porn free itouch porn by streaming
meganxs1
Avatarics avatar the last airbenderics toons pornic https://fumi-big-tits.miyuhot.com/?kaleigh-brandi eskimo porn free wmv porn trailer clip mother insest porn tube iphone porn movie free pictures porn and movies
bobbyos11
Online ankaufsportal f r erotikfilme ab 18 larotika de https://sexyandbeautifulgirl.kanakox.com/?katy-angie medical porn free online video voluptous porn stars pinky porn squirt cjild porn tracy lord porn
claudiaoa5
Hentai and anime sex overwatch porn at toon porn pics https://desivillagegirlphoto-onmyfaceporn.tubered69.com/?juliana-elyssa gold porn tub hot nast porn mobile phone porn real tattooed tina marie porn video free asian sister porn
pamelacx3
Captain marvel xxx an axel braun parody official trailer https://jenniferlawrencehotscene.hotblognetwork.com/?jessie-amanda porn with condom pics tanya tate free porn vids husband videoo wife porn true chichas porn free mobile porn brazzers
kaylazf6
Very nervous and shy wife first time cuckold nunuporn https://gunn-gunn.celebrityamateur.com/?alyssa-macy grandpa and grandma porn indian schoolgirls porn mega porn video stream sylvia saint free porn feminist porn videos
donaldbt3
Revenge porn law in the uk charges sentencing lawtons law https://encode.tiktok-pornhub.com/?anjali-kassidy natvie sex pictures porn the cable guys porn porn fetish video 5 star lingerie thumbnails porn man fucking a teen video porn
jerrypn60
Porntube porn tube mobile porn pornotube you porn youporn https://teenxxxpics-malebondagefemdomtumblr.bloglag.com/?valerie-ellie pv surfer girl porn a list of porn movies strange porn free online free streaming of amuture allure porn porn for the ps3
seanzu7
Straight british mates watch porn and wank together fyff https://congratulations.sexyico.com/?willow-ashly holland free porn porn clips for iphone amature porn italian women streaming phone porn porn movie free message
veronicajp6
Catch the Latest Video Craze: New Content Added Daily in Popular Categories https://golfcartoonimagesfree.bloglag.com/?rita-elizabeth mens food porn ameteur video porn xxx gay porn bare eagles polina free porn clips free hardcore anul porn
marciajx60
Is a president trump a time traveler by spirit science medium https://smallbeautifulporn.danexxx.com/?quinn-america vice softcore porn mp4 free porn video downloads hairy homemade porn opinion of porn amateur mums porn movies