java9 反應編程_Java9第四篇-Reactive Stream API響應式編程

876128abed77

file

我計劃在后續的一段時間內,寫一系列關于java 9的文章,雖然java 9 不像Java 8或者Java 11那樣的核心java版本,但是還是有很多的特性值得關注。期待您能關注我,我將把java 9 寫成一系列的文章,大概十篇左右。

Java 9的 Reactive Streams是對異步流式編程的一種實現。它基于異步發布和訂閱模型,具有非阻塞“背壓”數據處理的特點。

Non-blocking Back Pressure(非阻塞背壓):它是一種機制,讓發布訂閱模型中的訂閱者避免接收大量數據(超出其處理能力),訂閱者可以異步通知發布者降低或提升數據生產發布的速率。它是響應式編程實現效果的核心特點!

一、Java9 Reactive Stream API

Java 9提供了一組定義響應式流編程的接口。所有這些接口都作為靜態內部接口定義在java.util.concurrent.Flow類里面。

876128abed77

file

下面是Java 響應式編程中的一些重要角色和概念,先簡單理解一下

發布者(Publisher)是潛在的無限數量的有序數據元素的生產者。 它根據收到的需求(subscription)向當前訂閱者發布一定數量的數據元素。

訂閱者(Subscriber)從發布者那里訂閱并接收數據元素。與發布者建立訂閱關系后,發布者向訂閱者發送訂閱令牌(subscription),訂閱者可以根據自己的處理能力請求發布者發布數據元素的數量。

訂閱令牌(subscription)表示訂閱者與發布者之間建立的訂閱關系。 當建立訂閱關系后,發布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發布者進行交互,例如請求數據元素的數量或取消訂閱。

二、Java響應式編程四大接口

2.1.Subscriber Interface(訂閱者訂閱接口)

public static interface Subscriber {

public void onSubscribe(Subscription subscription);

public void onNext(T item);

public void onError(Throwable throwable);

public void onComplete();

}

onSubscribe:在發布者接受訂閱者的訂閱動作之后,發布任何的訂閱消息之前被調用。新創建的Subscription訂閱令牌對象通過此方法傳遞給訂閱者。

onNext:下一個待處理的數據項的處理函數

onError:在發布者或訂閱遇到不可恢復的錯誤時調用

onComplete:當沒有訂閱者調用(包括onNext()方法)發生時調用。

2.2.Subscription Interface (訂閱令牌接口)

訂閱令牌對象通過Subscriber.onSubscribe()方法傳遞

public static interface Subscription {

public void request(long n);

public void cancel();

}

request(long n)是無阻塞背壓概念背后的關鍵方法。訂閱者使用它來請求n個以上的消費項目。這樣,訂閱者控制了它當前能夠接收多少個數據。

cancel()由訂閱者主動來取消其訂閱,取消后將不會在接收到任何數據消息。

2.3.Publisher Interface(發布者接口)

@FunctionalInterface

public static interface Publisher {

public void subscribe(Subscriber super T> subscriber);

}

調用該方法,建立訂閱者Subscriber與發布者Publisher之間的消息訂閱關系。

2.4.Processor Interface(處理器接口)

處理者Processor 可以同時充當訂閱者和發布者,起到轉換發布者——訂閱者管道中的元素的作用。用于將發布者T類型的數據元素,接收并轉換為類型R的數據并發布。

public static interface Processor extends Subscriber, Publisher {

}

二、實戰案例

現在我們要去實現上面的四個接口來完成響應式編程

Subscription Interface訂閱令牌接口通常不需要我們自己編程去實現,我們只需要在知道request()方法和cancle()方法含義即可。

Publisher Interface發布者接口,Java 9 已經默認為我們提供了實現SubmissionPublisher,該實現類除了實現Publisher接口的方法外,提供了一個方法叫做submit()來完成消息數據的發送。

Subscriber Interface訂閱者接口,通常需要我們自己去實現。因為在數據訂閱接收之后,不同的業務有不同的處理邏輯。

Processor實際上是 Publisher Interface和Subscriber Interface的集合體,有需要數據類型轉換及數據處理的需求才去實現這個接口

下面的例子實現的式字符串的數據消息訂閱處理

實現訂閱者Subscriber Interface

import java.util.concurrent.Flow;

public class MySubscriber implements Flow.Subscriber {

private Flow.Subscription subscription; //訂閱令牌

@Override

public void onSubscribe(Flow.Subscription subscription) {

System.out.println("訂閱關系建立onSubscribe: " + subscription);

this.subscription = subscription;

subscription.request(2);

}

@Override

public void onNext(String item) {

System.out.println("item: " + item);

// 一個消息處理完成之后,可以繼續調用subscription.request(n);向發布者要求數據發送

//subscription.request(n);

}

@Override

public void onError(Throwable throwable) {

System.out.println("onError: " + throwable);

}

@Override

public void onComplete() {

System.out.println("onComplete");

}

}

SubmissionPublisher消息發布者

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Flow;

import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {

public static void main(String[] args) throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(1);

SubmissionPublisher sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());

