Denis Badurina I am a self-taught full-stack developer with a distinguishing trait of resiliently finding simple solutions to complex problems using communication through words and code. Starting from my first Lego set, I've been in love with development throughout my whole life. As a creator, having the ability to turn any thought into reality is a gift I find essential. Forever learning through practical applications, bad decisions, and positive thoughts — I ultimately turned a hobby into an obsession.

Using WebSocket subscriptions without queues

We live in a connected world and the need for real-time communication has never been greater. There are two main players in the real-time realm: Server-Sent Events (SSE) and WebSockets.

The two are quite similar in terms of allowing the server to push data to the client without having the client poll the server for updates.

However, the main difference is that WebSockets allow the client to do the same thing (send data to the server), while SSE does not. In the SSE world, you issue one request for which the server can reply multiple times.

Both methods have their pros and cons, and, depending on your needs, you might choose one over the other. In this article, we’ll focus on WebSockets.

What are WebSockets?

The WebSocket Protocol describes a full-duplex web channel that is frequently used when facing real-time data requirements. It offers a way of establishing an active connection (socket) between the server and the client for low-overhead two-way communication, together with well-defined ways of communicating supported languages (subprotocols) and fatal errors.

It is widely used for multiplayer games, live finances, online chat, and in many places where real-time collaboration takes place.

You start by sending a protocol upgrade request over HTTP. The server evaluates the request, checks the supported subprotocols (in-socket communication languages), and concludes the upgrade with a 101: Switching Protocols response.

After a successful upgrade, the communication occurring through this active TCP connection follows the WebSocket Protocol. At this point, both the server and the client can send each other messages whenever they wish for the whole duration of the socket connection.
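
To make the handshake a bit more concrete, here is a minimal server-side sketch of the evaluation step. The names `UpgradeRequest`, `UpgradeResponse`, and `evaluateUpgrade` are hypothetical and exist only for illustration. A real server must also answer the client's `Sec-WebSocket-Key` header with a matching `Sec-WebSocket-Accept` header, which is left out here, and refusing clients that share no subprotocol is an assumption of this sketch.

```typescript
/** Hypothetical, simplified view of an incoming HTTP upgrade request. */
interface UpgradeRequest {
  upgradeHeader: string; // value of the `Upgrade` header
  requestedSubprotocols: string[]; // parsed `Sec-WebSocket-Protocol` header
}

/** Hypothetical, simplified view of the server's reply. */
interface UpgradeResponse {
  status: 101 | 400;
  subprotocol?: string;
}

function evaluateUpgrade(
  req: UpgradeRequest,
  supportedSubprotocols: string[],
): UpgradeResponse {
  // Only requests asking to switch to the WebSocket Protocol qualify.
  if (req.upgradeHeader.toLowerCase() !== 'websocket') {
    return { status: 400 };
  }
  // Pick the first subprotocol the client asked for that we also speak.
  const subprotocol = req.requestedSubprotocols.find((p) =>
    supportedSubprotocols.includes(p),
  );
  // Assumption: this server refuses clients that share no subprotocol.
  if (!subprotocol) return { status: 400 };
  // 101: Switching Protocols concludes the upgrade.
  return { status: 101, subprotocol };
}
```

On the client, the browser performs this exchange for you when you call `new WebSocket(url, protocols)`; the chosen subprotocol is then readable from `socket.protocol`.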

Managing subscriptions through WebSockets

We will focus on the client side, as it is much more complicated than the server, and at the same time, much more interesting.

When designing a WebSocket client, you’re faced with many challenges, such as connecting to servers, reusing existing sockets, handling disconnects, communicating fatal errors, lazy connections, etc.

However, the WebSocket Protocol helps by abstracting away the low-level channel management intricacies to user-friendly connection imperatives, clear close events, and simple data sending methods.

But that’s not all you need to manage. Having decided to use WebSockets as your Pub/Sub system, you also need to manage: silent reconnects, starting and ending subscriptions, re-subscribing on connection interruptions, propagating errors to subscribers, managing message delivery to appropriate listeners, and much more, depending on your use case.

By adding subscriptions to sockets, you have to manage two layers of “active channels”: the actual network connection socket and the single subscription channels within it.

When designing a subscriptions system, you’ll need to think about queues of active subscribers with a central manager that takes care of dispatching messages to the appropriate destinations, communicating errors, and handling the queue itself by adding or removing listeners when necessary.

This centralized system is what makes subscriptions unnecessarily complicated and hard to maintain.
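
For contrast, a centralized manager of that kind might look roughly like the sketch below. `SubscriptionManager`, `Listener`, and the method names are hypothetical, and the sketch covers only listener bookkeeping and dispatch; reconnects and re-subscriptions would add considerably more state on top.

```typescript
type Listener = {
  onNext: (msg: unknown) => void;
  onError: (err: Error) => void;
};

class SubscriptionManager {
  private listeners = new Map<number, Listener>();
  private nextId = 0;

  /** Registers a listener; returns its ID and a function that removes it. */
  subscribe(listener: Listener): [id: number, unsubscribe: () => void] {
    const id = this.nextId++;
    this.listeners.set(id, listener);
    return [
      id,
      () => {
        this.listeners.delete(id);
      },
    ];
  }

  /** Routes an incoming message to the listener registered under `id`. */
  dispatch(id: number, msg: unknown): void {
    this.listeners.get(id)?.onNext(msg);
  }

  /** Reports a fatal error to every listener and empties the registry. */
  fail(err: Error): void {
    this.listeners.forEach((listener) => listener.onError(err));
    this.listeners.clear();
  }
}
```

Every subscriber now depends on this shared, mutable registry staying consistent, which is the maintenance burden the rest of the article tries to avoid.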

Using the JavaScript event loop instead

Let’s try something much more simple instead, something like using the “queue” that is built right into the language: the JavaScript event loop.

Here, we keep promises pending on the asynchronous event loop: they emit events while pending, resolve once the connection/subscription completes, and reject if any problem surfaces during their lifetime.

This approach radically simplifies maintenance by forcing you to rely on the language primitives. Error handling? Try/catch. Retrying? Put it in a loop. Completing? Simply return/resolve.

Instead of building a queue within a queue, the JS event loop becomes the only queue we need: timing, efficiency, and memory management are things we no longer have to think about.
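
As a rough illustration of those primitives, the sketch below retries a promise-returning task in a plain loop. `runWithRetry`, `task`, and `shouldRetry` are hypothetical names introduced for this example; they are not part of the client built later in the article.

```typescript
async function runWithRetry<T>(
  task: () => Promise<T>,
  shouldRetry: (err: unknown) => boolean,
): Promise<T> {
  for (;;) {
    try {
      // Completing? Simply return.
      return await task();
    } catch (err) {
      // Retrying? Go around the loop again.
      if (shouldRetry(err)) continue;
      // Everything else is fatal and reaches the caller's try/catch.
      throw err;
    }
  }
}
```

A caller wraps `await runWithRetry(...)` in an ordinary try/catch, so no dedicated error channel or retry scheduler is needed. In practice you would likely add a delay or an attempt limit inside the loop.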

Instead of doing this ❌:

const complete = subscribe({
  onNext: (msg: unknown) => void,
  onError: (err: Error) => void,
  onComplete: () => void,
});

Do this ✅:

declare function subscribe(
  listener: (msg: unknown) => void,
): Promise<[complete: () => void, waitForCompleteOrThrow: Promise<void>]>;

And use it like this:

const [complete, waitForCompleteOrThrow] = await subscribe((msg) => {
  // handle message
});

// complete/cancel/stop wherever and whenever you want
onLeavePage(complete);
onClickOnClose(complete);
onComponentUnmount(complete);

try {
  await waitForCompleteOrThrow;
  // completed
} catch (err) {
  // handle err
}

Implementing the client

The following code examples are largely self-explanatory, so please read them attentively and refer to the comments for extra detail.

First, we start by building a connect function, which establishes a proper connection with the server and provides simple means of managing it:

/**
 * A simple WebSocket connect function that resolves once the socket
 * opens and the server acknowledges the connection.
 */
export async function connect(
  url: string,
): Promise<
  [
    socket: WebSocket,
    complete: () => void,
    throwOnCloseOrWaitForComplete: () => Promise<void>,
  ]
> {
  const socket = new WebSocket(url);

  /**
   * Remembers the close event in case the socket closes before
   * anyone starts listening for it.
   */
  let closed: CloseEvent | undefined;

  /**
   * Once promises settle, all following resolve/reject calls will simply
   * be ignored. So, for the sake of simplicity, I won't be unlistening.
   */
  await new Promise<void>((resolve, reject) => {
    /**
     * From: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_client_applications
     * > If an error occurs while attempting to connect, first a simple event
     * > with the name error is sent to the WebSocket object (thereby invoking
     * > its onerror handler), and then the CloseEvent is sent to the WebSocket
     * > object (thereby invoking its onclose handler) to indicate the reason for
     * > the connection's closing.
     *
     * Keeping this in mind, listening to the `onclose` event is sufficient.
     * Close events (code + reason) should be used to communicate any critical
     * problem with the socket.
     */
    socket.onclose = (event) => {
      closed = event;
      reject(event);
    };

    /**
     * Sometimes the socket opens and closes right after, so rely on an
     * acknowledgment message from the server to confirm the connection
     * instead of the `onopen` event.
     */
    socket.onmessage = ({ data }) =>
      data === 'ack' ? resolve() : reject(new Error("Didn't acknowledge!"));
  });

  return [
    socket,
    () => socket.close(1000, 'Normal Closure'), // normal closure is completion
    /**
     * The promise is the state flag. If pending, socket is active; if rejected,
     * socket closed; and if resolved, socket completed.
     */
    () =>
      new Promise<void>((resolve, reject) => {
        const check = (event: CloseEvent) => {
          if (event.code === 1000) {
            resolve();
          } else {
            reject(event);
          }
        };
        if (closed) return check(closed);
        socket.addEventListener('close', check);
      }),
  ];
}

Pretty straightforward, right? But it feels (and is) too simple. You often want more complicated behaviour, like establishing WebSocket connections only when needed and closing them when you’re done.

Implementing a lazy connect is rather simple when reusing the connect function:

/**
 * Makes a lazy connect function that establishes a connection
 * on first lock and closes it on last release.
 */
export function makeLazyConnect(
  url: string,
): () => Promise<
  [
    socket: WebSocket,
    release: () => void,
    waitForReleaseOrThrowOnClose: () => Promise<void>,
  ]
> {
  let connecting: ReturnType<typeof connect> | null,
    locks = 0;
  return async function lazyConnect() {
    /**
     * A new lazy connection is established, increment the locks.
     * Once all locks are released, the actual socket connection will
     * complete.
     */
    locks++;

    /**
     * Promises can resolve only once and will return the fulfilled value
     * on each subsequent await. So we simply reuse the connect promise.
     */
    if (!connecting) connecting = connect(url);
    const [socket, complete, throwOnCloseOrWaitForComplete] = await connecting;

    let release = () => {
      /**
       * Release the lazy connect lock. The actual decrementation
       * happens below, in the release waiter. Note that this function
       * will be replaced with the `released` resolve function in the
       * following promise.
       */
    };
    const released = new Promise<void>((resolve) => (release = resolve)).then(
      () => {
        /**
         * Release the lock by decrementing the locks.
         */
        if (--locks === 0) {
          /**
           * If no lazy connection locks exist anymore, complete
           * the actual socket connection.
           */
          complete();
        }
      },
    );

    return [
      socket,
      release,
      () =>
        Promise.race([
          released,
          throwOnCloseOrWaitForComplete()
            /**
             * Complete or close, both close the socket, create
             * a new one on next connect.
             */
            .finally(() => (connecting = null)),
        ]),
    ];
  };
}
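
Stripped of the WebSocket details, the locking idea above is plain reference counting. The sketch below shows it with a generic resource; `makeLazy`, `open`, and `close` are hypothetical names, and unlike the listing above it also rolls back the lock when opening fails, which is an addition of this sketch.

```typescript
/**
 * Hypothetical helper: shares one lazily opened resource between callers
 * and closes it when the last caller releases its lock.
 */
function makeLazy<T>(
  open: () => Promise<T>,
  close: (resource: T) => void,
): () => Promise<[resource: T, release: () => void]> {
  let opening: Promise<T> | null = null;
  let locks = 0;
  return async function acquire() {
    locks++;
    // Every caller awaits the same pending (or already fulfilled) promise.
    if (!opening) opening = open();
    const current = opening;
    let resource: T;
    try {
      resource = await current;
    } catch (err) {
      // Opening failed: give the lock back and let the next caller retry.
      locks--;
      if (opening === current) opening = null;
      throw err;
    }
    let released = false;
    const release = () => {
      // Guard against releasing the same lock twice.
      if (released) return;
      released = true;
      if (--locks === 0) {
        opening = null; // the next acquire opens a fresh resource
        close(resource);
      }
    };
    return [resource, release];
  };
}
```

Two overlapping `acquire()` calls share one resource, and `close` runs only after both have released.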

Awesome! Now that we leverage lazy connections, the last piece is the actual subscribe function for subscriptions. All elements built before now come together:

/** A globally unique ID used for connecting responses. */
export type ID = number;

/**
 * For starting a subscription. Holds the unique ID
 * for connecting future responses.
 */
export interface RequestMsg {
  id: ID;
  request: string;
}

/**
 * The response message for an active subscription. ID would
 * be the same one as requested in the request message.
 */
export interface ResponseMsg {
  id: ID;
  response: string;
}

/**
 * Complete message indicating that the subscription behind
 * the ID is done and will not be emitting further events. Complete
 * message is bi-directional so both the server and the client
 * can complete a subscription.
 */
export interface CompleteMsg {
  complete: ID;
}

/**
 * Isolated, self sustained, unit that has all the necessary logic built
 * right in. It establishes a lazy connection with the configured server,
 * silently retries on abrupt closures, generates unique subscription IDs,
 * dispatches relevant messages to the listener, offers a stop method (complete)
 * which closes the lazy connection on last unsubscribe and a promise that resolves
 * on completions and rejects on possible problems that might occur with the socket.
 */
let currId = 0;
export function subscribe(
  connect: ReturnType<typeof makeLazyConnect>,
  request: string,
  listener: (response: string) => void,
): [complete: () => void, waitForCompleteOrThrow: Promise<void>] {
  /**
   * A reference to the completer which will be replaced with a new
   * complete function once the connection is established and the
   * subscription is requested. If the user completes the subscription
   * early (before having connected), the `completed` flag is used
   * to release the connection lock ASAP.
   */
  let completed = false;
  const completerRef = {
    current: () => {
      /** For handling early completions. */
      completed = true;
    },
  };

  const waitForCompleteOrThrow = (async () => {
    for (;;) {
      try {
        const [socket, release, waitForReleaseOrThrowOnClose] = await connect();

        /**
         * If the user completed the subscription before the connection,
         * release it right away - we don't need it.
         */
        if (completed) return release();

        /**
         * Subscribe and listen...
         */
        const id = currId++;
        socket.send(JSON.stringify({ id, request } as RequestMsg));
        const onMessage = ({ data }: MessageEvent) => {
          const msg = JSON.parse(data) as ResponseMsg | CompleteMsg;
          if ('complete' in msg && msg.complete === id) {
            release();
          } else if ('id' in msg && msg.id === id) {
            listener(msg.response);
          }
        };
        socket.addEventListener('message', onMessage);

        /**
         * Assign a new completer which notifies the server that we are
         * done with the subscription and releases the lazy connection
         * lock. The socket message listener is removed further down,
         * once the release has been awaited.
         */
        completerRef.current = () => {
          socket.send(JSON.stringify({ complete: id } as CompleteMsg));
          release();
        };

        /**
         * Completing the subscription releases the connection lock,
         * waiting for the release is the same as waiting for the complete.
         */
        await waitForReleaseOrThrowOnClose();
        socket.removeEventListener('message', onMessage);
        return;
      } catch (err) {
        // `err` is untyped here, so read `code` defensively.
        if ((err as { code?: unknown } | null)?.code === 1006) {
          /**
           * It's completely up to you when you want to retry, I've chosen
           * to retry on the CloseEvent code 1006 as it is used when the
           * socket connection closes abruptly (for example: due to client
           * network issues).
           */
          continue;
        } else {
          /**
           * All other errors are considered fatal, rethrow them to break
           * the loop and report to the caller.
           */
          throw err;
        }
      }
    }
  })();

  return [() => completerRef.current(), waitForCompleteOrThrow];
}

What we end up with is an isolated, self-sustained unit that has all the necessary logic built right in.

The subscribe function establishes a lazy connection with the configured server, silently retries on abrupt closures, generates unique subscription IDs, dispatches relevant messages to the listeners, offers a stop method (complete) that closes the lazy connection on the last unsubscribe, and a promise that resolves on completions and rejects on possible problems that might occur with the socket.
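
One detail the listing glosses over is that `JSON.parse(data) as ResponseMsg | CompleteMsg` trusts whatever the server sends. Below is a minimal sketch of runtime checks, assuming the message shapes defined above. The interfaces are repeated so the block stands alone, and `isRecord`, `isResponseMsg`, `isCompleteMsg`, and `parseMsg` are hypothetical helper names.

```typescript
interface ResponseMsg {
  id: number;
  response: string;
}

interface CompleteMsg {
  complete: number;
}

function isRecord(val: unknown): val is Record<string, unknown> {
  return typeof val === 'object' && val !== null;
}

function isResponseMsg(val: unknown): val is ResponseMsg {
  return (
    isRecord(val) &&
    typeof val.id === 'number' &&
    typeof val.response === 'string'
  );
}

function isCompleteMsg(val: unknown): val is CompleteMsg {
  return isRecord(val) && typeof val.complete === 'number';
}

/** Parses a raw socket payload, returning null for anything unrecognised. */
function parseMsg(data: string): ResponseMsg | CompleteMsg | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    return null; // not JSON at all
  }
  if (isResponseMsg(parsed)) return parsed;
  if (isCompleteMsg(parsed)) return parsed;
  return null;
}
```

Inside `onMessage`, the cast could then be swapped for `parseMsg(data)`, skipping (or treating as fatal) any payload that comes back as `null`.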

Conclusion

It really is that simple! With just a handful of lines of code, you can implement a resilient subscriptions client that uses the WebSocket Protocol as the transport layer.

Further improvements and solutions are easy to add, the logic is easy to understand, and the code does not strain your eyes. Furthermore, the same idioms can be applied on the server side to increase stability and reduce complexity.

You can see the code from this article in action.

Thanks for reading, and I hope you found this article helpful with your real-time endeavours! 👋

P.S. These simple ideas and conventions are what helped with bringing graphql-ws to life.

If you are interested in how all this can be applied in a fairly complex environment, you’ll find its client-side implementation fairly interesting.
