Now Hiring: Are you a driven and motivated 1st Line IT Support Engineer?

March 15, 2022

WebSockets in Rust Actix Framework

WHAT is Web-Socket

The Web Socket API is a cutting-edge technology, providing full duplex communication between the user’s browser and a server. With this API, you can send a message to a server and receive a message over a single TCP connection. It provides easier real-time data transfer because server can send content to you without any request from you!


Advantages of Web Socket

Bidirectional

In HTTP, request is initiated by the client, then the response is operated and returned to the client by the server. In web-socket client and server do not need pre-defined request/response scheme – except handshake. After handshake, either the client or server can send a message.

 

Full Duplex

In HTTP, at the same time, either client can send a request, or the server can send a response to the client. Full duplex communication allows that server and client can talk to each other separately.

 

Single TCP Connection

In general, a new TCP connection is created for an HTTP request, after getting response this connection terminated. In Web Socket, the connection is kept alive over the same TCP for the lifecycle of either the client or server.



HOW is Web-Socket

Using Rust's Actix framework, we can work with a web socket server. Moreover, we can handle requests and response using Actix’s actor system. At the end of the project, we are going to create an HTTP endpoint for clients to connect to the web-socket within the topic they want to join.

I will explain step by step how to implement in Actix.


First step:

Define a struct for Web Socket object. 

pub struct WebSocketSession  {
    room: String,
    lobby_addr: Addr<Lobby>,
    hb: Instant,
    id: Uuid,
}