sb.subscribe(new MySubscriber()); //建立訂閱關系,可以有多個訂閱者

sb.submit("數據 1"); //發送消息1

sb.submit("數據 2"); //發送消息2

sb.submit("數據 3"); //發送消息3

executor.shutdown();

}

}

控制臺打印輸出結果

訂閱關系建立

onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39

item: 數據 1

item: 數據 2

請注意:即使發布者submit了3條數據,MySubscriber也僅收到了2條數據進行了處理。是因為我們在MySubscriber#onSubscribe()方法中使用了subscription.request(2);。這就是“背壓”的響應式編程效果,我有能力處理多少數據,就會通知消息發布者給多少數據。

歡迎關注我的博客,里面有很多精品合集

本文轉載注明出處(必須帶連接,不能只轉文字):字母哥博客。

覺得對您有幫助的話,幫我點贊、分享!您的支持是我不竭的創作動力! 。另外,筆者最近一段時間輸出了如下的精品內容,期待您的關注。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/530138.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/530138.shtml
英文地址,請注明出處:http://en.pswp.cn/news/530138.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

bb10系統支持java嗎_黑莓BB10怎么樣 BlackBerry 10系統好用嗎?

曾幾何時黑莓Blackberry OS是一款十分受用戶歡迎的手機系統&#xff0c;不過隨著手機系統市場已經被蘋果iOS、谷歌安卓、微軟Windows Phone三分天下&#xff0c;致使曾經的黑莓帝國逐漸淪陷&#xff0c;體驗和性能都已經明顯跟不上iOS與安卓等系統的腳步了&#xff0c;也因為如…

java中興參與實參相同_中興通訊_傳輸SDH試題(含答案)

