# 流数据是怎么产生的

@GetMapping(
    value = "/users/stream",
    produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<User> streamUsers() {

    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new User(
                    String.valueOf(i),
                    "用户" + i
            ));
}
1
2
3
4
5
6
7
8
9
10
11
12

map是对谁遍历?i为会自增?

这是理解 WebFlux 和 Reactor 的关键。

你看到的代码:

Flux
    .interval(Duration.ofSeconds(1))
    .map(index -> {

        User user = new User();

        user.setId(String.valueOf(index));

        user.setName("用户-" + index);

        return user;
    });
1
2
3
4
5
6
7
8
9
10
11
12

很多人第一次看都会问:

map到底遍历谁?

index为什么会自动增长?


# 先看 Flux.interval()

实际上:

Flux.interval(Duration.ofSeconds(1))
1

等价于创建了一个无限流:

0
1
2
3
4
5
6
...
1
2
3
4
5
6
7
8

每隔1秒产生一个Long类型数据。

类型实际上是:

Flux<Long>
1

即:

Flux<Long> flux =
        Flux.interval(Duration.ofSeconds(1));
1
2

# 它内部在干什么

可以理解成:

while(true){

    Thread.sleep(1000);

    emit(counter++);

}
1
2
3
4
5
6
7

伪代码:

0
↓ 1秒

1
↓ 1秒

2
↓ 1秒

3
↓ 1秒

...
1
2
3
4
5
6
7
8
9
10
11
12
13

所以:

index
1

实际上就是:

0
1
2
3
4
...
1
2
3
4
5
6

# map是对谁遍历

map遍历的是:

Flux.interval()
1

产生的每一个元素。

例如:

Flux.interval(Duration.ofSeconds(1))
1

产生:

0
1
2
3
1
2
3
4

然后:

.map(index -> ...)
1

相当于:

0 -> User(0)
1 -> User(1)
2 -> User(2)
3 -> User(3)
1
2
3
4

流程:

Flux<Long>

0
1
2
3

      map

↓
↓
↓

User(0)
User(1)
User(2)
User(3)

Flux<User>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 类似Java Stream

你熟悉的话:

List<Integer> list =
    Arrays.asList(1,2,3);

list.stream()
    .map(i -> i * 10)
1
2
3
4
5

结果:

1 -> 10
2 -> 20
3 -> 30
1
2
3

Reactor中的:

Flux<Long>
    .map(i -> ...)
1
2

思想完全一样。

区别只是:

Stream一次处理完。

Flux可以不断产生数据。


# 加日志看看

@GetMapping(
        value="/users/stream",
        produces=MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<User> streamUsers(){

    return Flux
            .interval(Duration.ofSeconds(1))
            .map(index -> {

                System.out.println(
                        "收到index="+index
                );

                return new User(
                        String.valueOf(index),
                        "用户-"+index
                );
            });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

控制台:

收到index=0

收到index=1

收到index=2

收到index=3

收到index=4
1
2
3
4
5
6
7
8
9

# Reactor中的数据流思想

你可以把它想成管道:

Flux.interval()

0
1
2
3
4
5
...
1
2
3
4
5
6
7
8
9

进入:

.map(...)
1

变成:

User-0
User-1
User-2
User-3
...
1
2
3
4
5

进入:

.filter(...)
1

例如:

.filter(user ->
    Long.parseLong(user.getId()) % 2 == 0
)
1
2
3

变成:

User-0
User-2
User-4
User-6
...
1
2
3
4
5

进入:

.take(5)
1

变成:

User-0
User-2
User-4
User-6
User-8
1
2
3
4
5

然后发送给浏览器。


# 更接近实际项目的写法

实际开发中,index 很少来自 interval

更多是数据库查询结果:

Flux<User> users =
    userRepository.findAll();
1
2

假设数据库有:

Tom
Jack
Lucy
Bob
1
2
3
4

那么:

users.map(user -> {
    user.setName(
        user.getName().toUpperCase()
    );
    return user;
});
1
2
3
4
5
6

实际遍历的是:

Tom
Jack
Lucy
Bob
1
2
3
4

转换后:

TOM
JACK
LUCY
BOB
1
2
3
4

所以记住一句话:

map() 永远是对 Flux 中流过的每一个元素进行转换。

在你的例子里:

Flux.interval()
1

流过的是:

0
1
2
3
4
...
1
2
3
4
5
6

所以 index 自动递增;而 map(index -> ...) 就是在把这些数字一个个转换成 User 对象。

Last Updated: 6/12/2026, 4:20:22 AM