A while ago, Redis released version 5.0, and with it came a brand new data type called Streams. If you read the documentation, or at least scratched the surface of it (it’s a lot of text to digest), you might’ve noticed the similarities with Pub/Sub or even with some smart structures like blocking lists.
In this article, I’m going to summarize the Redis Streams documentation and also take the opportunity to tell you about Pub/Sub and blocking lists, in case you’re not up to date on them.
A quick overview of the past
Let’s first review the previously existing structures and mechanics. A quick refresher (if you already use them) or a quick introduction to them (if you haven’t before). Don’t get me wrong, they’re very powerful and useful tools to use as part of your developer arsenal, but they’re not the main focus of this article, so I will not go into a lot of details on them.
Pub/Sub is one of my favorite features of Redis and I tend to use it as much as I can because it’s basically a free message queue embedded in a key-value in-memory database. You’re getting two for the price of one.
The gist of it is that Redis gives developers access to the Publish/Subscribe messaging paradigm, which lets two or more entities communicate without knowing about each other.
In other words, you have your “message senders” (also known as publishers) and your “message receivers” (also known as subscribers) and if you want your publishers to communicate with your subscribers you have a few options:
You can connect them directly:
Which comes with several disadvantages, such as:
- If a subscriber crashes, its publisher loses its only destination and suddenly can’t deliver anything else
- The publishers need to know the exact address of their subscribers, which would not work when subscribers have dynamic addresses
- Publishers could potentially overwhelm subscribers if they produce faster than the latter can process
You could also go the crazier route and connect every publisher to every subscriber directly:
Although the above setup solves the problem of crashing subscribers, you increase the complexity on the publisher side, having to know the exact location of every single subscriber. And the overwhelming part is not solved, in fact, now every subscriber is getting more traffic, so it would be even easier to crash them.
Or you could add a buffer-like structure in the middle, allowing publishers to create content as fast as they can, and subscribers to pull content at their own pace. Publishers also no longer need to know the address of every subscriber, which simplifies their logic and management.
Of course, the above diagram is a very simplified version of the paradigm, but it’s good enough to explain the point.
Some key properties of Pub/Sub that will be relevant down the line are:
- Pub/Sub works under the premise of “fire and forget”. This essentially means that every published message will be delivered to however many subscribers are listening at that moment, and then it will be lost from the buffer
- All messages will be delivered to all subscribers. Mind you, you can have subscribers listening for different channels, which would prevent this from happening. But if you have more than one subscriber on the same channel, then all of them would get the same message. It would be up to them then, to decide what to do about that.
- There is no ACK message. Some communication protocols deal with an acknowledge message, in order for the subscribers to let the publisher know the message was received. In this case, there is nothing like that, so if your subscriber gets the message and then crashes, that data will be lost for good
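To make those three properties a bit more tangible, here is a minimal sketch in plain Python. Keep in mind this is a toy in-memory model and not how Redis implements Pub/Sub; the `ToyPubSub` class and the channel name are made up for illustration.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToyPubSub:
    """In-memory stand-in for a Pub/Sub broker (hypothetical, not Redis itself)."""

    def __init__(self) -> None:
        # channel name -> callbacks of the subscribers currently listening
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[str], None]) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: str) -> int:
        # Deliver to everyone listening right now; nothing is stored afterwards
        # and nobody confirms reception (no ACK).
        receivers = self._subscribers[channel]
        for callback in receivers:
            callback(message)
        return len(receivers)


broker = ToyPubSub()
inbox_a: List[str] = []
inbox_b: List[str] = []

# Nobody is subscribed yet, so this message is simply lost.
lost = broker.publish("news", "too early")

broker.subscribe("news", inbox_a.append)
broker.subscribe("news", inbox_b.append)

# Both subscribers on the same channel receive the same message.
delivered = broker.publish("news", "hello")
```

Notice that neither inbox ever sees the "too early" message: there was no one to deliver it to, and the broker keeps no copy.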
When would you use Pub/Sub?
Some classic examples of Pub/Sub are:
- Chat servers, allowing you to create chat rooms easily by letting Redis take care of all the hard work of distributing messages amongst users. By default, these chat rooms would not persist messages, but you could find a way around that by adding some storage logic to your chat server
- Notification service: Another interesting use case, where you can subscribe to a set of notifications you’d like to receive, and then it’s a matter of publishers sending them to the right channel
- Log centralization. You could easily build a logging hub, where your own app is the publisher and different services make sure they send the information to the right destination. This would allow you to have a very flexible logging scheme, being able to swap from storing to disk to sending everything to an ELK instance or to a cloud service, or even all of them at once! Think about the possibilities!
Now blocking lists are very similar to the above structure (and not to spoil the mystery, but you’ll find that the same is true for streams as well). The main difference though is that this is not really another separate feature Redis comes with, but instead, it’s just a data type (Lists) with specific blocking mechanics that work in our favor. Let me explain.
When working with Lists in Redis, you can add elements to them (either to the head with LPUSH or to the tail with RPUSH) and you can take an element out from either end (with LPOP from the head and RPOP from the tail). By default, the *POP operations aren’t blocking, meaning that if the list has no data, you’ll get an empty result, and that’s the end of it.
But you also have two very useful blocking commands, BLPOP and BRPOP, which are the blocking versions of their counterparts LPOP and RPOP. The fact that they’re blocking is essential here because when used with an empty list, the connection from the client to Redis will be blocked until a new element is added. And that blocking mechanic essentially gives you a messaging system you can use.
Essentially, we’re mimicking the same setup as with Pub/Sub, where any number of publishers can send content into a buffer-like structure and any number of subscribers can listen for new data and react to it. But, as they say, the devil is in the details, so let’s go in a bit deeper:
- The buffer-like structure I mentioned is simply a list
- Publishers simply use LPUSH or RPUSH to add messages to the list
- Subscribers don’t actually subscribe, but instead, they just do a blocking POP (either BLPOP or BRPOP)
That’s the gist of it. The key is the blocking mechanic of the POP operation, because it’ll keep subscribers waiting until a message is added to the list. The moment that happens, the subscriber that’s been waiting the longest will immediately pop it out for itself.
The main differences between this structure and Pub/Sub are:
- Messages aren’t distributed to all subscribers. In fact, every message is only delivered to one subscriber, because the first one to be notified pops it out of the list
- Because messages are stored in a Redis list, they stay there until a subscriber connects and pops them. And if you configure Redis to persist data to disk, you can get a pretty reliable queueing system
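The blocking behavior is easier to see in action. The following is a rough sketch that uses Python’s standard `queue.Queue` as a stand-in for the Redis list (that substitution is my assumption for the sake of a self-contained example): `put` plays the role of LPUSH and a blocking `get` plays the role of BRPOP.

```python
import queue
import threading
from typing import List

# Stand-in for a Redis list used as a queue (an assumption for illustration).
jobs: "queue.Queue[str]" = queue.Queue()
received: List[str] = []


def worker() -> None:
    # Like BRPOP with no timeout: block until an element shows up.
    received.append(jobs.get())


consumer = threading.Thread(target=worker)
consumer.start()          # the worker blocks while the queue is empty

jobs.put("job-1")         # like LPUSH: wakes the waiting worker
consumer.join(timeout=5)

jobs.put("job-2")         # nobody is waiting, so the message stays queued
still_queued = jobs.qsize()

# A message popped by one consumer is gone for everyone else.
second = jobs.get(timeout=1)
```

The second message illustrates the difference with Pub/Sub: with no one listening, it simply waits in the queue until somebody pops it.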
Looking at the future: What are streams?
Now that I’ve covered the known and existing structures, let’s look at the brand new shining streams.
The main design thinking behind Streams is log files. Which is an interesting thing to say, but hear me out: when you’re inspecting a log file in order to debug your application, you usually do something like this:
$ tail -f yourlogfile.log
That’ll show you the last few lines of your file, letting you know the last few things that happened. Not only that, but the command line will be blocked, since it’ll keep waiting for new lines to be added to your file, showing them immediately once they appear.
So far, it sounds a lot like blocking lists, doesn’t it? But as you’re about to learn, Streams implement a couple more complex operations, which give you more control over what you can do with them (compared to blocking lists).
The other main difference between everything we’ve seen so far and Streams, is that instead of just being able to handle string elements, the internal structure of messages is a set of key-value pairs, so your messages can actually have complex structures directly in Redis (instead of being a stringified version of your JSON objects).
Consuming from and publishing into a Stream
The basic operations you can perform on Streams are pushing data into them and getting data out of them.
To publish data into them, you have the XADD command, which is very straightforward:
>XADD yourstreamname * key1 value1 key2 value2
That command would add a structure like the following, into a stream called “yourstreamname”:
Every message added to the stream will have an internal ID, which is the second argument of the XADD operation. Passing a “*” will let Redis know to auto-generate it for us, and in turn, it’ll return it as a result of our adding operation. You could potentially specify an ID yourself, but for most use cases you won’t need to worry about that and you can just let Redis handle it.
Now, getting data from the stream is where things get interesting. There are two ways you can get data from a Stream structure.
You can do something similar to what we’ve been doing so far using XREAD. This command will let you subscribe to a Stream waiting for new messages to arrive.
>XREAD COUNT 2 STREAMS yourstreamname 0
>XREAD BLOCK 0 STREAMS yourstreamname 0
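The first version of the command will return up to two messages from “yourstreamname” that have an ID greater than 0, which in practice means the first two entries in the stream. This version is not blocking though, so as you would expect, if there is nothing to return, the command will return no messages.
The second version though is blocking with a timeout of 0 (meaning there is no timeout), so this time, just like with blocking lists, the command will not return until there is a message to deliver. Keep in mind that with an ID of 0 any message already in the stream qualifies, so the command only actually waits when the stream is empty. To wait exclusively for messages added from now on, pass the special ID $ instead.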
A couple of comments:
- Although the blocking operation looks like a blocking list, new messages are sent to every subscribed client, just like with Pub/Sub
- Now, although you might be thinking this is like Pub/Sub, messages added to a Stream are kept within the Stream (unlike the fire and forget mechanics of Pub/Sub)
- Because of the above fact, there is actually another way to get data from Streams with XRANGE, we will look at that next
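The points above boil down to "reading does not consume". Here is a small sketch of that idea, assuming a toy `ToyStream` class of my own (it only models the non-blocking read and takes explicit IDs to stay short; it is not the real data structure):

```python
from typing import Dict, List, Tuple

Entry = Tuple[Tuple[int, int], Dict[str, str]]


def parse_id(raw: str) -> Tuple[int, int]:
    """Turn '5-0' (or just '5') into a comparable (milliseconds, sequence) pair."""
    ms, _, seq = raw.partition("-")
    return int(ms), int(seq or 0)


class ToyStream:
    """Hypothetical in-memory model of a stream, for illustration only."""

    def __init__(self) -> None:
        self.entries: List[Entry] = []

    def xadd(self, entry_id: str, fields: Dict[str, str]) -> str:
        # The caller is trusted to pass increasing IDs in this sketch.
        self.entries.append((parse_id(entry_id), dict(fields)))
        return entry_id

    def xread(self, last_seen: str, count: int) -> List[Entry]:
        # Non-blocking read: entries with an ID strictly greater than last_seen.
        floor = parse_id(last_seen)
        newer = [entry for entry in self.entries if entry[0] > floor]
        return newer[:count]


stream = ToyStream()
stream.xadd("1-0", {"key1": "value1"})
stream.xadd("2-0", {"key1": "value2"})
stream.xadd("3-0", {"key1": "value3"})

first_reader = stream.xread("0", count=2)
second_reader = stream.xread("0", count=2)   # same data, nothing was consumed
next_page = stream.xread("2-0", count=2)     # continue after the last seen ID
```

Each reader is responsible for remembering the last ID it saw and passing it on the next call, which is what lets several clients read the same stream independently.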
Using XRANGE and XREVRANGE (this being the same as the other one but with the reverse order in the returned data) you can get a range of messages from the Stream, something impossible to do with the previous structures!
>XRANGE yourstreamname 13213131-0 + COUNT 3
The command is not that complex. Its parameters are:
- Name of the Stream you’re reading from
- The first ID to read
- The last ID to read (with + being the highest possible ID)
- Optionally, you can also limit the number of results to return
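If it helps, the range selection can be sketched in a few lines of Python. This is only a model of the semantics as I understand them (inclusive bounds, `-` and `+` as the lowest and highest possible IDs, and an ID without a sequence part covering the whole millisecond); `toy_xrange` and `parse_bound` are hypothetical helpers, not part of any Redis client.

```python
from typing import List, Optional, Tuple

StreamId = Tuple[int, int]
MAX_PART = 2**64 - 1  # largest value either half of an ID can take


def parse_bound(raw: str, is_end: bool) -> StreamId:
    """Convert '-', '+', '5' or '5-1' into a (milliseconds, sequence) pair."""
    if raw == "-":
        return (0, 0)
    if raw == "+":
        return (MAX_PART, MAX_PART)
    ms, dash, seq = raw.partition("-")
    if dash:
        return (int(ms), int(seq))
    # Incomplete ID: cover the whole millisecond on whichever side it is used.
    return (int(ms), MAX_PART if is_end else 0)


def toy_xrange(ids: List[StreamId], start: str, end: str,
               count: Optional[int] = None) -> List[StreamId]:
    low, high = parse_bound(start, False), parse_bound(end, True)
    selected = [i for i in sorted(ids) if low <= i <= high]
    return selected if count is None else selected[:count]


ids = [(10, 0), (10, 1), (20, 0), (30, 0), (40, 0)]

from_id_capped = toy_xrange(ids, "10-1", "+", count=3)
one_millisecond = toy_xrange(ids, "10", "10")
everything = toy_xrange(ids, "-", "+")
```

Reversing the selected list would give you the XREVRANGE flavor of the same query.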
A word about message IDs
Just so it’s clear, message IDs have two parts. The first one is the local time (in the local Redis node) expressed in milliseconds. The second part is simply an auto-incrementing sequence number, meant to solve possible collisions when messages are received at the exact same millisecond.
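As a rough illustration of that two-part scheme, this is how an auto-generated ID could be derived from the previous one. The function name is mine, and the clock-going-backwards branch reflects my reading of the documentation (the previous millisecond value is reused so IDs never decrease), so treat it as a sketch rather than the actual implementation.

```python
from typing import Optional


def next_stream_id(last_id: Optional[str], now_ms: int) -> str:
    """Build the next '<milliseconds>-<sequence>' ID after last_id."""
    if last_id is None:
        return f"{now_ms}-0"
    last_ms, last_seq = (int(part) for part in last_id.split("-"))
    if now_ms > last_ms:
        # New millisecond: the sequence starts over.
        return f"{now_ms}-0"
    # Same millisecond (or the clock went backwards): keep the time, bump the counter.
    return f"{last_ms}-{last_seq + 1}"


first = next_stream_id(None, 1000)
same_ms = next_stream_id(first, 1000)
later = next_stream_id(same_ms, 1001)
```

The important property is that every new ID is strictly greater than the previous one, which is what makes range queries and "give me everything after this ID" reads possible.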
What about consumer groups?
Another big difference, and one of the most interesting additions Streams bring to the publisher/subscriber mechanics, is consumer groups.
This is not a new concept. In fact, Kafka has the same concept, although it’s not implemented in the same way.
The main use case for consumer groups is when you want different consumers getting data from the same stream, but at different speeds. In these cases you might want to have messages delivered only to one consumer, essentially working like the blocking lists instead of Pub/Sub.
If you do this, you’re ensured that:
- Messages are delivered to only one consumer within the group
- Consumers need to be identified uniquely within the group they belong to. The IDs are case-sensitive and need to be provided by the consumer itself
- Each group keeps track of the unread messages, so when a consumer requires one, it will always return an unread message
- Consumers need to send an ACK message back to Redis to notify the message was properly processed
Creating a group
When creating a group, you specify the stream the group belongs to, its name and the top ID, meaning that any ID higher than that one will be added into this group.
To do so, you can use the XGROUP command:
> XGROUP CREATE yourstreamname yourgroup $
The top ID used in the example ($) references the last ID, so in the above group, only new messages received after the group was created will be added.
You could potentially create a group like this:
> XGROUP CREATE yourstream historicgroup 0
This new group would contain all messages ever received.
Reading from a group
The command XREADGROUP can be used to get data off of a group. The syntax is the same as XREAD, but with a single added attribute called “GROUP” that receives the group name and the consumer name.
So, it would look something like this:
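> XREADGROUP GROUP historicgroup consumer1 BLOCK 0 STREAMS yourstream >
The above command essentially reads (in a blocking manner) from the historicgroup group of the yourstream stream, identifying itself as the consumer1 consumer. The trailing > is a special ID that asks only for messages that were never delivered to any other consumer in the group.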
Confirming a read message
Finally, in order to provide confirmation that you’ve read a message successfully, you need to send the XACK command back to Redis, otherwise, that message will remain in a pending status.
The attributes required for this operation are the Stream’s name, the group’s name and finally, the message ID:
>XACK yourstream historicgroup 1231241142-0
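Putting the three steps together (create, read, confirm), a consumer group can be pictured roughly like this. Once more, this is a toy model under my own assumptions: `ToyGroup` is a made-up class that tracks the last delivered ID plus a pending list, which is the general idea behind groups but far from the full feature set.

```python
from typing import Dict, List, Tuple

StreamId = Tuple[int, int]


class ToyGroup:
    """Toy model of one consumer group over a list of sorted entry IDs."""

    def __init__(self, stream: List[StreamId], start_after: StreamId) -> None:
        self.stream = stream
        self.last_delivered = start_after        # the "top ID" given at creation
        self.pending: Dict[StreamId, str] = {}   # entry ID -> consumer that got it

    def read(self, consumer: str, count: int = 1) -> List[StreamId]:
        # Hand out entries no consumer in this group has received yet.
        fresh = [i for i in self.stream if i > self.last_delivered][:count]
        for entry_id in fresh:
            self.pending[entry_id] = consumer
        if fresh:
            self.last_delivered = fresh[-1]
        return fresh

    def ack(self, entry_id: StreamId) -> int:
        # Returns 1 if the entry was pending and is now confirmed, else 0.
        return 1 if self.pending.pop(entry_id, None) is not None else 0


stream = [(1, 0), (2, 0), (3, 0)]

historic = ToyGroup(stream, start_after=(0, 0))   # like creating the group with ID 0
got_1 = historic.read("consumer1", count=2)
got_2 = historic.read("consumer2", count=2)       # never sees consumer1's entries
acked = historic.ack((1, 0))

only_new = ToyGroup(stream, start_after=stream[-1])  # like creating the group with $
nothing_yet = only_new.read("consumer1")
```

After the single acknowledgement, entries (2, 0) and (3, 0) are still in the pending list, each tied to the consumer that received it, which is exactly the state a crashed consumer would leave behind.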
And that’s it!
Not really, actually there is still more to Streams that I haven’t covered. The content in this article though should be enough to understand not only how to use Streams but also, why they were added to the set of data types already in existence.
Feel free to read the full documentation if you want to know more about Streams and everything you can do with them.
Thanks so much for reading and please feel free to leave a comment if you’re already using Streams or if you’re planning on doing so!