Redis 4 Stream
uwupu 啦啦啦啦啦

Stream

介绍

https://redis.io/docs/data-types/streams-tutorial/

说明

Stream是一个只能添加数据的数据结构;依赖于添加命令XADD,Stream可以添加新的entry到指定流;

每个Stream Entry都是一个field-value;

示例1:XADD使用

1
2
127.0.0.1:6379> XADD myStream * sensor-id 12 temperature 11.1
"1663848814626-0"

在上面的命令中,

  • XADD为myStream添加了{sensor-id:12, temperature:11.1}属性;
  • 第一个参数myStream为指定的key的名字;
  • 第二个参数是一个ID,每个entry都有一个ID;这里使用“*”,意为让服务器自动生成一个ID;
  • 服务器生成的ID会依据生成先后自动增加;
  • 大多数情况下都使用服务器自动生成的结果作为ID,无需手动提供ID;
  • 后面的参数即field-value,用于提供要保存的键值对。

示例2:XLEN

1
2
127.0.0.1:6379> XLEN myStream
(integer) 1

XLEN可以获取流的长度。

Entry ID介绍

Entry ID的格式如下:

1
<millisecondsTime>-<sequenceNumber>

millisecondsTime即当前本地的时间;

Entry ID都是逐步递增的,如果Redis获取的时间上个提交的Entry时间之前,Redis将使用最新的进行记录(即上个提交的Entry时间);

为什么用时间作为ID?。(从文档翻译)用时间作为ID可能会很奇怪,有些读者可能想知道问什么这样做。原因是Redis支持依据ID值的范围进行查询Entry,如果将时间作为ID,就可以很方便地依据时间去查询Entry。(e…Long型的时间好像也不方便…)

Entry最小的ID是0-1,Redis不允许ID小于上一次添加的Entry的ID;

示例3:ID可以仅提供milliseconds部分,由系统去生成后半部分

如果用的Redis版本是7或以上,可以提供ID的前半部分,后半部分使用*代替,Redis会自动生成后半部分且逐次递增;

1
2
3
4
5
6
127.0.0.1:6379> XADD s5 0-* apple 1
"0-1"
127.0.0.1:6379> XADD s5 0-* apple 2
"0-2"
127.0.0.1:6379> XADD s5 0-* apple 3
"0-3"

示例4:查询的操作

可以用时间范围去查询Entry

只需要指定两个ID来表示起点和终点,就可以查询一个流;

两个特殊的参数“-”和“+”分别表示最小ID和最大ID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> XADD s5 0-* apple 1
"0-1"
127.0.0.1:6379> XADD s5 0-* apple 2
"0-2"
127.0.0.1:6379> XADD s5 0-* apple 3
"0-3"
127.0.0.1:6379> xrange s5 - +
1) 1) "0-1"
2) 1) "apple"
2) "1"
2) 1) "0-2"
2) 1) "apple"
2) "2"
3) 1) "0-3"
2) 1) "apple"
2) "3"

每个entry返回一个含有field-value的数组;

在XADD指定ID时使用*,则ID的左半部分表示当时的时间,这将意味着我们可以使用XRANGE去查询一个时间范围内的所有Entry;

因为使用毫秒时间作为ID的左半部分,所以XRANGE查询的时间甚至可以精确到毫秒;

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> xadd s6 * apple 1
"1663850754240-0"
127.0.0.1:6379> xadd s6 * apple 2
"1663850755536-0"
127.0.0.1:6379> xadd s6 * apple 3
"1663850756295-0"
127.0.0.1:6379> xrange s6 1663850754240 1663850754241 # 查询1毫秒间的日志
1) 1) "1663850754240-0"
2) 1) "apple"
2) "1"

XRANGE可以限制查询数量

有时候,在有些范围内,可能会有特别多的Entry,这样查询起来可能会麻烦;

使用XRANGE的COUNT参数可以限制查询的数量;

1
2
3
4
5
6
7
8
9
10
11
12
13
127.0.0.1:6379> xadd s6 * apple 1
"1663850754240-0"
127.0.0.1:6379> xadd s6 * apple 2
"1663850755536-0"
127.0.0.1:6379> xadd s6 * apple 3
"1663850756295-0"
127.0.0.1:6379> xrange s6 - + COUNT 2
1) 1) "1663850754240-0"
2) 1) "apple"
2) "1"
2) 1) "1663850755536-0"
2) 1) "apple"
2) "2"

XRANGE可以查询接下来的两个Entry;

将查询结果的最后一个ID作为查询的起点,使用“(”前缀,可以查询接下来的两个Entry;

1
2
3
4
5
6
7
127.0.0.1:6379> xrange s6 (1663850755536-0 + COUNT 2
1) 1) "1663850756295-0"
2) 1) "apple"
2) "3"
# 左括号表示不包含当前项
# +表示最高ID
# COUNT 2 表示查询两个Entry

XRANGE的复杂度是O(log(N)),返回M个元素则另外包含O(M),复杂度是对数,这样效率是比较高的,所以可以代替XSCAN;

