Java 響應式編程是一種基于異步數據流處理的編程范式,它強調數據流的聲明式構建和傳播變化的自動響應。Java 9 引入的Flow API
為響應式編程提供了標準接口,而 Reactor 和 RxJava 等第三方庫則提供了更豐富的操作符和工具。
核心概念
- Publisher(發布者):產生數據流的源頭。
- Subscriber(訂閱者):消費數據流的接收者。
- Subscription(訂閱):連接發布者和訂閱者的橋梁,管理背壓(Backpressure)。
- Processor(處理者):兼具發布者和訂閱者的功能,用于轉換數據流。
簡單示例:使用 Java Flow API
下面是一個使用 Java 標準庫Flow API
的簡單響應式編程示例:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;public class ReactiveExample {public static void main(String[] args) throws InterruptedException {// 創建發布者try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {// 創建訂閱者Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("訂閱成功");this.subscription = subscription;subscription.request(1); // 請求1個數據}@Overridepublic void onNext(String item) {System.out.println("接收到數據: " + item);subscription.request(1); // 處理完后再請求1個}@Overridepublic void onError(Throwable throwable) {System.out.println("發生錯誤: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("數據流處理完成");}};// 訂閱publisher.subscribe(subscriber);// 發布數據publisher.submit("Hello");publisher.submit("Reactive");publisher.submit("World");// 等待所有數據處理完成Thread.sleep(1000);}}
}
常用操作符(以 Reactor 庫為例)
Reactor 是 Spring 生態中推薦的響應式編程庫,提供了Mono
(0-1 個元素)和Flux
(0-N 個元素)兩種核心類型:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;public class ReactorExample {public static void main(String[] args) {// 創建FluxFlux<String> flux = Flux.just("A", "B", "C").map(String::toLowerCase) // 轉換操作.filter(s -> s.startsWith("a")); // 過濾操作// 創建MonoMono<String> mono = Mono.just("Hello").flatMap(s -> Mono.just(s + " World")); // 異步轉換// 訂閱并消費flux.subscribe(System.out::println, // 正常數據處理Throwable::printStackTrace, // 錯誤處理() -> System.out.println("Flux完成") // 完成回調);mono.subscribe(System.out::println);}
}
背壓(Backpressure)處理
響應式編程的重要特性是支持背壓,即消費者可以控制生產者發送數據的速率:
Flux.range(1, 1000) // 生成1到1000的整數.onBackpressureBuffer(100) // 緩沖100個元素.subscribe(num -> {// 模擬慢速處理try { Thread.sleep(100); } catch (InterruptedException e) {}System.out.println(num);},Throwable::printStackTrace,() -> System.out.println("處理完成"));
響應式 Web 示例(Spring WebFlux)
Spring WebFlux 是基于 Reactor 的響應式 Web 框架:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@SpringBootApplication
public class WebFluxExample {public static void main(String[] args) {SpringApplication.run(WebFluxExample.class, args);}
}@RestController
class HelloController {@GetMapping("/hello")public Mono<String> hello() {return Mono.just("Hello, Reactive Web!");}
}
總結
Java 響應式編程通過異步數據流提供了高效處理大量并發請求的能力,適合構建非阻塞、低延遲的應用程序。主要應用場景包括微服務、實時數據處理和高并發系統。