# 流数据是怎么产生的
@GetMapping(
value = "/users/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<User> streamUsers() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> new User(
String.valueOf(i),
"用户" + i
));
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
map是对谁遍历?i为会自增?
这是理解 WebFlux 和 Reactor 的关键。
你看到的代码:
Flux
.interval(Duration.ofSeconds(1))
.map(index -> {
User user = new User();
user.setId(String.valueOf(index));
user.setName("用户-" + index);
return user;
});
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
很多人第一次看都会问:
map到底遍历谁?
index为什么会自动增长?
# 先看 Flux.interval()
实际上:
Flux.interval(Duration.ofSeconds(1))
1
等价于创建了一个无限流:
0
1
2
3
4
5
6
...
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
每隔1秒产生一个Long类型数据。
类型实际上是:
Flux<Long>
1
即:
Flux<Long> flux =
Flux.interval(Duration.ofSeconds(1));
1
2
2
# 它内部在干什么
可以理解成:
while(true){
Thread.sleep(1000);
emit(counter++);
}
1
2
3
4
5
6
7
2
3
4
5
6
7
伪代码:
0
↓ 1秒
1
↓ 1秒
2
↓ 1秒
3
↓ 1秒
...
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
所以:
index
1
实际上就是:
0
1
2
3
4
...
1
2
3
4
5
6
2
3
4
5
6
# map是对谁遍历
map遍历的是:
Flux.interval()
1
产生的每一个元素。
例如:
Flux.interval(Duration.ofSeconds(1))
1
产生:
0
1
2
3
1
2
3
4
2
3
4
然后:
.map(index -> ...)
1
相当于:
0 -> User(0)
1 -> User(1)
2 -> User(2)
3 -> User(3)
1
2
3
4
2
3
4
流程:
Flux<Long>
0
1
2
3
map
↓
↓
↓
User(0)
User(1)
User(2)
User(3)
Flux<User>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 类似Java Stream
你熟悉的话:
List<Integer> list =
Arrays.asList(1,2,3);
list.stream()
.map(i -> i * 10)
1
2
3
4
5
2
3
4
5
结果:
1 -> 10
2 -> 20
3 -> 30
1
2
3
2
3
Reactor中的:
Flux<Long>
.map(i -> ...)
1
2
2
思想完全一样。
区别只是:
Stream一次处理完。
Flux可以不断产生数据。
# 加日志看看
@GetMapping(
value="/users/stream",
produces=MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<User> streamUsers(){
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> {
System.out.println(
"收到index="+index
);
return new User(
String.valueOf(index),
"用户-"+index
);
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
控制台:
收到index=0
收到index=1
收到index=2
收到index=3
收到index=4
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# Reactor中的数据流思想
你可以把它想成管道:
Flux.interval()
0
1
2
3
4
5
...
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
进入:
.map(...)
1
变成:
User-0
User-1
User-2
User-3
...
1
2
3
4
5
2
3
4
5
进入:
.filter(...)
1
例如:
.filter(user ->
Long.parseLong(user.getId()) % 2 == 0
)
1
2
3
2
3
变成:
User-0
User-2
User-4
User-6
...
1
2
3
4
5
2
3
4
5
进入:
.take(5)
1
变成:
User-0
User-2
User-4
User-6
User-8
1
2
3
4
5
2
3
4
5
然后发送给浏览器。
# 更接近实际项目的写法
实际开发中,index 很少来自 interval。
更多是数据库查询结果:
Flux<User> users =
userRepository.findAll();
1
2
2
假设数据库有:
Tom
Jack
Lucy
Bob
1
2
3
4
2
3
4
那么:
users.map(user -> {
user.setName(
user.getName().toUpperCase()
);
return user;
});
1
2
3
4
5
6
2
3
4
5
6
实际遍历的是:
Tom
Jack
Lucy
Bob
1
2
3
4
2
3
4
转换后:
TOM
JACK
LUCY
BOB
1
2
3
4
2
3
4
所以记住一句话:
map()永远是对 Flux 中流过的每一个元素进行转换。
在你的例子里:
Flux.interval()
1
流过的是:
0
1
2
3
4
...
1
2
3
4
5
6
2
3
4
5
6
所以 index 自动递增;而 map(index -> ...) 就是在把这些数字一个个转换成 User 对象。