Let me explain meaning of fields


  • Room: This uuid shows room id. I will implement a ‘rooms’. The rooms will be just a HashMap that maps each room Uuid to List of Socket Ids. Because each socket exists in a room.
  • Lobby_addr: This address shows where the lobby is. We need this id because each socket’s room exist in a lobby. We need this property because it is used to send data to lobby.
  • Hb: This time shows when we receive the last heartbeat. Sometimes connection lost without any warning, so we should handle this situation. Having the actor forever is unnecessary. So, we send a heartbeat to socket every specified N second, if we do not get any response, we terminate the actor.
  • Id: this is the id of socket. Assigning an id to each socket is helpful in many cases. Especially, if we want to send private message to specific socket.

    Step Two: The Web-Socket Actor

    So far, WebSocketSession is just a simple struct. We need to convert this struct into an actor. Let me implement the Actor trait on it.

    Here is the implementation:

    impl Actor for WebSocketSession  {
        type Context = ws::WebsocketContext<Self>;

        fn started(&mut self, ctx: &mut Self::Context) {
            self.hb(ctx);

            let addr = ctx.address();
            self.lobby_addr
                .send(Connect {
                    addr: addr.recipient(),
                    lobby_id: self.room.clone(),
                    self_id: self.id,
                })
                .into_actor(self)
                .then(|res, _, ctx| {
                    match res {
                        Ok(_res) => (),
                        _ => ctx.stop(),
                    }
                    fut::ready(())
                })
                .wait(ctx);
        }

        fn stopping(&mut self, _: &mut Self::Context) -> Running {
            self.lobby_addr.do_send(Disconnect { id: self.id, room_id: self.room.clone() });
            Running::Stop
        }
    }


    Since we are creating WebSocket actor we need a WebSocket context. So context is declared as ws::WebsocketContext. This context allowed us to do WebSocket works. To create and close the Actor we should have started and stopping methods.

    Started function is called when the actor starts up. In started function, we run the heartbeat function first. We will see this function in a bit, but basically it will automatically close the socket if the heartbeat is not echo’d.

    Then we take the addres of the lobby because we send a Connect message to that lobby. This message also includes id and address of the socket. I will show how lobby handle this connect message. If anything goes in wrong way, we just stop the whole Actor with ctx.stop. 

    In stopping function, we send a Disconnect message to the lobby. If you noticed, do_send is used here. This allows us to send message synchronously. We do not care if message has been sent or read. As you seen in started method, send needs to be awaited.

    WebSocketSession is now an Actor!

    Here’s the helper heartbeat method

    impl WebSocketSession  {
        fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
            ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
                if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                    println!("Disconnecting failed heartbeat");
                    act.lobby_addr.do_send(Disconnect { id: act.id, room_id: act.room.clone() });
                    ctx.stop();
                    return;
                }
                ctx.ping(b"hi");
            });
        }
    }


    This method pings the client and wait for a response on a specified interval. If it does not get response, send a Disconnect message to the lobby. Then lobby clears this session and socket died.


    How to handle WS messages

    We need to implement the StreamHandler trait that will allow us to process an events stream coming into the actor. This pattern matches on all possible WebSocket events.

    impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketSession  {
        fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
            match msg {
                Ok(ws::Message::Ping(msg)) => {
                    self.hb = Instant::now();
                    ctx.pong(&msg);
                }
                Ok(ws::Message::Pong(_)) => {
                    self.hb = Instant::now();
                }
                Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
                Ok(ws::Message::Close(reason)) => {
                    ctx.close(reason);
                    ctx.stop();
                }
                Ok(ws::Message::Continuation(_)) => {
                    ctx.stop();
                }
                Ok(ws::Message::Nop) => (),
                Ok(Text(s)) => self.lobby_addr.do_send(BroadcastMessage {
                    id: self.id,
                    msg: serde_json::Value::String(s),
                    room_id: self.room.clone()
                }),
               
                Err(e) => panic!("{}", e),
            }
        }
    }

    The ping and pong events are basic sending of data to validate the connection is still alive. Since we know they are alive, reset the heartbeat to the current timestamp. If it is a close request, just close and stop the context. On a text message, send it to the lobby. The lobby will figure out to where it needs to go.


    Responding to text messages

    impl Handler<WsMessage> for WebSocketSession  {
        type Result = ();

        fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
            ctx.text(msg.0);
        }
    }

    If the server puts a WsMessage into our mailbox, we just pass the message String to the actor context as a text message. We forward the messages from the server to the client in this way.

    This is all about  WebSocketSession  for this project.


    Step Three: Defining Messages for our Mailboxes

    Here is the message structs we used in this project

    use actix::prelude::{Message, Recipient};
    use uuid::Uuid;
    use serde_json::Value;
    use serde::{Deserialize, Serialize};

    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct WsMessage(pub String);

    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct Connect {
        pub addr: Recipient<WsMessage>,
        pub lobby_id: String,
        pub self_id: Uuid,
    }

    #[derive(Message)]
    #[rtype(result = "()")]
    pub struct Disconnect {
        pub id: Uuid,
        pub room_id: String,
    }

    #[derive(Message, Deserialize, Serialize, Clone)]
    #[rtype(result = "()")]
    pub struct BroadcastMessage {
        pub id: Uuid,
        pub msg: Value,
        pub room_id: String
    }

    impl BroadcastMessage {
        pub fn new(id: Uuid, data: Value, r_id: String) -> Self {
            Self {
                id,
                msg :data,
                room_id: r_id
            }
        }
    }

    It is important to describe return type here. Because it must be the same as the type that get returned after the message is handled.

    The msg  field is one of Value from serde_json. So any type that can be formed  into a Value type.

    Step Four: Defining the Lobby

    We need to define Lobby struct. Here is the plain simple Lobby struct.


    type Socket = Recipient<WsMessage>;

    pub struct Lobby {
        sessions: HashMap<Uuid, Socket>, //self id to self
        rooms: HashMap<String, HashSet<Uuid>>,      //room id  to list of users id
    }


    The socket here is a recipient of WsMessage. So, when a client connects, we can receive the Recipient< WsMessage > from the HTTP request and store it in the Server.

    Lobby will store the the current sessions using an Uuid as a key. It will also store the chat rooms using a room Uuid as a key.

     We will implement a default method for the server.

    impl Default for Lobby {
        fn default() -> Lobby {
            Lobby {
                sessions: HashMap::new(),
                rooms: HashMap::new(),
            }
        }
    }


    And here is the helper that sends a message to a specified client actor.

    impl Lobby {
        fn send_message(&self, message: &str, id_to: &Uuid) {
            if let Some(socket_recipient) = self.sessions.get(id_to) {
                let _ = socket_recipient
                    .do_send(WsMessage(message.to_owned()));
            } else {
                println!("attempting to send message but couldn't find user id.");
            }
        }
    }


    This method takes a string message and an id, if the id is existing send that message to a client with that id. If it does not exist just give an error or print something says that given id does not exist.

    Making the lobby an actor

    We need to make our lobby an actor because Server needs to receive messages as an Actor. Now we can pull it in to our route handlers and send messages to it. Actix will send these messages asynchronously.

    impl Actor for Lobby {
        type Context = Context<Self>;
    }

    If you noticed, we do not have to look out any lifecycle of the Lobby. The only launch when the app starts and remove it when the app closes. 


    Handling Messages

    Our server will get 3 types of messages: Connects, Disconnects and WsMessage from the client actor. All of them come from the WsConn lifecycle methods from the actor trait.

    impl Handler<Disconnect> for Lobby {
        type Result = ();

        fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
            if self.sessions.remove(&msg.id).is_some() {
                self.rooms
                    .get(&msg.room_id)
                    .unwrap()
                    .iter()
                    .filter(|conn_id| *conn_id.to_owned() != msg.id)
                    .for_each(|user_id| self.send_message(&format!("{} disconnected.", &msg.id), user_id));
                if let Some(lobby) = self.rooms.get_mut(&msg.room_id) {
                    if lobby.len() > 1 {
                        lobby.remove(&msg.id);
                    } else {
                        //only one in the lobby, remove it entirely
                        self.rooms.remove(&msg.room_id);
                    }
                }
            }
        }
    }

    Here we are responding Disconnect message in a two different way:

    1.remove a single client from a room – remove its identifier. You can inform other clients. send everyone else X disconnected!

    2.If that client was the last one in the room, remove the room completely to prevent blocked HashMap

    impl Handler<Connect> for Lobby {
        type Result = ();

        fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
            self.rooms
                .entry(msg.lobby_id.clone())
                .or_insert_with(HashSet::new).insert(msg.self_id);

            self
                .rooms
                .get(&msg.lobby_id)
                .unwrap()
                .iter()
                .filter(|conn_id| *conn_id.to_owned() != msg.self_id)
                .for_each(|conn_id| self.send_message(&format!("{} just joined!", msg.self_id), conn_id));

            self.sessions.insert(
                msg.self_id,
                msg.addr,
            );

            self.send_message(&format!("your id is {}", msg.self_id), &msg.self_id);
        }
    }


    We are responding Connect message in this way:

    If we cannot found room id,create new one then add the id to it. Otherwise, inform everyone in the room that new client just joined then add new socket id to sessions.

    impl Handler<BroadcastMessage> for Lobby {
        type Result = ();

        fn handle(&mut self, msg: BroadcastMessage, _ctx: &mut Context<Self>) -> Self::Result {

            if let Some(_socket_recipient) = self.sessions.get(&msg.id) {
                self.rooms.get(&msg.room_id).unwrap().iter().for_each(|_client| self.send_message(&to_string(&msg).unwrap(), _client));
            } else {
                println!("attempting to send message but couldn't find admin id.");
            }
        }
    }


    Finally, the lobby listen BroadcastMessage. Clients can send messages to lobby for the lobby to forward to clients.


    Final Step: Setting up the Route / Running the Server

    I define a route that just has a topic name  as a path param. Then, I create a new WebSocketSession with a refrence to the Lobby address. Finally, I upgrade the request to a WebSocket request then I have an open persistant connection.

    #[get("/{topic_name}")]
    pub async fn start_connection(
        req: HttpRequest,
        stream: Payload,
        topic_name: web::Path<String>,
        srv: Data<Addr<Lobby>>,
    ) -> Result<HttpResponse, Error> {

        println!("client");

        let topic_name = topic_name.into_inner();
        let ws = WebSocketSession::new(
            topic_name,
            srv.get_ref().clone(),
        );
        let resp = ws::start(ws, &req, stream)?;
        Ok(resp)
    }


    Then I define second route to broadcast some statistics. I call the web-socket server, converting the statistics into a value, then send it as a BroadcastMessage. Using this route, you can send statistics as an admin to everyone who connect to "dailyNews " room.


    #[post("/dailyDashBoard")]
    pub async fn send_statistics(
        websocket_srv: Data<Addr<Lobby>>,
        params: Json<Vec<StatisticRecord>>,
    ) -> Result<HttpResponse, Error> {

        let _msg =params.into_inner();
        let msg = BroadcastMessage{
            id:Uuid::parse_str("470bb217-ffa7-43d8-a0cc-b3d30421d1werfw").unwrap(),
            msg:json!(_msg) ,
            room_id:  "dailyNews".to_string()
        };
        websocket_srv.do_send(msg);
        return  Ok(HttpResponse::Ok().json(()));
    }

    The last step is to register the Lobby as shared data so we can get address of the server. Here is the main class;



    #[actix_web::main]
    async fn main() -> std::io::Result<()> {
        let chat_server = Lobby::default().start(); //create and spin up a lobby

        HttpServer::new(move || {
            App::new()
                .service(start_connection_route)
                .service(send_statistics) //register our route. rename with "as" import or naming conflict
                .data(chat_server.clone()) //register the lobby
        })
        .bind("127.0.0.1:8081")?
        .run()
        .await
    }

    To test the socket client, you can use web-socket extension for chrome.


    Have fun with web-sockets!



    check out repo for the completed tutorial: edayardim/web-socket (github.com)


    54 Comments
    Leave a Comment
    captcha