中興傳輸SDH試題一、單項選擇題(每小題2分&#xff0c;共30分)1、在SDH系統中, RSOH指(A)。A.再生段開銷B.復用段開銷C.再生段通道開銷D.復用段通道開銷2.、同步數字體系SDH具有(A)幀結構。A.塊狀B.串行C.鏈形D.三維3、管理指針單元的作用是(A)。A、用來指示信息凈負荷的第一個…

php 正則提取url,php 正則表達式提取網頁超級鏈接url的函數

function match_links($document) {preg_match_all("]))[^>]*>?(.*?)isx",$document,$links);while(list($key,$val) each($links[2])) {if(!empty($val))$match[link][] $val;}while(list($key,$val) each($links[3])) {if(!empty($val))$match[link][] …

php array colum,php5.5新數組函數array_column使用

array_column 用于獲取二維數組中的元素(PHP 5 > 5.5.0)&#xff0c;但我們有時候需要在低版本中使用&#xff0c;那么就可以使用下面的代碼即可PHP5.5發布了&#xff0c;其中增加了一個新的數組函數array_column&#xff0c;感覺不錯的&#xff01;但是低版本PHP要使用&…

php 將字符串打亂,PHP內部實現打亂字符串順序函數str_shuffle的方法

前言2019年春節已過&#xff0c;今天是上班第一天&#xff0c;還得翻一翻之前沒有看完的PHP源碼。今天聊的是字符串順序打亂函數str_shuffle。這個函數本身使用頻率并不高。但是&#xff0c;其內部實現還是非常有趣的。str_shuffle() 函數隨機地打亂字符串中的所有字符。要注意…

php+js+return+true,js中return、return false、return true的區別

1.語法及返回方式①返回控制與函數結果語法為:return 表達式;語句結果函數的執行,返回調用函數,而且把表達式的值作為函數結果返回出去②返回控制無函數結果語法為:return;在大多數情況下,為事件處理函數如果讓其返回false,可以防止默認的事件行為.例如,默認情況下,點擊一個標簽…

php strlen遇0截斷,聊下php下的截斷問題

0x01 起因有天在群里說起上傳的%00截斷的一些問題&#xff0c;就想起之前自己在這個問題踩過坑&#xff0c;想起了自己曾經的flag說要寫文章&#xff0c;一直沒寫&#xff0c;現在來填坑了。0x02 經過源碼理解1234//test.phpinclude "1.txt\000.jpg";?>1234//1.t…

test.php.bak,記一次phpmyadmin 4.8.1 遠程文件包含漏洞(BUUCTF web)

題目很簡單&#xff0c;一個滑稽打開源碼&#xff0c;發現存在source.php文件于是訪問文件&#xff0c;發現出現一串php源碼提示存在hint.php&#xff0c;于是訪問發現一句話flag not here, and flag in ffffllllaaaagggg再回過頭來觀察source.php明顯是一道代碼審計的問題&…

php中files和FILRS,php獲取文件內容最后一行示例

php獲取文件內容最后一行示例復制代碼 代碼如下:$rs README.md;$fp fopen($rs, r);fseek($fp,-1,SEEK_END);$s ;while(($c fgetc($fp)) ! false){if($c "\n" && $s) break;$s $c . $s;fseek($fp, -2, SEEK_CUR);}fclose($fp);echo $s;exit;時間&#x…

php 實現貪吃蛇游戲,C++實現簡單貪吃蛇游戲

我大概在一個多月前把自己上學期寫的c代碼的貪吃蛇游戲push到csdn上&#xff0c;并且說c風格的貪吃蛇寫起來有些麻煩(貪吃蛇游戲的c語言實現)&#xff0c;準備用面向對象的c再寫一遍。現在我們專業恰好剛教完了c&#xff0c;學校也布置了一道簡單的貪吃蛇的編程題目&#xff0c…

java中的斜杠和反斜杠,老生常談java路徑中的反斜杠和斜杠的區別

JAVA中的斜杠有正斜杠與反斜杠之分&#xff0c;正斜杠&#xff0c;一般就叫做斜杠&#xff0c;符號為“/”&#xff1b;反斜杠的符號為“\”。斜杠(/)在JAVA中沒有什么特別的意義&#xff0c;就是代表一個字符‘/;反斜杠(\)則不然&#xff0c;它和緊跟著它的那個字符構成轉義字…

小程序 php cookie,微信小程序使用Cookie

微信小程序使用Cookie微信小程序不支持Cookie,因此,需要借助小程序的數據緩存來實現Cookie.環境: mpvue fly.js登錄成功后,在處理登錄驗證的method里,加入以下內容保存Cookie:wx.setStorageSync("sessionid",response.headers["set-cookie"][0])我對fly.j…

php對象好用嗎,在數據庫中使用對象的好處_php

我們都知道如何從mysql獲取我們需要的行(記錄)&#xff0c;讀取數據&#xff0c;然后存取一些改動。很明顯也很直接&#xff0c;在這個過程背后也沒有什么拐彎抹角的。然而對于我們使用面對對象的程序設計(OOP)來管理我們數據庫中的數據時&#xff0c;這個過程就需要大大改進一…

linux apache php顯示源碼,linux 源碼安裝apache PHP 問題

sudo ./configure --prefix/var/php --with-apxs2/usr/local/apache2/bin/apxsLoadModule php5_module modules/libphp5.soDirectoryIndex index.html index.html.var .phpa-bash-3.2$ pwd/usr/local/apache2/htdocs-bash-3.2$ cat info.phpphpinfo();?>打開info.ph…

mysql臨時表的的理解,如何理解存儲過程中已存在的mysql臨時表?

它在創建表時具有IF NOT EXISTS(13.1.17. CREATE TABLE Syntax)選項,在這種情況下可以使用.例&#xff1a;DELIMITER $$CREATE PROCEDURE temp_sp1()BEGINCREATE TEMPORARY TABLE IF NOT EXISTS temp_table (col2 int(11) DEFAULT NULL,col3 int(11) DEFAULT NULL);INSERT INTO…

python 發郵件 抄送,Python調用outlook發送郵件,發送給多人、抄送給多人并帶上附件...

我的報告目錄具體解釋在代碼中有詳細注釋import win32com.client as win32import datetime, osaddressee test01qq.com;test02jd.com#收件人郵箱列表cc test02163.com;test03alibaba.com#抄送人郵件列表mail_path os.path.join(rC:\Users\songlihui\PycharmProjects\test001…

php阻止輸入sql,在PHP中全面阻止SQL注入式攻擊之三

一、 建立一個安全抽象層我們并不建議你手工地把前面介紹的技術應用于每一個用戶輸入的實例中&#xff0c;而是強烈推薦你為此創建一個抽象層。一個簡單的抽象是把你的校驗方案加入到一個函數中&#xff0c;并且針對用戶輸入的每一項調用這個函數。當然&#xff0c;我們還可以創…

Oracle12081,【Oracle介質】Oracle 12C Linux x86-64 最新OPatch patch 6880880 12.2.0.1.7

天萃荷凈Linux x86-64 補丁程序6880880: OPatch patch of version 12.2.0.1.7 for Oracle software releases 12.1.0.x (installer) and 12.2.0.x (AUG 2016)上次更新時間 2016-8-26 上午1:48 (8 天前)產品 Oracle Global Lifecycle Management OPatc…

如何使用oracle ebs,Oracle EBS進行集成的實際操作步驟

我們今天主要向大家介紹的是如何使用WebService和Oracle EBS進行集成&#xff0c;以及在使用WebService和Oracle EBS進行集成時&#xff0c;所需要的一些項目的描述&#xff0c;以下的文章就是對相關內容的描述。架構系統從總體上分為兩部分&#xff0c;一部為企業的EBS及接口系…

linux nls_lang oracle,linux操作系統環境變量LANG和NLS_LANG的區別

例如&#xff1a;復制代碼代碼如下:export LANGzh_CN.GB2312export NLS_LANGAMERICAN_AMERICA.ZHS16GBK$export LANGzh_CN.GB2312$date2012年 11月 27日 星期二 16:20:35 CST顯示是中文界面。復制代碼代碼如下:$export NLS_LANGAMERICAN_AMERICA.ZHS16GBK$sqlplus / as sysdbaS…