A while ago, Redis released its newest version, and with it announced a brand-new data type called Streams. If you read their documentation, or at least scratched the surface of it (it's a lot of text to digest), you might've noticed the similarities with Pub/Sub, or even with some clever structures like blocking lists.
In this article, I'm going to summarize the documentation on Redis Streams and also take the opportunity to tell you about Pub/Sub and blocking lists, in case you're not up to date on them.
Let's first review the previously existing structures and mechanics: a quick refresher if you already use them, or a quick introduction if you haven't before. Don't get me wrong, they're very powerful and useful tools to have in your developer arsenal, but they're not the main focus of this article, so I won't go into a lot of detail on them.
Pub/Sub is one of my favorite features of Redis, and I tend to use it as much as I can, because it's basically a free message queue embedded in a key-value, in-memory database. You're getting two for the price of one.
The gist of it is that Redis gives developers access to the Publish/Subscribe messaging paradigm, which allows two or more entities to communicate without knowing about each other.
In other words, you have your "message senders" (also known as publishers) and your "message receivers" (also known as subscribers), and if you want your publishers to communicate with your subscribers, you have a few options:
You can connect them directly:
Which comes with several disadvantages, such as:
- Publishers need to know the exact address of every subscriber they talk to, coupling both sides tightly together.
- Subscribers can get overwhelmed if publishers produce content faster than they can process it.
- If a subscriber crashes, the messages sent to it in the meantime are lost.
You could also go the crazier route and connect every publisher to every subscriber directly:
Although the above setup solves the problem of crashing subscribers, it increases the complexity on the publisher side, which now has to know the exact location of every single subscriber. And the overload problem isn't solved; in fact, every subscriber now gets more traffic, so it would be even easier to crash them.
Or you could add a buffer-like structure in the middle, allowing publishers to create content as fast as they can and subscribers to pull content at their own pace. This also lets publishers forget about the address of every related subscriber, simplifying their logic and management.
Of course, the above diagram is a very simplified version of the paradigm, but it's good enough to explain the point.
Some key properties of Pub/Sub that will be relevant down the line are:
- It's a fan-out system: every message published to a channel is delivered to every active subscriber of that channel.
- Delivery is fire-and-forget: messages aren't stored anywhere, so a subscriber that isn't connected at the moment of publishing will never see that message.
Some classic examples of Pub/Sub are:
- Chat servers, where every room is a channel and each participant subscribes to it.
- Notification systems, broadcasting events to whoever happens to be listening.
- Real-time dashboards, updating the moment new data is published.
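To make this more concrete, here's a minimal Pub/Sub exchange using two redis-cli sessions (the channel name is just an example):
(terminal 1, the subscriber; this blocks, waiting for messages on the "news" channel)
> SUBSCRIBE news
(terminal 2, the publisher; the reply is the number of subscribers that received the message)
> PUBLISH news "hello world"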
Now, blocking lists are very similar to the above structure (and, not to spoil the mystery, but you'll find the same is true for Streams as well). The main difference, though, is that this isn't really a separate feature Redis comes with; it's just a data type (Lists) with specific blocking mechanics that work in our favor. Let me explain.
When working with Lists in Redis, you can add elements to one (either to its head with LPUSH or to its tail with RPUSH), and you can get the top element (with LPOP from the head and RPOP from the tail). By default, the *POP operations aren't blocking, meaning that if the list has no data, you'll get an empty result, and that's the end of it.
But you also have two very useful blocking commands, BLPOP and BRPOP, which are the blocking versions of their counterparts, LPOP and RPOP. The fact that they're blocking is essential here, because when used on an empty list, the connection from the client to Redis will be blocked until a new element is added. And that blocking mechanic essentially gives you a messaging system you can use.
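As a minimal sketch (the list name tasks is just an example), the exchange looks like this:
(terminal 1, the subscriber; blocks until an element arrives, with 0 meaning no timeout)
> BLPOP tasks 0
(terminal 2, the publisher; pushing an element unblocks the waiting subscriber)
> RPUSH tasks "do something"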
Essentially, we're mimicking the same setup as with Pub/Sub: any number of publishers can send content into a buffer-like structure, and any number of subscribers can listen for new data and react to it. But, as they say, the devil is in the details, so let's dig a bit deeper.
The key is the blocking mechanic of the POP operation, because it'll keep subscribers waiting until a message is added to the list. The moment that happens, the subscriber that's been waiting the longest will immediately pop it out for itself.
The main differences between this structure and Pub/Sub are:
- Messages aren't delivered to every subscriber; each message is popped by exactly one of them, so work is distributed instead of broadcast.
- Messages aren't lost if nobody is listening; they stay in the list until some subscriber pops them.
Now that I've covered the known and existing structures, let's look at the shiny new Streams.
The main design inspiration behind Streams is log files. That might sound like an odd thing to say, but hear me out: when you're inspecting a log file in order to debug your application, you usually do something like this:
$ tail -f yourlogfile.log
That'll show you the last few lines of your file, letting you see the most recent things that happened. Not only that, but the command line will be blocked, since it'll keep waiting for new lines to be added to your file, showing them immediately once they appear.
So far, this sounds a lot like blocking lists, doesn't it? But as you're about to learn, Streams implement a couple of more complex operations, which give you more control over what you can do with them (compared to blocking lists).
The other main difference between everything we've seen so far and Streams is that, instead of just handling string elements, the internal structure of a message is a set of key-value pairs. Your messages can actually have complex structures directly in Redis (instead of being a stringified version of your JSON objects).
The basic operations you can perform on Streams are pushing data into them and getting data out of them.
To publish data into them, you have the XADD command, which is very straightforward:
> XADD yourstreamname * key1 value1 key2 value2
That command would add a structure like the following into a stream called "yourstreamname":
{
    "key1": "value1",
    "key2": "value2"
}
Every message added to the stream will have an internal ID, which is the second argument of the XADD operation. Passing a “*” will let Redis know to auto-generate it for us, and in turn, it’ll return it as a result of our adding operation. You could potentially specify an ID yourself, but for most use cases you won’t need to worry about that and you can just let Redis handle it.
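For example, adding a message and letting Redis generate the ID would look something like this (the exact ID returned will vary, since its first part is a timestamp):
> XADD yourstreamname * key1 value1
"1526919030474-0"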
Now, getting data from the stream is where things get interesting. There are two ways you can get data from a Stream structure.
You can do something similar to what we’ve been doing so far using XREAD. This command will let you subscribe to a Stream waiting for new messages to arrive.
> XREAD COUNT 2 STREAMS yourstreamname 0
Or
> XREAD BLOCK 0 STREAMS yourstreamname 0
The first version of the command will return the first two messages in "yourstreamname" that have an ID greater than 0. This version is not blocking, though, so as you would expect, if there is nothing new, the command will return no messages.
The second version, on the other hand, is blocking with a timeout of 0 (meaning there is no timeout), so this time, just like with blocking lists, the command will not return until a new message is added.
A couple of comments:
- The ID at the end of the command (0 in the examples above) is the last ID you've already seen, so Redis will only return messages with IDs greater than it. On subsequent calls, you'd pass the ID of the last message you received.
- In the blocking version, you can use the special ID $ instead, which tells Redis to only return messages added after the call was made, as shown below.
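In other words, to wait only for messages that arrive from this point on:
> XREAD BLOCK 0 STREAMS yourstreamname $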
Using XRANGE and XREVRANGE (the latter being the same as the former, but with the returned data in reverse order), you can get a range of messages from the Stream, something impossible to do with the previous structures!
> XRANGE yourstreamname 13213131-0 + COUNT 3
The command is not that complex; the parameters are:
- The name of the stream you're reading from.
- The first ID of the range you're interested in.
- The last ID of the range (with + meaning the highest ID possible; its counterpart, -, means the lowest one).
- Optionally, COUNT with the maximum number of messages to return.
Just so it’s clear, message IDs have two parts, the first one is the local time (in the local Redis node) expressed in milliseconds. The second part though is simply an auto-increment, meant to solve possible collisions when messages are received at the exact same time.
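For instance, in an ID such as the following (a hypothetical value, just for illustration):
1526919030474-3
the first part, 1526919030474, is the millisecond timestamp, and the 3 means it was the fourth message received during that exact millisecond.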
Another big difference, and one of the most interesting additions to the publisher/subscriber mechanics added by Streams, is consumer groups.
This is not a new concept; in fact, Kafka has something similar, although implemented differently.
The main use case for consumer groups is when you want different consumers reading data from the same stream, but at different speeds. In these cases, you might want messages delivered to only one consumer, essentially working like blocking lists instead of Pub/Sub.
If you do this, you're ensured that:
- Each message within the group is delivered to a single consumer; two consumers in the same group will never receive the same message.
- Consumers are identified by name, so even after disconnecting and reconnecting, a consumer keeps its place within the group.
- Each message stays in a pending state until its consumer explicitly acknowledges it, so nothing is silently dropped if a consumer crashes mid-processing.
When creating a group, you specify the stream the group belongs to, its name, and the top ID, meaning that only messages with an ID higher than that one will be delivered to the group.
To do so, you can use the XGROUP command:
> XGROUP CREATE yourstreamname yourgroup $
The top ID used in the example, $, references the last ID in the stream, so the above group will only receive messages that were added after the group was created.
You could potentially create a group like this:
> XGROUP CREATE yourstream historicgroup 0
A group created this way, instead, would contain every message ever received by the stream.
The command XREADGROUP can be used to get data off of a group. The syntax is the same as XREAD, but with a single added clause, GROUP, which receives the group name and the consumer name.
So, it would look something like this:
> XREADGROUP GROUP historicgroup consumer1 BLOCK 0 STREAMS yourstream >
The above command essentially reads (in a blocking manner) from the historicgroup group of the yourstream stream, identifying itself as the consumer1 consumer. Note the trailing >, which is a special ID meaning: only return messages that were never delivered to any other consumer of this group.
Finally, in order to confirm that you've read a message successfully, you need to send the XACK command back to Redis; otherwise, that message will remain in a pending status.
The attributes required for this operation are the Stream's name, the group's name and, finally, the message ID:
> XACK yourstream historicgroup 1231241142-0
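As a side note, if you ever want to inspect which messages of a group are still waiting for that acknowledgment, the XPENDING command gives you a summary (stream and group names taken from the examples above):
> XPENDING yourstream historicgroup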
Is that all there is to Streams? Not really; there is still more to them that I haven't covered. The content in this article, though, should be enough to understand not only how to use Streams, but also why they were added to the set of data types already in existence.
Feel free to read the full documentation if you want to know more about Streams and everything you can do with them.
Thanks so much for reading and please feel free to leave a comment if you’re already using Streams or if you’re planning on doing so!