XREVRANGE

XREVRANGE等效于XRANGE,不同的是XREVRANGE的顺序是相反的;

通过XREVRANGE获取最新的Entry:

1
2
3
4
127.0.0.1:6379> xrevrange s6 + - COUNT 1
1) 1) "1663850756295-0"
2) 1) "apple"
2) "3"

示例5:XREAD 监听新元素的添加

有时候我们可能不需要通过一个返回从流中获取数据,仅仅只是想获取流中新抵达的元素(订阅流中新抵达的元素);

这就是Redis的发布/订阅;

阻塞队列与Stream是不同的

  1. Stream收到的新的元素,会平等的发送到每个等待的消费者;而在阻塞队列中,消费者获取到的元素是不同的;
  2. Stream会将收到的元素无限期地存储在流中(除非用户指定),而阻塞队列在pop后会将元素删除;
  3. Streams Consumer Groups提供了Pub/Sub的控制级别功能;每个消费者会被手动地分配到不同组,对于不同组,则有不同的访问范围;StreamsConsumerGroups能够记录项目是否被完成,同时记录消费者处理事务的历史,对于每个消费者,只能获取到自己处理的事务;

对于消费者,使用XREAD来监听流的新到元素;

通过XREAD获取流中的最新数据

1
2
3
4
5
6
7
8
127.0.0.1:6379> XREAD COUNT 2 STREAMS s6 0
1) 1) "s6"
2) 1) 1) "1663850754240-0"
2) 1) "apple"
2) "1"
2) 1) "1663850755536-0"
2) 1) "apple"
2) "2"

其中,STREAMS是必要参数,用于指定key和一个ID;

Redis会返回流中大于指定ID的entry;

STREAMS参数后面可以跟多个key,key后面根同等数量的ID,如:STREAMS s1 s2 0 0;因此STREAMS参数必须放在命令的最后面;

XREAD可以阻塞网络直到有新的entry被添加

添加一个BLOCK参数就可以阻塞网络直到获取到新的被添加entry;

BLOCK后面根超时时间,0表示无限等待;

使用“$”表示流内最新的Entry的ID;

image

在示例中:

  • XREAD被阻塞
    • 0表示无限期等待
    • STREAMS用于指定key和ID;
    • $表示s6下最新的ID
  • 当XADD执行结束后,XREAD阻塞取消,返回新添加的值;

XREAD BLOCK 可以不使用$作为id;

在此模式下,STREAMS可以提供多个key和ID,此时如果其中一个流添加了新的数据,将从该流读取出现的新数据并返回。

XREAD不会删除数据,仅仅只是读取数据。

使用示例

添加几个温度记录到Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
127.0.0.1:6379> xadd temperatures:table:1001 apple 23.2 bread 18.2 milk 11.2 # 苹果23.2 面包18.2 牛奶11.2 一般这里表示硬件温度;
(error) ERR Invalid stream ID specified as stream command argument
127.0.0.1:6379> xadd temperatures:table:1001 * apple 23.2 bread 18.2 milk 11.2

"1663773543545-0" # 返回的streamID
127.0.0.1:6379> xadd temperatures:table:1001 * apple 24.1 bread 14.7 milk 12.3

"1663773560847-0"
127.0.0.1:6379> xadd temperatures:table:1001 * apple 21.1 bread 12.7 milk 15.3

"1663773569889-0"
127.0.0.1:6379> xrange temperatures:table:1001 1663773543545-0 + COUNT 3 #查询
1) 1) "1663773543545-0"
2) 1) "apple"
2) "23.2"
3) "bread"
4) "18.2"
5) "milk"
6) "11.2"
2) 1) "1663773560847-0"
2) 1) "apple"
2) "24.1"
3) "bread"
4) "14.7"
5) "milk"
6) "12.3"
3) 1) "1663773569889-0"
2) 1) "apple"
2) "21.1"
3) "bread"
4) "12.7"
5) "milk"
6) "15.3"

在流的尾部,读取100个新的stream entries,如果没有entries被写入则阻塞等待300ms;

1
XREAD COUNT 100 BLOCK 300 STREAMS temperatures:table $

基础命令

XADD:添加一个新的entry到stream;

XREAD:在给定的位置开始并向前读取,读取一个或多个entries;

XRANGE:返回给定两个ID指向的entry之间的entry;

XLEN:返回流的长度;

性能

添加entry到stream是O(1);

访问任意单个entry是O(n),n为ID的length;

其他命令

赋权 XGROUP XINFO 整理
XADD XACK XLEN XAUTOCLAIM CREATE COMSUMERS XTRIM
XDEL XPENDING XCLAIM CREATECONSUMER GROUPS
XRANGE DELCONSUMER STREAM
XREVRANGE DESTROY
XREAD SETID
XREADGROUP

TRIM 整理/修剪

TRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]

TRIM可以使流在需要时移除旧的entries;

