# webFlux返回流数据
@GetMapping("/users")
public Flux<User> getAllUsers() {
return Flux.fromArray(users);
}
1
2
3
4
2
3
4
虽然返回的是 Flux<User>,但实际上浏览器收到的还是:
[
{
"id":"1",
"name":"Alice"
},
{
"id":"2",
"name":"Bob"
},
{
"id":"3",
"name":"Charlie"
}
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
原因是:
Spring WebFlux 默认会等待 Flux 完成,然后序列化成一个 JSON 数组。
# 普通JSON响应流程
Flux<User>
↓
User1
User2
User3
↓ 等待全部结束
[
{...},
{...},
{...}
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
前端:
fetch("/users")
.then(res => res.json())
1
2
2
必须等整个数组返回完。
# 真正的流式返回
WebFlux支持:
方式1:SSE(推荐)
Server Sent Event
后端:
@GetMapping(
value = "/users/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<User> streamUsers() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> new User(
String.valueOf(i),
"用户" + i
));
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
关键:
produces = MediaType.TEXT_EVENT_STREAM_VALUE
1
即:
Content-Type: text/event-stream
1
# 返回效果
浏览器收到的不是:
[
{...},
{...}
]
1
2
3
4
2
3
4
而是:
data: {"id":"0","name":"用户0"}
data: {"id":"1","name":"用户1"}
data: {"id":"2","name":"用户2"}
data: {"id":"3","name":"用户3"}
1
2
3
4
5
6
7
2
3
4
5
6
7
一条一条推送。
# 前端适配
不能再用:
fetch().then(res=>res.json())
1
而是:
const eventSource =
new EventSource("/users/stream");
eventSource.onmessage = function(event) {
const user = JSON.parse(event.data);
console.log(user);
document.body.innerHTML += `
<div>
${user.id} - ${user.name}
</div>
`;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 完整流程图
┌─────────────┐
│ 浏览器 │
└──────┬──────┘
│
│ GET /users/stream
▼
┌─────────────┐
│ WebFlux │
│ Controller │
└──────┬──────┘
│
│ Flux.interval()
▼
┌─────────────┐
│ Reactor Flux│
└──────┬──────┘
│
│ 每秒产生一个User
▼
┌──────────────────────┐
│ text/event-stream │
└──────┬───────────────┘
│
│ data:{...}
▼
┌─────────────┐
│ 浏览器 │
│ EventSource │
└──────┬──────┘
│
│ 实时收到
▼
┌─────────────┐
│ 页面渲染 │
└─────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 如果是AI聊天(ChatGPT这种)
通常不用SSE对象,而是:
@GetMapping(
value = "/chat",
produces = MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<String> chat() {
return Flux.just(
"你好",
"我是",
"ChatGPT"
)
.delayElements(Duration.ofSeconds(1));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
返回:
data:你好
data:我是
data:ChatGPT
1
2
3
4
5
2
3
4
5
前端:
const source =
new EventSource("/chat");
source.onmessage = e => {
console.log(e.data);
};
1
2
3
4
5
6
2
3
4
5
6
# 企业项目中的AI流式输出
例如:
Vue
↓
fetch()
↓
Spring WebFlux
↓
WebClient
↓
DeepSeek/OpenAI
↓
Flux<String>
↓
SSE
↓
Vue实时渲染
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
代码通常长这样:
@GetMapping(
value="/chat",
produces=MediaType.TEXT_EVENT_STREAM_VALUE
)
public Flux<String> chat(String msg){
return webClient.post()
.uri("/v1/chat/completions")
.retrieve()
.bodyToFlux(String.class);
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
对应前端(Vue/Nuxt):
const response = await fetch('/chat');
const reader =
response.body.getReader();
const decoder =
new TextDecoder();
while(true){
const {done,value}
= await reader.read();
if(done) break;
const chunk =
decoder.decode(value);
console.log(chunk);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这里已经不是等全部完成后返回 JSON,而是边生成边返回(Chunked Transfer Encoding + Flux),这才是大模型、股票行情、日志监控等场景真正使用的 WebFlux 流式模式。