
Stream
介绍
https://redis.io/docs/data-types/streams-tutorial/
说明
Stream是一个只能添加数据的数据结构;依赖于添加命令XADD,Stream可以添加新的entry到指定流;
每个Stream Entry都是一个field-value;
示例1:XADD使用
1 | 127.0.0.1:6379> XADD myStream * sensor-id 12 temperature 11.1 |
在上面的命令中,
- XADD为myStream添加了{sensor-id:12, temperature:11.1}属性;
- 第一个参数myStream为指定的key的名字;
- 第二个参数是一个ID,每个entry都有一个ID;这里使用“*”,意为让服务器自动生成一个ID;
- 服务器生成的ID会依据生成先后自动增加;
- 大多数情况下都使用服务器自动生成的结果作为ID,无需手动提供ID;
- 后面的参数即field-value,用于提供要保存的键值对。
示例2:XLEN
1 | 127.0.0.1:6379> XLEN myStream |
XLEN可以获取流的长度。
Entry ID介绍
Entry ID的格式如下:
1 | <millisecondsTime>-<sequenceNumber> |
millisecondsTime即当前本地的时间;
Entry ID都是逐步递增的,如果Redis获取的时间在上个提交的Entry时间之前,Redis将使用最新的进行记录(即上个提交的Entry时间);
为什么用时间作为ID?。(从文档翻译)用时间作为ID可能会很奇怪,有些读者可能想知道问什么这样做。原因是Redis支持依据ID值的范围进行查询Entry,如果将时间作为ID,就可以很方便地依据时间去查询Entry。(e…Long型的时间好像也不方便…)
Entry最小的ID是0-1,Redis不允许ID小于上一次添加的Entry的ID;
示例3:ID可以仅提供milliseconds部分,由系统去生成后半部分
如果用的Redis版本是7或以上,可以提供ID的前半部分,后半部分使用*代替,Redis会自动生成后半部分且逐次递增;
1 | 127.0.0.1:6379> XADD s5 0-* apple 1 |
示例4:查询的操作
可以用时间范围去查询Entry
只需要指定两个ID来表示起点和终点,就可以查询一个流;
两个特殊的参数“-”和“+”分别表示最小ID和最大ID。
1 | 127.0.0.1:6379> XADD s5 0-* apple 1 |
每个entry返回一个含有field-value的数组;
在XADD指定ID时使用*,则ID的左半部分表示当时的时间,这将意味着我们可以使用XRANGE去查询一个时间范围内的所有Entry;
因为使用毫秒时间作为ID的左半部分,所以XRANGE查询的时间甚至可以精确到毫秒;
1 | 127.0.0.1:6379> xadd s6 * apple 1 |
XRANGE可以限制查询数量
有时候,在有些范围内,可能会有特别多的Entry,这样查询起来可能会麻烦;
使用XRANGE的COUNT参数可以限制查询的数量;
1 | 127.0.0.1:6379> xadd s6 * apple 1 |
XRANGE可以查询接下来的两个Entry;
将查询结果的最后一个ID作为查询的起点,使用“(”前缀,可以查询接下来的两个Entry;
1 | 127.0.0.1:6379> xrange s6 (1663850755536-0 + COUNT 2 |
XRANGE的复杂度是O(log(N)),返回M个元素则另外包含O(M),复杂度是对数,这样效率是比较高的,所以可以代替XSCAN;
XREVRANGE
XREVRANGE等效于XRANGE,不同的是XREVRANGE的顺序是相反的;
通过XREVRANGE获取最新的Entry:
1 | 127.0.0.1:6379> xrevrange s6 + - COUNT 1 |
示例5:XREAD 监听新元素的添加
有时候我们可能不需要通过一个返回从流中获取数据,仅仅只是想获取流中新抵达的元素(订阅流中新抵达的元素);
这就是Redis的发布/订阅;
阻塞队列与Stream是不同的
- Stream收到的新的元素,会平等的发送到每个等待的消费者;而在阻塞队列中,消费者获取到的元素是不同的;
- Stream会将收到的元素无限期地存储在流中(除非用户指定),而阻塞队列在pop后会将元素删除;
- Streams Consumer Groups提供了Pub/Sub的控制级别功能;每个消费者会被手动地分配到不同组,对于不同组,则有不同的访问范围;StreamsConsumerGroups能够记录项目是否被完成,同时记录消费者处理事务的历史,对于每个消费者,只能获取到自己处理的事务;
对于消费者,使用XREAD来监听流的新到元素;
通过XREAD获取流中的最新数据
1 | 127.0.0.1:6379> XREAD COUNT 2 STREAMS s6 0 |
其中,STREAMS是必要参数,用于指定key和一个ID;
Redis会返回流中大于指定ID的entry;
STREAMS参数后面可以跟多个key,key后面根同等数量的ID,如:STREAMS s1 s2 0 0;因此STREAMS参数必须放在命令的最后面;
XREAD可以阻塞网络直到有新的entry被添加
添加一个BLOCK参数就可以阻塞网络直到获取到新的被添加entry;
BLOCK后面根超时时间,0表示无限等待;
使用“$”表示流内最新的Entry的ID;
在示例中:
- XREAD被阻塞
- 0表示无限期等待
- STREAMS用于指定key和ID;
- $表示s6下最新的ID
- 当XADD执行结束后,XREAD阻塞取消,返回新添加的值;
XREAD BLOCK 可以不使用$作为id;
在此模式下,STREAMS可以提供多个key和ID,此时如果其中一个流添加了新的数据,将从该流读取出现的新数据并返回。
XREAD不会删除数据,仅仅只是读取数据。
使用示例
添加几个温度记录到Stream
1 | 127.0.0.1:6379> xadd temperatures:table:1001 apple 23.2 bread 18.2 milk 11.2 # 苹果23.2 面包18.2 牛奶11.2 一般这里表示硬件温度; |
在流的尾部,读取100个新的stream entries,如果没有entries被写入则阻塞等待300ms;
1 | XREAD COUNT 100 BLOCK 300 STREAMS temperatures:table $ |
基础命令
XADD:添加一个新的entry到stream;
XREAD:在给定的位置开始并向前读取,读取一个或多个entries;
XRANGE:返回给定两个ID指向的entry之间的entry;
XLEN:返回流的长度;
性能
添加entry到stream是O(1);
访问任意单个entry是O(n),n为ID的length;
其他命令
增 | 删 | 查 | 赋权 | XGROUP | XINFO | 整理 |
---|---|---|---|---|---|---|
XADD | XACK | XLEN | XAUTOCLAIM | CREATE | COMSUMERS | XTRIM |
XDEL | XPENDING | XCLAIM | CREATECONSUMER | GROUPS | ||
XRANGE | DELCONSUMER | STREAM | ||||
XREVRANGE | DESTROY | |||||
XREAD | SETID | |||||
XREADGROUP |
TRIM 整理/修剪
TRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
TRIM可以使流在需要时移除旧的entries;
参数
MAXLEN:指定流要控制的长度,这样做会移除旧的entries直到流的长度控制为指定长度,长度必须为整数;(在文档里,这里用threshold来表示需要控制的长度)
MINID:移除id低于threshold的entries;
**~**:仅在使用MAXLEN时使用。使用参数后,Redis可能不会准确地将entries数量修剪到指定长度,而是比指定长度多一点或等于指定长度;
- 使用~后,Redis会尽早的停止修剪,以促进性能节约;
- 有时候,用户并非需要精确地将流裁剪为指定长度,可以有些误差,但不能少于指定值,使用“~”后,可以达到要求的同时提高性能。
LIMIT:限制TRIM移除的entries最大大小;
- 有时候,如果需要修剪entries过多,可能会影响性能,通过LIMIT限制本次操作要清理的entries数量,可以尽快地结束操作进行;
- 若没有指定LIMIT参数,将以entries的数量x100作为count值;
- 指定count=0,可以禁止移除数量限制。
返回值
被移除的entries数量。
使用
修剪流使流达到指定长度
1 | 127.0.0.1:6379> xtrim s MAXLEN 1000 # 这里指定长度为1000 |
移除ID小于指定值的entries
1 | 127.0.0.1:6379> xadd s * a 2 b 3 # a=2 b=3 |
增
XADD 增加entry到流
XADD key [NOMKSTREAM] [<MAXLEN | MINID>] [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
添加entry到指定流;
一个entry由一个键值对组成;
XRANGE和XREAD可以用来从流中读取数据;
XADD可以向流中添加数据;
XDEL和XTRIM可以从流中删除数据。
使用
使用参数指定Stream ID
如果使用“*”指定ID,XADD命令将为流自动生成唯一的Stream ID;
ID一般由Redis生成,但也可以自己手动指定ID:
- ID的格式为:123456789-33
- 两个数字都是64bit的数字,ID生成过程中,第一个数字使用Unix时间生成,第二部分是一个序列号,用于区分在同一时间生成的不同数据;
- 可以手动指定一个不完整的id,如:123456789-*。用*表示的第二部分将由Redis去生成;
- ID保证生成过程是递增的;若在某次插入过程中,指定的ID比之前大得多,在后来的插入过程,将按照这个指定的ID继续进行递增;
限制流的大小
若流的内容条数等于指定的允许大小(也就是满了),在新添加内容时,XADD将最旧的条目逐出流,然后将新的条目添加进去;
[<MAXLEN | MINID>] [= | ~] threshold [LIMIT count]]
的用法详见XTRIM。
参数
NOMKSTREAM:若key不存在,不创建新的对象;
删
XDEL
XDEL key id [id ...]
从流中移除指定的entry,返回被移除的entry的数量;
原理
XDEL并非真正地将entry从radix tree中移除,仅仅只是将entry标注为被删除;
如果一个宏节点中所有的entry被删除,整个节点会被销毁,内存将会被回收;
对于流来说,为保持流的性能,XDEL仅仅只是将entry标注为deleted,因此会产生大量的碎片,这是一个较为严重的问题;
在未来的版本中,Redis可能会添加垃圾回收机制去清理被删除的节点,但是这样会增加复杂度,这不是一个好主意(即有内存碎片,但是目前不打算解决)
使用
1 | 127.0.0.1:6379> xadd s3 * apple 1 cat 2 |
查
XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
读取一个或多个流,只返回带有id或大于id的数据表;
参数:
COUNT:数量
BLOCK:若当前没有数据则阻塞,直到有新的数据输入,0表示无限期阻塞等待,非0表示超时等待;
STREAMS:可以提供多个流和id,将从多个流中读取数据;
1 | 127.0.0.1:6379> xadd stream001 * apple 3 stone 5 # 插入一个数据 |
内部命令
XSETID
XSETID key last-id [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
用于让Redis复制最后交付的流的ID;
RPC