Mario Zupan I'm a software developer originally from Graz but living in Vienna, Austria. I previously worked as a full-stack web developer before quitting my job to work as a freelancer and explore open source. Currently, I work at timeular.

How to build a WebSocket server with Rust


In a previous post, we covered how to create an async CRUD web service using the warp web framework. Another cool thing about warp is that it supports WebSockets.

In this tutorial, we’ll demonstrate how to build a basic message relay service that clients can connect to via WebSockets.

After a client registers with their user_id, they get a unique connection ID they can use to connect via WebSockets. They are then able to receive real-time messages, which are published on the service.

Clients can also communicate topics they are interested in via the WebSocket connection. For example, if they subscribe to the topics “cats” and “dogs,” they’ll only get published messages tagged with those topics. Published messages can be addressed directly to a specific user_id or broadcast to all users.

Setup

To follow along, all you need is a reasonably recent Rust installation (1.39+). A tool to test WebSocket connections, such as websocket.org or websocat, and a tool to send HTTP requests, such as curl or Postman, will also be useful.

First, create a new Rust project.

cargo new warp-ws-example
cd warp-ws-example

Next, edit the Cargo.toml file and add the dependencies you’ll need.

[dependencies]
tokio = { version = "0.2", features = ["macros", "sync"] }
warp = "0.2"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = { version = "0.3", default-features = false }
uuid = { version = "0.4", features = ["serde", "v4"] }

There is nothing particularly surprising here. We need warp and Tokio to run the web server and Serde to serialize and deserialize JSON. The uuid library will be used to create the connection ID and the futures library will be useful when dealing with the asynchronous data streams of the WebSocket.

Data structures

Before we get started, let’s look at some of the data structures we’ll use to get some more context.

First of all, the Client is at the core of this application.


#[derive(Debug, Clone)]
pub struct Client {
  pub user_id: usize,
  pub topics: Vec<String>,
  pub sender: Option<mpsc::UnboundedSender<std::result::Result<Message, warp::Error>>>,
}

There is a difference between a client and a user in this case. A user can have several clients — think of the same user connecting to the API using a mobile app and a web app, for example.

Every client has a user_id, a list of topics they’re interested in, and a sender. This sender is the sending part of an MPSC (multiple producer, single consumer) channel. We’ll get to the sender later on, but suffice it to say for now that it’s used to send messages to this connected client via WebSockets.
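The producer/consumer mechanics of such a channel can be sketched in isolation with std's mpsc (the service itself uses Tokio's unbounded, async channel, but the roles are the same; `fan_in` is a made-up helper for illustration):

```rust
use std::sync::mpsc;
use std::thread;

// Spawn `n` producer threads that each send one message into a shared
// channel, then collect everything from the single receiver.
fn fan_in(n: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let tx = tx.clone(); // each producer gets its own sender clone
            thread::spawn(move || tx.send(format!("msg-{}", i)).unwrap())
        })
        .collect();
    drop(tx); // drop the original sender so the channel closes once the clones finish
    for h in handles {
        h.join().unwrap();
    }
    rx.iter().collect()
}

fn main() {
    assert_eq!(fan_in(3).len(), 3);
}
```

Any part of the system holding a clone of the sender can push messages; the single receiver sees them all, which is exactly how the service fans messages out to a connected client later on.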

The next data structures are used in the REST API for registering users and for broadcasting events.

#[derive(Deserialize, Debug)]
pub struct RegisterRequest {
  user_id: usize,
}

#[derive(Serialize, Debug)]
pub struct RegisterResponse {
  url: String,
}

#[derive(Deserialize, Debug)]
pub struct Event {
  topic: String,
  user_id: Option<usize>,
  message: String,
}

These are not particularly interesting. As mentioned above, events are tagged with a specific topic and can be addressed directly to a user, in which case the user_id will be set explicitly.

Last but not least, we need a way for clients to communicate the topics they’re interested in. If they don’t set the topics explicitly, they’ll be defaulted to cats — because who doesn’t love those?

#[derive(Deserialize, Debug)]
pub struct TopicsRequest {
  topics: Vec<String>,
}

Getting the server up and running

Now that you have a mental model of what we’re going to build, let’s start by spinning up a warp web server with all the routes we’ll need.

mod handler;
mod ws;

type Result<T> = std::result::Result<T, Rejection>;
type Clients = Arc<Mutex<HashMap<String, Client>>>;

#[tokio::main]
async fn main() {
  let clients: Clients = Arc::new(Mutex::new(HashMap::new()));

  let health_route = warp::path!("health").and_then(handler::health_handler);

  let register = warp::path("register");
  let register_routes = register
    .and(warp::post())
    .and(warp::body::json())
    .and(with_clients(clients.clone()))
    .and_then(handler::register_handler)
    .or(register
      .and(warp::delete())
      .and(warp::path::param())
      .and(with_clients(clients.clone()))
      .and_then(handler::unregister_handler));

  let publish = warp::path!("publish")
    .and(warp::body::json())
    .and(with_clients(clients.clone()))
    .and_then(handler::publish_handler);

  let ws_route = warp::path("ws")
    .and(warp::ws())
    .and(warp::path::param())
    .and(with_clients(clients.clone()))
    .and_then(handler::ws_handler);

  let routes = health_route
    .or(register_routes)
    .or(ws_route)
    .or(publish)
    .with(warp::cors().allow_any_origin());

  warp::serve(routes).run(([127, 0, 0, 1], 8000)).await;
}

fn with_clients(clients: Clients) -> impl Filter<Extract = (Clients,), Error = Infallible> + Clone {
  warp::any().map(move || clients.clone())
}

That’s quite a bit of code. Let’s go through it step by step.

As mentioned above, we want clients to connect via WebSockets to our service. To accommodate this, we need a way to keep track of these clients within the service.

We can solve this in many ways, but in this simple case, we’ll simply keep them around in memory — more specifically, within a HashMap. However, because this collection of clients needs to be accessed and mutated by several actors throughout the system (e.g., registering new clients, sending messages, updating topics, etc.), we need to make sure it can be safely passed around between threads and concurrently modified without issues.

That’s why the Clients type is the first thing we defined above — an Arc<Mutex<HashMap<String, Client>>>. This type may look scary, but essentially, we just want the map of connection IDs to clients behind a Mutex so it can only be accessed by one part of the code at any given time. To safely pass it to other threads, we wrap it in an Arc, an atomic smart pointer type that provides shared ownership in a thread-safe way.
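Here is a minimal sketch of that ownership pattern using std's blocking Mutex and threads (the service itself uses Tokio's async Mutex and tasks; `register_concurrently` is an illustrative helper, not part of the service):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Insert `n` entries into a shared map from `n` threads and return the
// final size. The Mutex serializes access; the Arc shares ownership.
fn register_concurrently(n: usize) -> usize {
    let clients: Arc<Mutex<HashMap<String, usize>>> = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let clients = Arc::clone(&clients); // cheap pointer clone, not a map copy
            thread::spawn(move || {
                clients.lock().unwrap().insert(format!("conn-{}", i), i);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let len = clients.lock().unwrap().len();
    len
}

fn main() {
    assert_eq!(register_concurrently(4), 4);
}
```

Cloning the Arc only bumps a reference count; every clone points at the same map, which is why handing a clone to each route (or thread) works.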

So far, so good. In the main function, this Clients data structure is initialized, followed by a handful of route definitions:

  • GET /health — Indicates whether the service is up
  • POST /register — Registers clients in the application
  • DELETE /register/{client_id} — Unregisters the client with the given ID
  • POST /publish — Broadcasts an event to clients
  • GET /ws — The WebSocket endpoint

The with_clients filter is created to pass the clients into these routes. It clones the Arc and hands a pointer to the shared map into each route that uses it.

Besides that, all handlers (except for the WebSockets one) are pretty basic. For the /ws route, the warp::ws() filter is used, which makes it possible to upgrade the connection to a WebSocket connection in the handler.

At the end of main, the routes are combined into a router with CORS support and the server is started on port 8000.

Client registration

Now that the server is set up, let’s look at the handlers for the routes defined above, starting with client registration.

To make the code a bit nicer to read, let’s put the handlers into a different file, called handler.rs.

Let’s start with registering a new client, where a JSON body with a user_id is sent to the service.

pub async fn register_handler(body: RegisterRequest, clients: Clients) -> Result<impl Reply> {
  let user_id = body.user_id;
  let uuid = Uuid::new_v4().simple().to_string();

  register_client(uuid.clone(), user_id, clients).await;
  Ok(json(&RegisterResponse {
    url: format!("ws://127.0.0.1:8000/ws/{}", uuid),
  }))
}

async fn register_client(id: String, user_id: usize, clients: Clients) {
  clients.lock().await.insert(
    id,
    Client {
      user_id,
      topics: vec![String::from("cats")],
      sender: None,
    },
  );
}

The process of registering a new client is simple.

First, a new uuid is created. This ID is used to create a new Client with an empty sender, the user’s ID, and default topics. These are simply added to the clients data structure, returning a WebSocket URL with the uuid to the user. The user can connect the client via WebSockets with this URL.

To add the newly created client to the shared clients structure, we need to lock() the Mutex. Since we’re using Tokio’s asynchronous Mutex in this case, this is a future and should, therefore, be awaited.

After the lock is acquired, simply insert() the new client to the underlying HashMap.

Once the lock goes out of scope, it’s dropped and others can once again access the data structure.
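That guard-scoping behavior can be seen with std's Mutex as well (`bump` is a hypothetical helper for illustration; Tokio's async Mutex guard drops the same way):

```rust
use std::sync::Mutex;

// Mutate a value behind a Mutex. The guard releases the lock as soon
// as it goes out of scope at the end of the inner block.
fn bump(m: &Mutex<i32>) {
    {
        let mut guard = m.lock().unwrap();
        *guard += 1;
    } // guard dropped here; the lock is released
    // a second lock() from any thread would succeed immediately now
}

fn main() {
    let m = Mutex::new(0);
    bump(&m);
    assert_eq!(*m.lock().unwrap(), 1);
}
```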

Great! We can call the register endpoint like this:

curl -X POST 'http://localhost:8000/register' -H 'Content-Type: application/json' -d '{ "user_id": 1 }'
{"url":"ws://127.0.0.1:8000/ws/625ac78b88e047a1bc7b3f8459702078"}

Unregistering clients is even easier.

pub async fn unregister_handler(id: String, clients: Clients) -> Result<impl Reply> {
  clients.lock().await.remove(&id);
  Ok(StatusCode::OK)
}

The client with the given ID (the above-generated uuid) is simply removed from the Clients data structure.

You might ask yourself, “What happens if you’re already connected via WebSockets using this ID?” They’re simply disconnected, and everything is closed and cleaned up on the side of the service.

This works because removing the Client makes it go out of scope, which means it gets dropped. This, in turn, drops the sending side of the channel within the client, which closes the channel and triggers an error. As we’ll see later on, this is a signal we can use to close the connection on our side.
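That closed-channel signal is easy to observe with std's mpsc, which behaves analogously to the Tokio channel used here:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    drop(tx); // dropping the last sender closes the channel...
    assert!(rx.recv().is_err()); // ...and the receiving side observes an error
}
```

In the service, the receiving side is the forwarding task feeding the WebSocket sink, so the error surfaces there and lets us tear the connection down.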

Calling unregister works like this:

curl -X DELETE 'http://localhost:8000/register/e2fa90682255472b9221709566dbceba'

Connecting via WebSocket

Now that clients can register and unregister, it’s time to let them connect to our real-time WebSocket endpoint.

Let’s start with the handler.

pub async fn ws_handler(ws: warp::ws::Ws, id: String, clients: Clients) -> Result<impl Reply> {
  let client = clients.lock().await.get(&id).cloned();
  match client {
    Some(c) => Ok(ws.on_upgrade(move |socket| ws::client_connection(socket, id, clients, c))),
    None => Err(warp::reject::not_found()),
  }
}

First, the given client ID is checked against the Clients data structure. If no such client exists, a 404 error is returned. If a client is found, ws.on_upgrade() is used to upgrade the connection to a WebSocket connection, where the ws::client_connection function is called.

pub async fn client_connection(ws: WebSocket, id: String, clients: Clients, mut client: Client) {
  let (client_ws_sender, mut client_ws_rcv) = ws.split();
  let (client_sender, client_rcv) = mpsc::unbounded_channel();

  tokio::task::spawn(client_rcv.forward(client_ws_sender).map(|result| {
    if let Err(e) = result {
      eprintln!("error sending websocket msg: {}", e);
    }
  }));

This is the core part of the WebSockets logic, so let’s step through it slowly. The function gets a warp::ws::WebSocket passed into it by the warp::ws filter. You can loosely consider this the upgraded WebSocket connection and an asynchronous Stream and Sink.

The split() function of futures::StreamExt splits this up into a stream and a sink, which can be considered a sender and a receiver.

Next, create an unbounded MPSC channel to send messages to the client. Also, if you remember the sender on the Client object, the client_sender is exactly this sender part of the channel.

The next step is to spawn a Tokio task in which the messages sent to the receiver part of the client (client_rcv) are propagated to the sender part of the WebSocket stream (client_ws_sender) using futures::StreamExt::forward(). If the message send fails, we log the error. In a different scenario it could also be feasible to close the connection at this point, depending on the error.

The next step is to update the client with the newly created sender.

client.sender = Some(client_sender);
clients.lock().await.insert(id.clone(), client);

println!("{} connected", id);

The passed-in client gets the sender part of the sending channel and the data structure is updated. From this point on, if someone sends anything into this sender end of the channel, it will be forwarded to the client via WebSocket, so we log the client as connected.

Now we can send to a connected client, but we also want to be able to receive data on the WebSocket. First of all, the client should have the ability to send us pings to check whether the connection is healthy, but we also want to enable clients to change their preferred topics via WebSocket.

  while let Some(result) = client_ws_rcv.next().await {
    let msg = match result {
      Ok(msg) => msg,
      Err(e) => {
        eprintln!("error receiving ws message for id {}: {}", id.clone(), e);
        break;
      }
    };
    client_msg(&id, msg, &clients).await;
  }

  clients.lock().await.remove(&id);
  println!("{} disconnected", id);
}

To receive messages from the WebSocket, we can use the receiver end (Stream) of the WebSocket. As with any other async stream, we can simply wait for values using .next().await in a loop.

When a message is received, that message is forwarded to the client_msg function, which we’ll look at next. But, maybe more interestingly, if an error happens, that error is logged and we break out of the loop, which leads to the end of the function.

The only way to get here is if there is an error. In that case, we want to close the connection and remove the client from the shared data structure.

This is also where we’ll end up if a client is unregistered with a running connection. The only difference is that the client will already have been removed, triggering the error on the connection.

To finish the WebSockets part, let’s look at the client_msg function, which deals with incoming messages from the client.

async fn client_msg(id: &str, msg: Message, clients: &Clients) {
  println!("received message from {}: {:?}", id, msg);
  let message = match msg.to_str() {
    Ok(v) => v,
    Err(_) => return,
  };

  if message == "ping" || message == "ping\n" {
    return;
  }

  let topics_req: TopicsRequest = match from_str(&message) {
    Ok(v) => v,
    Err(e) => {
      eprintln!("error while parsing message to topics request: {}", e);
      return;
    }
  };

  let mut locked = clients.lock().await;
  match locked.get_mut(id) {
    Some(v) => {
      v.topics = topics_req.topics;
    }
    None => return,
  };
}

First, the incoming message is logged, giving us a way to see what is coming in for testing. Next, the message is converted to a string. If it can’t be, we bail since we’re only interested in string messages.

As mentioned earlier, clients should be able to send us pings, so if the message is “ping”, we simply return. We could also send back a “pong,” or whatever we want to use in this case.

The other kind of messages we’re interested in are TopicsRequests, which are sent when a client wants to change their preferred topics. To do that, the client has to send some JSON to the WebSocket, which is then parsed to a list of topics. This list is then updated within the client’s data structure. From that point on, the client will only get messages according to their new topic preferences.

Nice! WebSockets are now available for clients in the service. To test this, either use a site such as websocket.org or a tool such as websocat.

websocat -t ws://127.0.0.1:8000/ws/2fd7f6e2e3294057b714c6c1c5aa827d
{ "topics": ["cats", "dogs"] }

Relaying messages to clients

Almost done! The only piece of the puzzle we’re still missing is the ability to broadcast messages to connected clients. This is done using the /publish endpoint.

pub async fn publish_handler(body: Event, clients: Clients) -> Result<impl Reply> {
  clients
    .lock()
    .await
    .iter_mut()
    .filter(|(_, client)| match body.user_id {
      Some(v) => client.user_id == v,
      None => true,
    })
    .filter(|(_, client)| client.topics.contains(&body.topic))
    .for_each(|(_, client)| {
      if let Some(sender) = &client.sender {
        let _ = sender.send(Ok(Message::text(body.message.clone())));
      }
    });

  Ok(StatusCode::OK)
}

When anyone wants to broadcast a message to clients, we iterate over the clients data structure. If the event has a user_id set, we filter out all clients that don’t belong to that user; otherwise, the message is a broadcast and every client passes this filter.

We’re also only interested in clients that are subscribed to the topic of the message. For each remaining client, we use its sender to transmit the message down the pipeline.
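The filter chain can be exercised in isolation. The sketch below mirrors publish_handler's two filters over a plain HashMap (`recipients` and the trimmed-down Client are illustrative helpers, not part of the service):

```rust
use std::collections::HashMap;

// A stripped-down stand-in for the article's Client (no sender),
// just enough to exercise the filtering logic.
struct Client {
    user_id: usize,
    topics: Vec<String>,
}

// Return the connection IDs that should receive an event, mirroring
// the filter chain in publish_handler: an optional user_id filter,
// then a topic-subscription filter.
fn recipients(clients: &HashMap<String, Client>, user_id: Option<usize>, topic: &str) -> Vec<String> {
    let mut ids: Vec<String> = clients
        .iter()
        .filter(|(_, c)| user_id.map_or(true, |v| c.user_id == v))
        .filter(|(_, c)| c.topics.iter().any(|t| t == topic))
        .map(|(id, _)| id.clone())
        .collect();
    ids.sort(); // HashMap iteration order is unspecified
    ids
}

fn main() {
    let mut clients = HashMap::new();
    clients.insert("a".to_string(), Client { user_id: 1, topics: vec!["cats".to_string()] });
    clients.insert("b".to_string(), Client { user_id: 2, topics: vec!["cats".to_string()] });

    // Addressed to user 1: only client "a" qualifies.
    assert_eq!(recipients(&clients, Some(1), "cats"), vec!["a".to_string()]);
    // Broadcast (no user_id): every "cats" subscriber qualifies.
    assert_eq!(recipients(&clients, None, "cats"), vec!["a".to_string(), "b".to_string()]);
}
```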

The publish endpoint can be called like this:

curl -X POST 'http://localhost:8000/publish' \
-H 'Content-Type: application/json' \
-d '{"user_id": 1, "topic": "cats", "message": "are awesome"}'

This message will be sent to the connected clients with a user_id of 1, subscribed to the topic cats.

Perfect! Everything we set out to build is done and works nicely.

You can find the full code for this example on GitHub.

Conclusion

WebSockets are fantastic, both for interactive, real-time web experiences and in combination with REST APIs to update the UI without the need for clients to poll for changes.

warp makes WebSockets easy to use, with the caveat that depending on the use case, some background knowledge of asynchronous streams and concurrency in Rust is required. But since those are very useful skills within the area of Rust web development in general, that seems reasonable enough.
