Table of contents
1. Introduction📝
2. What are Streams?🤔
2.1. Entry Id
3. Accessing Data from Streams
4. XRANGE and XREVRANGE
5. Listening with XREAD Command for new items
6. Consumer groups🛸
7. How to create a consumer group🧐
8. Automatic Claiming of Data
9. Capped Streams
10. Zero Length Streams
11. FAQ
11.1. Are Redis Streams in memory?
11.2. Why do we use Redis Streams?
11.3. How does the Redis Stream work?
11.4. Define the Redis module?
12. Conclusion
Last Updated: Mar 27, 2024

Streams

Author ANKIT MISHRA

Introduction📝

The Stream is a data type introduced in Redis 5.0 that models a log data structure in a more abstract way. The essence of a log remains, however: Redis Streams are an append-only data structure, similar to a log file, which is commonly implemented as a file opened in append-only mode.

Because Redis Streams are an abstract data type represented in memory, they can, at least conceptually, offer more powerful operations that overcome the limits of a log file. Let's look at them in depth.

What are Streams?🤔

Streams are an append-only data structure. The basic write command, XADD, appends a new entry to the specified stream. A stream entry comprises one or more field-value pairs rather than just a string. This way, each stream entry is already structured, similar to an append-only file written in CSV format with multiple separated fields on each line.

XADD mystream * sensor-id 0001 temperature 26.3
XADD mystream * sensor-id 0002 temperature 28.26


Output

1650418503680-0
1650418503680-1


The two XADD calls above add two entries to the stream at key mystream: the first with sensor-id 0001 and temperature 26.3, the second with sensor-id 0002 and temperature 28.26. Each call uses an auto-generated entry ID, here 1650418503680-0 and 1650418503680-1, which the command returns.

XADD receives the key name mystream as its first argument and the entry ID, which identifies each entry within a stream, as its second. We passed * in this case because we want the server to generate a new ID for us. Every new ID is monotonically increasing, so every new entry has a higher ID than all previous ones. The server's auto-generation of IDs is almost always what you want, and reasons for specifying an ID explicitly are rare.

This is also similar to log files, where line numbers or the byte offset inside the file can be used to identify a specific entry. Returning to our XADD example, the arguments after the key name and ID are the field-value pairs that make up our stream entry.

The XLEN command can be used to determine the number of items in a Stream:

XLEN mystream


Output

(integer) 2

Let's have a look at how Entry Id is managed for each input in Redis:

Entry Id

The entry ID produced by the XADD command, which uniquely identifies each entry inside a given stream, is made up of two parts:

<millisecondsTime>-<sequenceNumber>


The milliseconds time part is the local time in the local Redis node generating the stream ID; however, if the current milliseconds time is less than the last entry time, the last entry time is used instead, so that if a clock jumps backwards, the monotonically incrementing ID property remains. For entries created in the same millisecond, the sequence number is used. Because the sequence number is 64 bits wide, there is no practical limit to the number of entries generated in the same millisecond.
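The rule described above can be sketched in a few lines of Python. This is only an illustrative model, not Redis's actual implementation: the function names next_id and format_id and the simulated clock values are assumptions made for the example.

```python
from typing import Optional, Tuple

StreamID = Tuple[int, int]  # (milliseconds, sequence)


def next_id(last: Optional[StreamID], now_ms: int) -> StreamID:
    """Return the next auto-generated ID given the last ID and the clock."""
    if last is None:
        return (now_ms, 0)
    last_ms, last_seq = last
    if now_ms > last_ms:
        # The clock moved forward: start a new millisecond at sequence 0.
        return (now_ms, 0)
    # Same millisecond, or the clock jumped backwards: reuse the last
    # millisecond and bump the sequence so the ID still increases.
    return (last_ms, last_seq + 1)


def format_id(stream_id: StreamID) -> str:
    return f"{stream_id[0]}-{stream_id[1]}"


# Simulated clock readings; the last one jumps backwards.
clock = [1650418503680, 1650418503680, 1650418503681, 1650418503000]

ids = []
last = None
for now in clock:
    last = next_id(last, now)
    ids.append(format_id(last))

print(ids)
```

Even though the last clock reading is earlier than the previous one, the generated ID keeps the latest millisecond and only increments the sequence number, so the IDs never go backwards.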

The format of such IDs may appear weird at first, and the casual reader may wonder why the time is included. The reason for this is that Redis streams offer ID-based range queries. Because the ID is linked to the time the item is created, it is possible to ask for time ranges for free. This will be covered shortly while discussing the XRANGE command.

Suppose, for some reason, the user needs incremental IDs that are unrelated to time and are instead tied to an ID from another external system. In that case, the XADD command can accept an explicit ID instead of the * wildcard that triggers auto-generation, as shown in the examples below:

XADD examplestream 0-1 field value


Output

0-1


Note that in this case the minimum ID is 0-1, and that the command will not accept an ID equal to or smaller than a previous one. Adding another entry with the same ID to the same stream fails:

XADD examplestream 0-1 foo bar


Output

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Accessing Data from Streams

🚀We can finally use XADD to append entries to our stream. However, while appending data to a stream is straightforward, the way streams can be queried to extract data is less obvious. Continuing with the log file analogy, one obvious way is to mimic what we usually do with the Unix command tail -f: start listening for new messages as they are appended to the stream.

🚀It should be noted that, unlike Redis's blocking list operations, where a given element reaches only a single client that is blocking in a pop-style operation like BLPOP, we want multiple consumers to see the new messages appended to the stream (the same way that many tail -f processes can see what is added to a log).

🚀Using traditional terminology, we want streams to be able to fan out messages to multiple clients.

🚀However, this is only one possible access mode. We may also look at a stream differently: as a time series store rather than a messaging system. In this scenario, getting newly appended messages may still be useful.

🚀Another natural query mode here is to get messages by time range, or to iterate over the messages with a cursor to check the whole history incrementally. This is unquestionably another useful access mode.

🚀Finally, suppose we view a stream from the point of view of consumers. In that case, we may want to access the stream in yet another way: as a stream of messages that can be partitioned across multiple consumers processing them, so that each consumer in a group sees only a subset of the messages arriving in a single stream.

🚀This allows the message processing to be scaled among different consumers without requiring a single consumer to process all of the messages: each consumer will get different messages to process. 

🚀Essentially, this is what Kafka (TM) does with consumer groups. Another interesting way to read Redis Stream messages is to use consumer groups.

Redis Streams support all three query modes discussed above via various commands. The following sections will demonstrate them all, beginning with the most basic and straightforward.

XRANGE and XREVRANGE

To query the stream by range, we only need to provide two IDs: start and end. The returned range includes the elements whose IDs equal start or end, so the range is inclusive. The two special IDs - and + denote the smallest and greatest IDs possible.

XRANGE mystream - +


Output

1650420242754-0
sensor-id
0001
temperature
26.3
1650420242755-0
sensor-id
0002
temperature
28.26


Each returned entry is an array with two items: the ID and a list of field-value pairs. As we said, entry IDs are time-related: the part to the left of the - character is the Unix time in milliseconds of the local node at the moment it created the stream entry (note, however, that streams are replicated with fully specified XADD commands, so replicas have IDs identical to the master's). This means we can use XRANGE to query a time range. To do so, we can omit the sequence part of the IDs: a start without it is treated as sequence 0, and an end without it as the maximum sequence number.

XRANGE mystream 1650420242754 1650420242754


Output

1650420242754-0
sensor-id
0001
temperature
26.3


We only have one entry in this range. In real data sets, however, a query could span hours, or there may be many items within just a few milliseconds, and the output could be massive. For this reason, XRANGE supports an optional COUNT option at the end.

XRANGE mystream - + COUNT 1


Output

1650420869424-0
sensor-id
0001
temperature
26.3


Since XRANGE takes O(log(N)) to seek and then O(M) to return M elements, the command has logarithmic time complexity when the count is small, which means that each step of an iteration over the stream in small batches is fast.

The command XREVRANGE is the equivalent of XRANGE, except that it returns the elements in reverse order and therefore takes the end ID first and the start ID second. One practical use for XREVRANGE is to fetch the last item in a stream:
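To make the "seek, then return M elements" idea concrete, here is a minimal Python sketch of an inclusive range query over sorted IDs. It is a simplified model under stated assumptions: a sorted Python list with binary search stands in for Redis's internal structure, and the names SketchStream and parse_bound are hypothetical.

```python
import bisect
from typing import Dict, List, Optional, Tuple

StreamID = Tuple[int, int]  # (milliseconds, sequence)
MAX_PART = 2**64 - 1        # largest value either ID part can take


def parse_bound(text: str, is_end: bool) -> StreamID:
    """Parse '-', '+', '<ms>' or '<ms>-<seq>' into a comparable tuple."""
    if text == "-":
        return (0, 0)
    if text == "+":
        return (MAX_PART, MAX_PART)
    if "-" in text:
        ms, seq = text.split("-")
        return (int(ms), int(seq))
    # Incomplete ID: a start covers the millisecond from sequence 0,
    # an end covers it up to the largest sequence number.
    return (int(text), MAX_PART if is_end else 0)


class SketchStream:
    """Toy append-only stream; IDs must be added in increasing order."""

    def __init__(self) -> None:
        self.ids: List[StreamID] = []
        self.fields: List[Dict[str, str]] = []

    def add(self, entry_id: StreamID, fields: Dict[str, str]) -> None:
        self.ids.append(entry_id)
        self.fields.append(fields)

    def range(self, start: str, end: str, count: Optional[int] = None):
        # Binary search finds both ends of the range in O(log N).
        lo = bisect.bisect_left(self.ids, parse_bound(start, is_end=False))
        hi = bisect.bisect_right(self.ids, parse_bound(end, is_end=True))
        if count is not None:
            hi = min(hi, lo + count)
        # Copying out the matching entries costs O(M).
        return [(f"{ms}-{seq}", self.fields[i])
                for i, (ms, seq) in enumerate(self.ids[lo:hi], start=lo)]


stream = SketchStream()
stream.add((1650420242754, 0), {"sensor-id": "0001", "temperature": "26.3"})
stream.add((1650420242755, 0), {"sensor-id": "0002", "temperature": "28.26"})

everything = stream.range("-", "+")
one_ms = stream.range("1650420242754", "1650420242754")
first_only = stream.range("-", "+", count=1)

print([entry_id for entry_id, _ in everything])
print([entry_id for entry_id, _ in one_ms])
print([entry_id for entry_id, _ in first_only])
```

With a small count, the O(log N) seek dominates the cost of each call, which is why walking a large stream in small batches stays cheap.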

XREVRANGE mystream + - COUNT 1


Output

1650421127566-1
sensor-id
0002
temperature
28.26

Listening with XREAD Command for new items

XREAD is the command that allows you to listen for new messages arriving in a stream. It's a little more complicated than XRANGE, so we'll start with simple forms and work our way up to the entire command layout.

XREAD COUNT 2 STREAMS mystream 0


Output

mystream
1650431841908-0
sensor-id
0001
temperature
26.3
1650431841908-1
sensor-id
0002
temperature
28.26


🚀The above is the non-blocking version of XREAD. The COUNT option is not required; in fact, the only mandatory option of the command is the STREAMS option, which specifies a list of keys along with the corresponding maximum ID already seen for each stream by the calling consumer so that the command will only return messages with an ID greater than the one we specified.

🚀We used STREAMS mystream 0 in the preceding command because we want all messages in the stream mystream with an ID greater than 0-0. As shown in the above example, the command returns the key name, since it is possible to run this command with more than one key to read from many streams simultaneously. For example, I could write: STREAMS mystream otherstream 0 0. Note how, after the STREAMS option, we must provide all the key names first and then the IDs. As a result, the STREAMS option must always come last.

🚀When we don't want to access items in a stream by a range, we usually want to subscribe to new items that arrive in the stream. This notion may appear similar to Redis Pub / Sub, where you subscribe to a channel, or to Redis blocking lists, where you wait for a key to obtain new elements. Still, there are significant distinctions in how you consume a stream:

🚀Multiple clients (consumers) may be waiting for data in a stream. By default, every new item is delivered to every consumer waiting for data in a given stream. This behavior differs from blocking lists, in which each consumer receives a different element. The ability to fan out to multiple consumers, on the other hand, is similar to Pub/Sub.

🚀Unlike Pub/Sub, where messages are fire-and-forget and never stored, and blocking lists, where a message is popped (effectively removed) from the list when a client receives it, streams work in a fundamentally different way. All messages are appended to the stream indefinitely (until the user explicitly requests deletion), and each consumer knows which messages are new from its perspective by remembering the ID of the last message it received.

🚀Streams Consumer Groups offer a level of control that Pub/Sub or blocking lists cannot match, with different groups for the same stream, explicit acknowledgement of the processed items, the ability to inspect pending items, claiming of the unprocessed messages, and the coherent history visibility for each client, which can only see its personal history of messages.
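The "remember the last ID seen" behaviour described above can be illustrated with a small in-memory Python sketch. It is not a Redis client and makes several assumptions for illustration: the Reader class and its poll method are hypothetical, and a linear scan replaces the efficient seek Redis would perform.

```python
from typing import Dict, List, Optional, Tuple

StreamID = Tuple[int, int]
Entry = Tuple[StreamID, Dict[str, str]]

# A shared, append-only list of entries stands in for the stream.
entries: List[Entry] = [
    ((1650431841908, 0), {"sensor-id": "0001", "temperature": "26.3"}),
    ((1650431841908, 1), {"sensor-id": "0002", "temperature": "28.26"}),
]


class Reader:
    """A consumer that tracks the last ID it has seen, like XREAD callers do."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.last_seen: StreamID = (0, 0)

    def poll(self, stream: List[Entry], count: Optional[int] = None) -> List[Entry]:
        # Return only entries with an ID greater than the last one seen.
        new = [entry for entry in stream if entry[0] > self.last_seen]
        if count is not None:
            new = new[:count]
        if new:
            self.last_seen = new[-1][0]
        return new


alice, bob = Reader("alice"), Reader("bob")

alice_first = alice.poll(entries)   # alice sees both entries
bob_first = bob.poll(entries)       # bob sees the same two entries
alice_again = alice.poll(entries)   # nothing new for alice yet

# A new entry arrives; reading never removed anything from the stream.
entries.append(((1650431841999, 0), {"sensor-id": "0003", "temperature": "27.1"}))
alice_new = alice.poll(entries)

print(len(alice_first), len(bob_first), len(alice_again), len(alice_new), len(entries))
```

Both readers receive every entry, and the stream keeps all three entries after being read, which is the key difference from a blocking list pop.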

Consumer groups🛸

A consumer group is a way of dividing a stream of messages among multiple consumers, either to speed up processing or to relieve the load on slower consumers. In an ideal environment, data producers and consumers work at the same speed, and there is no data loss or backlog.

Consumer groups are used to scale out your data consumption process. Consider an image processing application as an example. The solution needs three main components:

A producer (perhaps one or more cameras) that captures and saves images.

A Redis Stream that preserves the images in the order they arrive.

An image processor that processes each image.

Now, assume your producer saves 500 images every second, but your image processor handles only 100 images per second at full capacity. This rate discrepancy creates a backlog that your image processor will never be able to clear. A straightforward solution is to run five image processors (as illustrated in Figure 2), each processing a mutually exclusive set of images. This is possible using a consumer group, which allows you to partition your workload and route it to multiple consumers.

A consumer group does more than partition data: it also tracks which messages have been delivered but not yet acknowledged, which helps keep data safe and supports recovery when a consumer fails.
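The image processing scenario can be modelled with a short Python sketch. This is a simplified, in-memory illustration and not how Redis implements consumer groups: the ConsumerGroup class, the processor names, and the use of plain integers as entry IDs are all assumptions made for the example.

```python
from typing import Dict, List, Tuple

Entry = Tuple[int, Dict[str, str]]  # integer IDs keep the sketch short


class ConsumerGroup:
    """Toy consumer group: each entry is delivered to exactly one consumer."""

    def __init__(self, entries: List[Entry]) -> None:
        self.entries = entries            # shared, append-only stream
        self.next_index = 0               # position of the next undelivered entry
        self.pending: Dict[int, str] = {} # entry ID -> consumer (delivered, not acked)

    def read(self, consumer: str, count: int = 1) -> List[Entry]:
        batch = self.entries[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id, _ in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, entry_id: int) -> bool:
        # Acknowledging removes the entry from the pending list.
        return self.pending.pop(entry_id, None) is not None


images = [(i, {"image": f"img-{i}.png"}) for i in range(1, 11)]
group = ConsumerGroup(images)
consumers = [f"processor-{n}" for n in range(1, 6)]

# Each processor repeatedly asks for one image until none are left.
assigned: Dict[str, List[int]] = {name: [] for name in consumers}
progress = True
while progress:
    progress = False
    for name in consumers:
        for entry_id, _ in group.read(name, count=1):
            assigned[name].append(entry_id)
            progress = True

pending_before_ack = len(group.pending)
for entry_id in assigned["processor-1"]:
    group.ack(entry_id)

print(assigned)
print(pending_before_ack, len(group.pending))
```

Every image goes to exactly one processor, and the entries that have been delivered but not acknowledged stay in the pending list, which is what makes later recovery possible.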

How to create a consumer group🧐

Assuming I already have a key mystream of type stream, I only need to execute the following to create a consumer group. The final argument, $, means the group will only receive messages that arrive after it is created:

XGROUP CREATE mystream mygroup $


Output

OK


If the stream does not already exist, XGROUP CREATE can create it automatically when the optional MKSTREAM option is passed as the last argument:

XGROUP CREATE newstream mygroup $ MKSTREAM


Output

OK

Automatic Claiming of Data

The XAUTOCLAIM command, added in Redis version 6.2, implements the automatic claiming mechanism. XPENDING and XCLAIM are the basic building blocks for many recovery procedures. XAUTOCLAIM optimizes the generic process by letting Redis manage it, and it offers a simple solution for most recovery needs.

XAUTOCLAIM detects idle pending messages and assigns ownership to a consumer. The command's signature is as follows:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]


The command returns an array of the claimed messages, along with a stream ID that can be used to iterate over the pending entries. I can use that stream ID as a cursor in my next call to continue claiming idle pending messages.

When XAUTOCLAIM returns the "0-0" stream ID as a cursor, it means that it has reached the end of the list of pending items for the consumer group. That doesn't mean there are no new idle pending messages, so the procedure is repeated by invoking XAUTOCLAIM from the beginning of the stream.
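The cursor loop described above can be sketched in Python as follows. This is a deliberately simplified model and not the real command: the autoclaim function, the dictionary layout of the pending list, the integer IDs, and the use of 0 in place of the "0-0" cursor are all assumptions for illustration.

```python
from typing import Dict, List, Tuple


def autoclaim(pending: Dict[int, dict], consumer: str, min_idle_ms: int,
              start: int, count: int) -> Tuple[int, List[int]]:
    """Claim up to `count` idle entries with ID >= start.

    Returns (next_cursor, claimed_ids); a cursor of 0 means the scan
    reached the end of the pending list.
    """
    claimed: List[int] = []
    for entry_id in sorted(pending):
        if entry_id < start:
            continue
        if len(claimed) == count:
            # Batch is full: the next call should resume from this entry.
            return entry_id, claimed
        info = pending[entry_id]
        if info["idle_ms"] >= min_idle_ms:
            info["owner"] = consumer   # transfer ownership
            info["idle_ms"] = 0        # claiming resets the idle time
            claimed.append(entry_id)
    return 0, claimed


# Hypothetical pending list: entry ID -> current owner and idle time.
pending = {
    1: {"owner": "bob", "idle_ms": 7_200_000},
    2: {"owner": "bob", "idle_ms": 100},
    3: {"owner": "carol", "idle_ms": 5_000_000},
    4: {"owner": "carol", "idle_ms": 9_000_000},
}

cursor = 0
all_claimed: List[int] = []
while True:
    # Claim entries idle for at least one hour, one per call.
    cursor, batch = autoclaim(pending, "alice", 3_600_000, cursor, count=1)
    all_claimed.extend(batch)
    if cursor == 0:
        break

print(all_claimed)
print({entry_id: info["owner"] for entry_id, info in pending.items()})
```

Entry 2 stays with its original owner because it has not been idle long enough, while the other entries are handed to the claiming consumer across several calls.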

Capped Streams

Many applications do not wish to keep collecting data into a stream indefinitely. Sometimes it's helpful to have a maximum number of items in a stream; other times, once a specific size is achieved, it's helpful to migrate data from Redis to a storage that isn't in memory and isn't as quick but is suited to storing the history for decades. There is some support for this in Redis streams. The MAXLEN option of the XADD command is one example. This option is quite easy to use:

XADD mystream MAXLEN 2 * value 1


Output

1650434335573-1


When the requested length is reached, old entries are evicted automatically, so the stream stays at a constant size. Trimming by length alone cannot keep only the items newer than a certain age; since Redis 6.2, the MINID option of XADD and XTRIM covers that case by evicting entries with IDs lower than a given threshold.

Trimming with MAXLEN, on the other hand, can be costly. To be memory efficient, streams are represented by macro nodes in a radix tree. Altering a single macro node, which holds a few tens of elements, is not optimal. As a result, the command can be used in the following special form:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...


The ~ argument between the MAXLEN option and the actual count means that we don't need the length to be exactly 1000 items. It can be 1000, 1010, or 1030; the only guarantee is that at least 1000 items are kept. With this option, trimming is performed only when an entire node can be removed. This is far more efficient, and it is usually what you want.
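The difference between exact and approximate trimming can be sketched in Python. This is a toy model rather than Redis's radix tree: the build_nodes and trim helpers are hypothetical, and the node size of 100 entries is an assumption chosen to keep the numbers easy to follow.

```python
from typing import List

NODE_SIZE = 100  # assumed number of entries per node in this sketch


def build_nodes(total: int, node_size: int = NODE_SIZE) -> List[List[int]]:
    """Split entry IDs 0..total-1 into consecutive nodes, oldest first."""
    ids = list(range(total))
    return [ids[i:i + node_size] for i in range(0, total, node_size)]


def trim(nodes: List[List[int]], maxlen: int, approximate: bool) -> int:
    """Trim the oldest entries in place and return the remaining length."""
    total = sum(len(node) for node in nodes)
    if approximate:
        # Only drop a whole node if at least maxlen entries remain afterwards.
        while nodes and total - len(nodes[0]) >= maxlen:
            total -= len(nodes.pop(0))
        return total
    excess = total - maxlen
    while excess > 0 and nodes:
        if len(nodes[0]) <= excess:
            excess -= len(nodes.pop(0))
        else:
            # Exact trimming has to modify the inside of a node.
            del nodes[0][:excess]
            excess = 0
    return sum(len(node) for node in nodes)


approx_1030 = trim(build_nodes(1030), 1000, approximate=True)
exact_1030 = trim(build_nodes(1030), 1000, approximate=False)
approx_1250 = trim(build_nodes(1250), 1000, approximate=True)

print(approx_1030, exact_1030, approx_1250)
```

In this model, a stream of 1030 entries is left untouched by approximate trimming because removing the oldest node would drop it below 1000, while exact trimming has to edit part of a node to reach exactly 1000.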

There is also the XTRIM command, which does something similar to what the MAXLEN option does above, but it may be run independently:

XTRIM mystream MAXLEN 10

Zero Length Streams

One difference between streams and other Redis data structures is that when the other data structures no longer have any elements, the key itself is removed as a side effect of the commands that delete elements. For example, a sorted set is removed completely when a call to ZREM deletes its last element. Streams, on the other hand, are allowed to remain at zero elements, either because a MAXLEN option with a count of zero was used (XADD and XTRIM commands) or because XDEL was called.

Such an asymmetry emerges because streams may be associated with consumer groups. We do not want to lose the current state in which the consumer groups are defined just because there were no longer any items existing in the stream. Even when there are no linked consumer groups, the stream is not destroyed.

I hope now you have understood the concepts of Streams in Redis.

Let’s move to the FAQs Section

FAQ

Are Redis Streams in memory?

Redis Streams is a newer data type than Redis Pub/Sub and is intended to facilitate "disconnected" distributed streaming applications. The data type is just a memory-stored append-only data structure—basically preserved messages!

Why do we use Redis Streams?

Redis Streams is an append-only data structure that can be used in various streaming applications such as real-time messaging (RTM), message brokering, and so on.

How does the Redis Stream work?

The Stream is a data type introduced in Redis 5.0 that models a log data structure in a more abstract way. The essence of a log remains, however: Redis Streams are an append-only data structure, much like a log file, which is commonly implemented as a file opened in append-only mode.

Define the Redis module?

Redis modules are dynamic libraries that can be loaded into Redis at startup or with the MODULE LOAD command. Redis exposes a C API for modules through a single C header file named redismodule.h.

Conclusion

In this article, we have discussed streams in depth, how they differ from previously available data structures, and the commands associated with them. For more such blogs, you can visit our Blogs section. You can also read Data Types in Redis and Tools for Redis.

If you want to learn more, check out our articles on Code studio

“Happy Coding!”
