1、pom依賴添加
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
2、事例代碼
package com.pojo.prj.controller;import com.pojo.common.core.utils.StringUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;import java.time.Duration;
import java.util.Map;@RestController
public class TestController {@GetMapping(value = "/stream/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<Map<String,String>> streamFlux() {// 每隔 1 秒發送一條數據,共發送 10 條String query = "select * from test";return Flux.interval(Duration.ofSeconds(1)).map(sequence -> StringUtils.streamFlux(query,sequence)).take(10);}
}
StringUtils.streamFlux的方法
public static Map<String, String> streamFlux(String query, Long sequence) {Map<String, String> map = new HashMap<>();map.put(sequence + "", query + " " + sequence);return map;}
- 在 @GetMapping 中設置 produces = MediaType.TEXT_EVENT_STREAM_VALUE 表示以 SSE 格式推送數據。
- Flux.interval(...) 每隔一秒生成一個遞增的數字序列,然后通過 map 操作轉換成map消息 。
- take(10) 限制只發送 10 個數據,流結束后自動關閉。
這種方式適用于響應式編程,并且可以充分利用 Reactor 框架的特性實現復雜數據流邏輯。
測試效果
nginx在配置代理SSE接口時需加一下配置
proxy_http_version 1.1; # 強制使用HTTP/1.1協議?:proxy_buffering off; # 關閉響應緩沖,確保流式傳輸?proxy_set_header Connection '';
?