参数

  • MAXLEN:指定流要控制的长度,这样做会移除旧的entries直到流的长度控制为指定长度,长度必须为整数;(在文档里,这里用threshold来表示需要控制的长度)

  • MINID:移除id低于threshold的entries;

  • **~**:仅在使用MAXLEN时使用。使用参数后,Redis可能不会准确地将entries数量修剪到指定长度,而是比指定长度多一点或等于指定长度;

    • 使用~后,Redis会尽早的停止修剪,以促进性能节约;
    • 有时候,用户并非需要精确地将流裁剪为指定长度,可以有些误差,但不能少于指定值,使用“~”后,可以达到要求的同时提高性能。
  • LIMIT:限制TRIM移除的entries最大大小;

    • 有时候,如果需要修剪entries过多,可能会影响性能,通过LIMIT限制本次操作要清理的entries数量,可以尽快地结束操作进行;
    • 若没有指定LIMIT参数,将以entries的数量x100作为count值;
    • 指定count=0,可以禁止移除数量限制。

返回值

被移除的entries数量。

使用

修剪流使流达到指定长度
1
2
127.0.0.1:6379> xtrim s MAXLEN 1000 # 这里指定长度为1000
(integer) 0
移除ID小于指定值的entries
1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> xadd s * a 2 b 3  # a=2 b=3
"1663845075734-0"
127.0.0.1:6379> xadd s * a 12 b 123 # a=12 b=123
"1663845081393-0"
127.0.0.1:6379> xtrim s MINID 1663845075735 # 修剪1663845075735,这个值比上面第一个大1.
(integer) 1
127.0.0.1:6379> xrange s 1663845075734-0 + COUNT 3 #输出从1663845075734开始往后数3个
1) 1) "1663845081393-0"
2) 1) "a"
2) "12"
3) "b"
4) "123"

XADD 增加entry到流

XADD key [NOMKSTREAM] [<MAXLEN | MINID>] [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

添加entry到指定流;

一个entry由一个键值对组成;

XRANGE和XREAD可以用来从流中读取数据;

XADD可以向流中添加数据;

XDEL和XTRIM可以从流中删除数据。

使用
使用参数指定Stream ID

如果使用“*”指定ID,XADD命令将为流自动生成唯一的Stream ID;

ID一般由Redis生成,但也可以自己手动指定ID:

  • ID的格式为:123456789-33
  • 两个数字都是64bit的数字,ID生成过程中,第一个数字使用Unix时间生成,第二部分是一个序列号,用于区分在同一时间生成的不同数据;
  • 可以手动指定一个不完整的id,如:123456789-*。用*表示的第二部分将由Redis去生成;
  • ID保证生成过程是递增的;若在某次插入过程中,指定的ID比之前大得多,在后来的插入过程,将按照这个指定的ID继续进行递增;
限制流的大小

若流的内容条数等于指定的允许大小(也就是满了),在新添加内容时,XADD将最旧的条目逐出流,然后将新的条目添加进去;

[<MAXLEN | MINID>] [= | ~] threshold [LIMIT count]]的用法详见XTRIM

参数

NOMKSTREAM:若key不存在,不创建新的对象;

XDEL

XDEL key id [id ...]

从流中移除指定的entry,返回被移除的entry的数量;

原理

XDEL并非真正地将entry从radix tree中移除,仅仅只是将entry标注为被删除;

如果一个宏节点中所有的entry被删除,整个节点会被销毁,内存将会被回收;

对于流来说,为保持流的性能,XDEL仅仅只是将entry标注为deleted,因此会产生大量的碎片,这是一个较为严重的问题;

在未来的版本中,Redis可能会添加垃圾回收机制去清理被删除的节点,但是这样会增加复杂度,这不是一个好主意(即有内存碎片,但是目前不打算解决

使用
1
2
3
4
5
6
7
8
9
10
11
12
127.0.0.1:6379> xadd s3 * apple 1 cat 2
"1663847348847-0"
127.0.0.1:6379> xadd s3 * apple 12 cat 22
"1663847354912-0"
127.0.0.1:6379> xdel s3 1663847348847-0
(integer) 1
127.0.0.1:6379> xrange s3 1663847348847-0 + COUNT 10
1) 1) "1663847354912-0"
2) 1) "apple"
2) "12"
3) "cat"
4) "22"

XREAD

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

读取一个或多个流,只返回带有id或大于id的数据表;

参数:

COUNT:数量

BLOCK:若当前没有数据则阻塞,直到有新的数据输入,0表示无限期阻塞等待,非0表示超时等待;

STREAMS:可以提供多个流和id,将从多个流中读取数据;

1
2
3
4
5
6
7
8
9
127.0.0.1:6379> xadd stream001 * apple 3 stone 5  # 插入一个数据
"1664182678233-0"
127.0.0.1:6379> xread COUNT 1 STREAMS stream001 0-0 # 查询一个数据
1) 1) "stream001"
2) 1) 1) "1664182678233-0"
2) 1) "apple"
2) "3"
3) "stone"
4) "5"

内部命令

XSETID

XSETID key last-id [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]

用于让Redis复制最后交付的流的ID;

RPC

 评论