關于響應式編程的理解與SpringCloudGateway的理解
- 一. 響應式編程與函數式編程的區別
- 二. 響應式編程中常用的組件
- 2.1 RxJava定義
- 2.2 Rxjava基本概念
- 2.3 RxJava 用法
- 三 SpringcloudGateway
- 四 常見的四種限流規則
一. 響應式編程與函數式編程的區別
總的來說,響應式編程主要體現在①異步
、②觀察者模式
以webflux和servlet為例子
一個是每個api請求代表著一個線程。
另一個是一個主線程內,可以使用多個子線程,達到異步的效果。
二. 響應式編程中常用的組件
2.1 RxJava定義
RxJava是一個可以在JVM上運行的,基于觀察者模式 實現異步操作的java庫。
2.2 Rxjava基本概念
// 創建觀察者
Observer observer = new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe");}@Overridepublic void onNext(String o) {}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError data is :" + e.toString());}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}
};// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(@NonNull ObservableEmitter e) throws Exception {e.onNext("hello");e.onNext("world");e.onComplete();}
});
// 訂閱
observable.subscribe(observer);
2.3 RxJava 用法
- Rxjava 實現線程切換
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//1、“異步線程” 執行耗時操作//2、“執行完畢” 調用onNext觸發回調,通知觀察者e.onNext("1");e.onComplete();}
}).subscribeOn(Schedulers.io()) // io線程.observeOn(AndroidSchedulers.mainThread()) // 主線程.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {// 訂閱線程 訂閱的那一刻在訂閱線程中執行}@Overridepublic void onNext(String value) {// “主線程”執行的方法}@Overridepublic void onError(Throwable e) {// "主線程"執行的方法}@Overridepublic void onComplete() {// "主線程"執行的方法}});
- Rxjava 使用操作符
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {// IO 線程// 請求網絡數據e.onNext("123456");}
}).map(new Function<String, Integer>() {@Overridepublic Integer apply(String s) {// IO 線程// 網絡數據解析(數據轉化)//// throw new RequestFailException("獲取網絡請求失敗");return 123;}
}).doOnNext(new Consumer<Integer>() { //保存登錄結果UserInfo@Overridepublic void accept(@NonNull Integer bean) throws Exception {// IO 線程// 保存網絡數據}
}).subscribeOn(Schedulers.io()) //IO線程
.observeOn(AndroidSchedulers.mainThread()) //主線程
.subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer bean) throws Exception {// 更新UI}
}, new Consumer<Throwable>() {@Overridepublic void accept(@NonNull Throwable throwable) throws Exception {// 錯誤 顯示錯誤頁面}
});
- Flowable
Flowable是為了應對
Backpressure(背壓)
產生的。
Backpressure(背壓) 即生產者的生產速度大于消費者的消費能力引起的問題。
異步線程中 生產者有無限的生產能力;
主線程中 消費者消費能力不足,從而造成事件無限堆積,最后導致OOM。
Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {//1、“異步線程” 執行耗時操作//2、“執行完畢” 調用onNext觸發回調,通知觀察者emitter.onNext(0);emitter.onComplete();}// 若消費者消費能力不足,則拋出MissingBackpressureException異常
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {// 訂閱時執行,發生在“訂閱線程”// 這個方法是用來向生產者申請可以消費的事件數量// 這里表明消費者擁有Long.MAX_VALUE的消費能力s.request(Long.MAX_VALUE);}@Overridepublic void onNext(Integer integer) {// “主線程”執行的方法}@Overridepublic void onError(Throwable t) {// "主線程"執行的方法}@Overridepublic void onComplete() {// "主線程"執行的方法}});
各種背壓策略在此不做多說明。
三 SpringcloudGateway
是使用webFlux編寫的網關組件。
webflux與rxjava的異步編程是兩種哲學編程概念。但是都是體現在了異步與觀察者模式。
關于SpringcloudGateway,我理解我們需要做的東西
-
導入springcloudgateway依賴與nacos依賴。
-
書寫配置類
2.1 GatewayConfiguration,這個配置類用于定義全局異常
與全局過濾器
。
2.2 RateLimiterConfiguration,這個配置類用于定義限流規則。 -
具體有關限流的配置,在
nacos配置中心
配置。
四 常見的四種限流規則
給大家推薦一篇文章