Stream消息队列
https://pdai.tech/md/db/nosql-redis/db-redis-data-type-stream.html
Stream是一个新的强大的支持多播的可持久化的消息队列。
Stream结构
![image]()
consumer Group:消费组,使用XGROUP CREATE创建,一个消费组有多个消费者,这些消费者之间是竞争关系;
last_delivered_id:游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动;
Pending Entries List:pending_ids。维护并存放消费者读取消息的状态(消费者是否已经向服务器回应ACK);pending_ids记录了当前已被客户端读取的消息,但还没有ack的消息;一旦某个消息被ack,这个队列会将消息id移除,表示消费者收到了该消息;用于确保客户端消费了消息一次,而不是在网络传输途中丢失了没处理。
消息ID:即Stream创建Entry时会生成的ID。
- 格式:timestampInMillis-sequence 12345567-9 毫米时间戳-序号;
- 序号为在同一时间戳内生成的消息的序号,用于区分在同一时间戳生成的多个消息;
- 格式必须为
整数-整数
,后面加入的消息ID必须大于前面的消息ID;
消息内容:消息内容就是Entry的键值对;
生产者命令
XADD:添加消息到末尾;
XTRIM:对流进行修剪;
XDEL:删除消息;
XLEN:获取流包含的元素数量;
XRANGE:获取消息列表;
xxxxxxxxxx http { include mime.types; default_type application/octet-stream; upstream yupstream{ # 服务器资源 server 127.0.0.1:8080 weight=1; # 权重为1 server 127.0.0.1:8081 weight=1; } server{ listen 801; server_name localhost; # 代理 # 根目录请求 location / { root html; index index.html,index.htm; proxy_pass http://yupstream; # 反向代理 } location /admin { } }}perl
XREAD:以阻塞或非阻塞方式获取消息列表。
消费组消费
![image]()
使用示例
XGROUP CREATE 创建消费者组
XREADGROUP GROUP 读取消费组中的消息
XACK 将消息标记为已处理
XGROUP SETID 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER 删除消费者
XGROUP DESTROY 删除消费者组
XPENDING 显示待处理消息的相关信息
XCLAIM 转移消息的归属权
XINFO 查看流和消费者组的相关信息
XINFO GROUPS 查询组信息
XINFO STREAM 打印流信息
XINFO CONSUMERS 组成员信息
创建组/查询信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| 127.0.0.1:6379> xadd st1 * apple 1 banana 23 "1664179267483-0" 127.0.0.1:6379> xadd st1 * apple 12 banana 2 # 为st1添加两个消息 "1664179271399-0" 127.0.0.1:6379> xgroup create st1 g1 0-0 # 为st1创建一个消费者组g1,从0-0开始消费 OK 127.0.0.1:6379> xgroup create st1 g2 $ # 为st1创建一个消费者组g2,仅获取最新的消息 OK 127.0.0.1:6379> xinfo stream st1 # 获取st1的stream信息 1) "length" 2) (integer) 2 # 长度 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "last-generated-id" 8) "1664179271399-0" # 最后生成的id 9) "max-deleted-entry-id" 10) "0-0" 11) "entries-added" 12) (integer) 2 13) "recorded-first-entry-id" 14) "1664179267483-0" 15) "groups" 16) (integer) 2 17) "first-entry" # 第一个entry 18) 1) "1664179267483-0" 2) 1) "apple" 2) "1" 3) "banana" 4) "23" 19) "last-entry" # 最后一个entry 20) 1) "1664179271399-0" 2) 1) "apple" 2) "12" 3) "banana" 4) "2" 127.0.0.1:6379> xinfo groups st1 #获取st1的组信息 1) 1) "name" 2) "g1" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "0-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 2 2) 1) "name" 2) "g2" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1664179271399-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 0
|
消费组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| # 消费者消费数据 xreadgroup group 组名 消费者名 count 数量 streams steam名 127.0.0.1:6379> xreadgroup group g1 consumer001 count 1 streams st1 > 1) 1) "st1" 2) 1) 1) "1664179267483-0" 2) 1) "apple" 2) "1" 3) "banana" 4) "23" # 消费者consumer001再次从流st1、组g1消费一个数据 127.0.0.1:6379> xreadgroup group g1 consumer001 count 1 streams st1 > 1) 1) "st1" 2) 1) 1) "1664179271399-0" 2) 1) "apple" 2) "12" 3) "banana" 4) "2" # st1中已经没有数据,再次读取返回nil 127.0.0.1:6379> xreadgroup group g1 consumer001 count 1 streams st1 > (nil) # 尝试带阻塞读取数据 127.0.0.1:6379> xreadgroup group g1 consumer001 block 0 count 1 streams st1 > # 此处阻塞 # 在另一个终端向st1中添加数据后取消阻塞 1) 1) "st1" 2) 1) 1) "1664179863235-0" 2) 1) "apple" 2) "12" 3) "qwe" 4) "11" (17.78s) # 等待时间 127.0.0.1:6379> xinfo groups st1 # 获取st1的组信息 # 包含 组名,消费者数量,获取的数量等数据 1) 1) "name" 2) "g1" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 3 7) "last-delivered-id" 8) "1664179863235-0" 9) "entries-read" 10) (integer) 3 11) "lag" 12) (integer) 0 2) 1) "name" 2) "g2" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1664179271399-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (nil) # 获取组g1的流st1的消费者数据 127.0.0.1:6379> xinfo consumers st1 g1 1) 1) "name" 2) "consumer001" # 消费者名 3) "pending" 4) (integer) 3 # 未处理的数据 5) "idle" 6) (integer) 61034 127.0.0.1:6379> xack st1 g1 1664179271399-0 # 使用ack表示数据已消费 (integer) 1 127.0.0.1:6379> xinfo consumers st1 g1 # 查看st1 g1下的消费者数据 1) 1) "name" 2) "consumer001" 3) "pending" 4) (integer) 2 # 未处理的数据减少了 5) "idle" 6) (integer) 325172 127.0.0.1:6379> xack st1 g1 1664179863235-0 (integer) 1 127.0.0.1:6379> xack st1 g1 1664179267483-0 # 将剩余的数据进行消费 (integer) 1 127.0.0.1:6379> xinfo consumers st1 g1 # 再次查询 1) 1) "name" 2) "consumer001" 3) "pending" 4) (integer) 0 # 没有数据需要处理了 5) "idle" 6) (integer) 382595
|
一些命令
XGROUP 组操作
XGROUP CREATE 创建组
XGROUP CREATE key groupname <id | $> [MKSTREAM] [ENTRIESTREAD entries_read]
为key创建一个消费者组,名为groupname;
每个组的名字在同一个key里是惟一的;若尝试创建一个已经存在的组,会返回错误
参数:
<id | $>
:从什么开始消费,其中id为指定id,$只接受新的消息,如:
- 123-1 为从id为123-1的节点开始;
- 0-0 为从头开始;
- $,只接受新的消息。
MKSTREAM:添加这个参数后,若指定流不存在,不再返回错误,而是创建名为key的流;
ENTRIESREAD:hole…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 127.0.0.1:6379> xadd st2 123-2 apple 3 stone 5 # 为st2添加数据(一般不用指定id,这里仅学习使用) "123-2" 127.0.0.1:6379> xadd st2 * apple 3 stone 123 # 为st2添加数据,id为自动生成,目前st2的最大id就是下面这个了 "1664183965455-0" 127.0.0.1:6379> xgroup create st2 cg1 $ # 创建一个消费者组,指定$表示获取最新数据 OK 127.0.0.1:6379> xinfo groups st2 1) 1) "name" 2) "cg1" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1664183965455-0" # 使用 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 0
|
XGROUP DESTROY 删除组
XGROUP DESTROY key groupname
完全删除一个消费者组;
不论消费者组中是否存在活跃的消费者或是未处理的数据,会直接将消费者组删除;
返回成功删除的消费者组的数量
1 2 3 4
| 127.0.0.1:6379> xgroup create st1 cg1 0-0 # 创建消费者组,名为cg1,绑定的流为st1,起点0-0 OK 127.0.0.1:6379> xgroup destroy st1 cg1 # 删除消费者组,名为cg1,绑定的流为st1 (integer) 1
|
XGROUP SETID 设置组的消费id
XGROUP SETID key groupname <id | $> [ENTRIESREAD entries_read]
设置消费者组要获取数据的起点id;
ENTRIESREAD:hole…
XGROUP CREATECONSUMER 创建消费者
XGROUP CREATECONSUMER key groupname consumername
创建一个消费者,key为流的名字,groupname为组的名字,consumername为消费者的名字;
可以使用XREADGROUP进行数据消费;
返回值为1或0,表示消费者是否被成功创建
XGROUP DELCONSUMER 删除消费者
XGROUP DELCONSUMER key groupname consumername
删除消费者,key为流的名字,groupname为组的名字,consumername为消费者的名字;
XINFO 信息查询
XINFO CONSUMERS
XINFO CONSUMERS key groupname
返回绑定key的消费者组下的消费者信息;
XINFO GROUPS
XINFO GROUPS key
返回绑定key的所有消费者组;
XINFO STREAM
XINFO STREAM key [FULL [COUNT count]]
返回流的信息;
信息包含:
- length:流中实体的数量;
- radix-tree-keys:
- radix-tree-nodes:
- groups:定义在流中消费者组的数量
- last-generated-id:最后一个实体使用的id;
- max-deleted-entry-id:流中删除的最大id的条目;
- entries-added:在流存活期间,所有添加过的实体的数量;
- first-entry:第一个entry的信息;
- last-entry:最后一个entry的信息。
可选参数:
- FULL:将返回更多的信息;
- 对于entries属性,以升序全部或指定数量的实体信息;
- 对于groups属性,包括XINFO GROUPS和XINFO CONSUMERS返回的信息。
- COUNT:指定FULL中返回实体的数量
消费者操作
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
获取消费组内被读取但未处理完毕的信息;
应用场景:为了解决组内数据被读取但在处理期间消费者崩溃的问题。