原文地址: https://debezium.io/blog/2023/09/23/flink-spark-online-learning/
歡迎關注留言,我是收集整理小能手,工具翻譯,僅供參考,筆芯筆芯.
Online machine learning with the data streams from the database
September 23, 2023 by Vojtěch Juránek
machine-learning flink spark online-learning examples apache-kafka
在…中 先前的博客帖子 我們已經展示了如何利用Debezns利用數據庫中的現有數據來培訓神經網絡模型,并使用這個預先培訓的模型來分類新存儲到數據庫中的圖像。在這篇博文中,我們將進一步推進它–我們將使用Debezum從數據庫中創建多個數據流,并使用其中一個流進行持續學習和改進我們的模型,而第二個流用于對數據進行預測。當模型不斷改進或調整以適應最近的數據樣本時,這種方法稱為 在線機器學習.在線學習只適合于某些用例,而實現某一特定算法的在線變體可能具有挑戰性甚至是不可能的。然而,在可以在線學習的情況下,它成為一種非常強大的工具,因為它使人們能夠對數據的實時變化作出反應,避免重新培訓和部署新模型的需要,從而節省硬件和業務成本。隨著數據流變得越來越普遍,例如。隨著在線學習的到來,我們可以期待在線學習變得越來越受歡迎。在可能的情況下,它通常是分析流數據的最佳工具。
正如在前一個博客中提到的,我們這里的目標不是為給定的用例構建可能的最佳模型,而是研究我們如何能夠構建一個完整的管道,從將數據插入數據庫到模型并將其用于模型培訓和預測。為了保持簡單,我們將使用另一個通常在ML教程中使用的眾所周知的數據示例。我們將探討如何使用網上變異的虹膜花,對不同種類的虹膜花進行分類。 K-元聚類算法 .我們用的 阿帕奇電匯 和 阿帕奇火花處理數據流。這兩個框架都是非常流行的數據處理框架,包括一個機器學習庫,除其他外,它還實現在線k-均值算法。因此,我們可以專注于構建一個完整的管道,將數據從數據庫傳遞到給定的模型,實時處理它,而不必處理算法的實現細節。
本博客文章后面提到的所有代碼都可以作為一個德貝齊姆示例在 去貝茲示例存儲庫 ,所有其他有用的東西,如碼頭制造和逐步指令在 自述 文件。
數據集準備
我們會利用 網花數據集 .我們的目標是根據對虹膜花的幾種測量來確定虹膜的種類:它的花瓣長度,花瓣寬度,花瓣寬度。
圖片來自官網原文
花色的
石竹色,來源 維基百科
數據集可從各種來源下載。我們可以利用這一事實,即它已經被預先處理過了。 學習 工具包從那里開始使用。每個樣本行包含一個數據點(花被長度、花被寬度、花瓣長度和花瓣寬度)和一個標簽。標簽編號0,1,或2,其中0代表虹膜,1代表虹膜花色,2代表虹膜。數據集很小,只包含150個數據點。
當我們將數據加載到數據庫時,我們將首先準備SQL文件,然后將其傳遞到數據庫。我們需要把原始數據樣本分成三個子樣本–兩個用于培訓,一個用于測試。初步培訓將使用第一個培訓數據樣本。當我們第一次測試該模型時,這個數據樣本故意小到不能產生好的預測,這樣我們就可以看到,當我們向模型提供更多的數據時,該模型的預測會如何實時增加。
您可以使用所附演示存儲庫中的下列組腳本生成所有三個SQL文件。
$ ./iris2sql.py
…postgres 目錄包含了這個演示使用的文件。train1.sql 會自動加載到該數據庫的開始。test.sql 和train2.sql 稍后將手動加載到數據庫中。
與阿帕奇的分類鏈接
首先,讓我們來看看如何做網上的虹膜花分類和學習阿帕奇。下圖描述了整個管道的高層架構。
圖片來自官網原文
后調到FLK架構
我們將使用波斯特格雷斯作為我們的源數據庫。作為卡夫卡連接源連接器部署的Debezum跟蹤數據庫中的變化,并創建從新插入的數據發送到卡夫卡的數據流。卡夫卡將這些流發送到阿帕奇弗林克,后者采用流K平均算法進行模型擬合和數據分類。對測試數據流模型的預測是作為另一個流生成的,并發送給卡夫卡。
您也可以不使用卡夫卡就直接吸收數據庫更改到FINK中。VERVEREKA的實現的疾病預防控制中心的源連接器嵌入德貝佐姆直接進入FLINK。請參閱FLINC連接器 文件 更多的細節。
我們的數據庫里有兩張表。第一個存儲我們的訓練數據,第二個存儲測試數據。因此,有兩個數據流,每個數據流對應一個表–一個用于學習的數據流和一個需要分類的數據點。在實際應用程序中,您只能使用一個表,或者相反地,更多的表。您甚至可以部署更多的德貝齊亞連接器,從而合并來自多個數據庫的數據。
使用德貝茲和卡夫卡作為源數據流
阿帕奇·弗林克與卡夫卡有著極好的融合。我們可以通過德貝司記錄。JSON記錄。對于創建FLINK表,它甚至支持Debezum的記錄格式,但是對于流,我們需要提取部分Debezns消息,其中包含表中新存儲的行。不過,這很容易,因為德貝茲提供SMT, 提取新記錄狀態SMT ,正是它做到了這一點。完全的德貝茲配置可以是這樣的:
{
“name”: “iris-connector-flink”,
“config”: {
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“tasks.max”: “1”,
“database.hostname”: “postgres”,
“database.port”: “5432”,
“database.user”: “postgres”,
“database.password”: “postgres”,
“database.dbname” : “postgres”,
“topic.prefix”: “flink”,
“table.include.list”: “public.iris_.*”,
“key.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”
}
}
這個配置捕捉了public 以表開頭的架構iris_ 前綴。因為我們將訓練和測試數據存儲在兩個表中,兩個卡夫卡主題被命名為flink.public.iris_train 和flink.public.iris_test 分別被創造出來。弗林克的DataStreamSource 表示傳入的數據流。當我們將記錄編碼為JSON時,它將是一個JSON流ObjectNode 東西。構造源流非常簡單:
KafkaSource train = KafkaSource.builder()
.setBootstrapServers(“kafka:9092”)
.setTopics(“flink.public.iris_train”)
.setClientIdPrefix(“train”)
.setGroupId(“dbz”)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(false)))
.build();
DataStreamSource trainStream = env.fromSource(train, WatermarkStrategy.noWatermarks(), “Debezium train”);
傳真業務主要由Table 抽象對象。另外,ML模型只接受表作為輸入,預測也是作為表生成的。因此,我們必須首先將輸入流轉換為Table 反對。我們將首先將輸入數據流轉換為表行流。我們需要定義一個映射函數來返回Row 具有包含一個數據點的向量的對象。因為K平均值算法屬于 無監督學習 算法,即模型不需要對應的"正確答案",我們可以跳過label 矢量的場:
private static class RecordMapper implements MapFunction<ObjectNode, Row> {
@Override
public Row map(ObjectNode node) {
JsonNode payload = node.get(“value”).get(“payload”);
StringBuffer sb = new StringBuffer();
return Row.of(Vectors.dense(
payload.get(“sepal_length”).asDouble(),
payload.get(“sepal_width”).asDouble(),
payload.get(“petal_length”).asDouble(),
payload.get(“petal_width”).asDouble()));
}
}
內部的FLINK管道的各個部分可以在不同的工作節點上運行,因此,我們也需要提供有關表的類型信息。因此,我們準備創建表對象:
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
TypeInformation<?>[] types = {DenseVectorTypeInfo.INSTANCE};
String names[] = {“features”};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream inputStream = trainStream.map(new RecordMapper()).returns(typeInfo);
Table trainTable = tEnv.fromDataStream(inputStream).as(“features”);
建筑物的K型均值
一旦我們有了Table 目標,我們可以把它傳給我們的模型。所以讓我們創建一個并通過一個火車流到它進行持續的模型訓練:
OnlineKMeans onlineKMeans = new OnlineKMeans()
.setFeaturesCol(“features”)
.setPredictionCol(“prediction”)
.setInitialModelData(tEnv.fromDataStream(env.fromElements(1).map(new IrisInitCentroids())))
.setK(3);
OnlineKMeansModel model = onlineKMeans.fit(trainTable);
為了使事情更簡單,我們直接將所需集群的數目設置為3個,而不是通過挖掘數據(例如使用數據)找到最佳集群的數目。 肘法 )。我們還為集群的中心設置了一些初始值,而不是使用隨機數(FLINT提供了一種方便的方法-KMeansModelData.generateRandomModelData() 如果你想嘗試隨機中心)。
為了獲得測試數據的預測,我們再次需要將測試流轉換為表。模型將帶有測試數據的表轉換為帶有預測的表。最后,將預測轉化為流并持續進行,例如。在卡夫卡語中:
DataStream testInputStream = testStream.map(new RecordMapper()).returns(typeInfo);
Table testTable = tEnv.fromDataStream(testInputStream).as(“features”);
Table outputTable = model.transform(testTable)[0];
DataStream resultStream = tEnv.toChangelogStream(outputTable);
resultStream.map(new ResultMapper()).sinkTo(kafkaSink);
現在,我們已經準備好構建我們的應用程序,并且幾乎準備好將它提交給FL鏈接以供執行。在這樣做之前,我們需要首先創建所需的卡夫卡主題。雖然主題可能是空的,但FLINK要求它們至少存在。當數據庫啟動時,我們將一個小的數據集包含在該數據庫中,當在卡夫卡連接中注冊該數據集連接器時,該數據將創建一個相應的主題。由于測試數據表還不存在,我們需要手動創建卡夫卡中的主題:
$ docker compose -f docker-compose-flink.yaml exec kafka /kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic flink.public.iris_test
現在,我們準備將我們的申請提交給弗林克。完整代碼,請參閱德貝齊姆的相應源代碼 示例存儲庫
如果您不使用作為本演示的源代碼的一部分提供的DOCer,請包括 方格語言庫 在閃光中lib 文件夾,因為ML庫不是默認的鏈接分發的一部分。
提供友好的用戶界面,可在 http://localhost:8081/ .在那里,你可以檢查,除其他事項外,你的工作狀況,也,例如。出色的圖形化執行計劃:
圖片來自官網原文
后調到FLK架構
評估模型
從用戶的角度來看,所有與我們的模型的交互都是通過在數據庫中插入新的記錄或者用預測來閱讀卡夫卡主題來實現的。當數據庫開始時,我們已經在數據庫中創建了一個非常小的初始訓練數據樣本,我們可以通過將測試數據樣本插入數據庫直接檢查模型預測:
$ psql -h localhost -U postgres -f postgres/iris_test.sql
插入結果產生了卡夫卡測試數據的即時數據流,將其傳遞到模型中,并將預測發送到iris_predictions 卡夫卡主題。當將模型訓練在只有兩個集群的非常小的數據集上時,預測是不準確的。以下是我們最初的預測:
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 2
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 2
[6.1, 2.8, 4.7, 1.2] is classified as 2
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 2
對我們來說,正確的答案應該是:
[5.4, 3.7, 1.5, 0.2] is 0
[4.8, 3.4, 1.6, 0.2] is 0
[7.6, 3.0, 6.6, 2.1] is 2
[6.4, 2.8, 5.6, 2.2] is 2
[6.0, 2.7, 5.1, 1.6] is 1
[5.4, 3.0, 4.5, 1.5] is 1
[6.7, 3.1, 4.7, 1.5] is 1
[5.5, 2.4, 3.8, 1.1] is 1
[6.1, 2.8, 4.7, 1.2] is 1
[4.3, 3.0, 1.1, 0.1] is 0
[5.8, 2.7, 3.9, 1.2] is 1
在比較結果時,我們只有11個數據點中的5個由于初始樣本訓練數據的大小而正確分類。另一方面,由于我們沒有從完全隨機集群開始,我們的預測也并非完全錯誤。
讓我們看看,當我們向模型提供更多的訓練數據時,情況會發生什么變化:
$ psql -h localhost -U postgres -f postgres/iris_train2.sql
為了看到更新的預測,我們將相同的測試數據樣本再次插入數據庫:
$ psql -h localhost -U postgres -f postgres/iris_test.sql
下面的預測是更好的,因為我們有所有三個類別。我們還正確地將11個數據點中的7個分類。
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 2
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 1
[6.1, 2.8, 4.7, 1.2] is classified as 2
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 1
由于整個數據樣本很小,為了進一步的模型訓練,我們可以重新使用第二列數據樣本:
$ psql -h localhost -U postgres -f postgres/iris_train2.sql
$ psql -h localhost -U postgres -f postgres/iris_test.sql
這將導致下列預測。
[5.4, 3.7, 1.5, 0.2] is classified as 0
[4.8, 3.4, 1.6, 0.2] is classified as 0
[7.6, 3.0, 6.6, 2.1] is classified as 2
[6.4, 2.8, 5.6, 2.2] is classified as 2
[6.0, 2.7, 5.1, 1.6] is classified as 2
[5.4, 3.0, 4.5, 1.5] is classified as 1
[6.7, 3.1, 4.7, 1.5] is classified as 2
[5.5, 2.4, 3.8, 1.1] is classified as 1
[6.1, 2.8, 4.7, 1.2] is classified as 1
[4.3, 3.0, 1.1, 0.1] is classified as 0
[5.8, 2.7, 3.9, 1.2] is classified as 1
我們現在發現11個數據點中有9個是正確分類的。雖然這仍然不是一個很好的結果,我們只期望部分準確的結果,因為這只是一個預測。這里的主要動機是展示整個管道,并證明該模型在添加新數據時無需重新訓練和重新部署該模型,即可改進預測。
與阿帕奇火花分類
從用戶的角度來看,阿帕奇火花公司非常類似于FLINK,實現也非常類似。本章更簡短,使這篇博文更容易理解。
火花有兩種流線型:老的 驅動器 ,目前仍處于傳統狀態,以及最近的建議 結構性流 .然而,由于SARSHML庫中所包含的流式K-均值算法僅用于D流,為了簡單起見,本示例中使用了D流。一個更好的方法是使用結構化的流媒體,并實現流媒體的k意味著我們自己。然而,這超出了這篇博文的范圍和主要目標。
火花支持流來自卡夫卡使用d流。然而,將DLULHUK寫回卡夫卡并沒有得到支持,盡管它是可能的,但并不是簡單的。
結構化流支持雙向,讀和寫卡夫卡,非常容易。
再次,為了簡單起見,我們跳過了最后的部分,只將預測寫到控制臺,而不是寫到卡夫卡。我們管道的總體情況如下:
圖片來自官網原文
后推以激發架構
定義數據流
同樣,從卡夫卡流中創建火花流也很簡單,大多數參數都是不言而喻的:
Set trainTopic = new HashSet<>(Arrays.asList(“spark.public.iris_train”));
Set testTopic = new HashSet<>(Arrays.asList(“spark.public.iris_test”));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “kafka:9092”);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, “dbz”);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JavaInputDStream<ConsumerRecord<String, String>> trainStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(trainTopic, kafkaParams));
JavaDStream train = trainStream.map(ConsumerRecord::value)
.map(SparkKafkaStreamingKmeans::toLabeledPointString)
.map(LabeledPoint::parse);
在最后一行中,我們將卡夫卡流轉換為標記的點流,這是星火XML庫用于其ML模型的工作。標記點被期望為字符串格式化為數據點標簽,用逗號與空間分隔的數據點值分開。所以地圖功能是這樣的:
private static String toLabeledPointString(String json) throws ParseException {
JSONParser jsonParser = new JSONParser();
JSONObject o = (JSONObject)jsonParser.parse(json);
return String.format("%s, %s %s %s %s",
o.get(“iris_class”),
o.get(“sepal_length”),
o.get(“sepal_width”),
o.get(“petal_length”),
o.get(“petal_width”));
}
它仍然使用k平均值是一個無監督的算法,不使用數據點標簽。不過,把它們傳給LabeledPoint 稍后的課程中,我們可以用模型預測來展示它們。
我們將一個映射函數鏈接起來,以解析字符串并從中創建一個標記數據點。在這種情況下,它是星火的內置功能LabeledPoint .
與弗林克的情況相反,"星火"并不要求卡夫卡主題事先存在,所以在部署模型時,我們不必創建主題。一旦創建了帶有測試數據的表并填充了數據,我們就可以讓Debezum創建它們。
定義和評價模型
對K-均值流模型的定義非常類似于FLINK:
StreamingKMeans model = new StreamingKMeans()
.setK(3)
.setInitialCenters(initCenters, weights);
model.trainOn(train.map(lp -> lp.getFeatures()));
此外,在這種情況下,我們直接將集群的數目設置為3個,并為集群提供相同的初始中心點。我們也只是通過了訓練的數據點,而不是標簽。
如前所述,我們可以使用標簽顯示它們和預測:
JavaPairDStream<Double, Vector> predict = test.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()));
model.predictOnValues(predict).print(11);
我們用預測將11個流元素打印到結果流上的控制臺,因為這是我們測試樣本的大小。像FLINK一樣,在非常小的數據樣本上進行初步訓練后的結果會更好。數據元組中的第一個數字是數據點標簽,而第二個數字是我們模型所做的相應預測:
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,0)
spark_1 | (1.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,0)
spark_1 | (0.0,0)
spark_1 | (1.0,0)
然而,當我們提供更多的培訓數據時,預測會更好:
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,2)
spark_1 | (1.0,0)
spark_1 | (1.0,1)
spark_1 | (0.0,0)
spark_1 | (1.0,0)
如果我們再次通過第二個訓練數據樣本進行訓練,我們的模型對整個試驗樣本作出正確的預測:
spark_1 | (0.0,0)
spark_1 | (0.0,0)
spark_1 | (2.0,2)
spark_1 | (2.0,2)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (1.0,1)
spark_1 | (0.0,0)
spark_1 | (1.0,1)
該預測是一個集群中的一些k平均值算法創建,并與我們的數據樣本中的標簽無關。這意味著。(0.0,1) 不會是個錯誤的預測。可能發生的情況是,標簽0的數據點被分配給了正確的集群,然而,SASS內部將其標記為集群編號1。在評估模型時需要牢記這一點。
因此,類似于FLINK,當我們通過了更多的培訓數據而無需重新培訓和重新部署模型時,我們得到了更好的結果。在這種情況下,我們得到的結果甚至比弗林克的模型更好。
結論
在這篇博文中,我們繼續探索Debezns如何幫助將數據吸收到各種ML框架中。我們已經展示了如何將數據從數據庫傳遞到阿帕奇弗林克和阿帕奇火花實時數據流。在這兩種情況下,集成都很容易建立,而且效果很好。我們在一個例子中演示了它,使我們能夠使用在線學習算法,即在線k-均值算法,以突出數據流的力量。在線機器學習使我們能夠對數據流進行實時預測,并在新的培訓數據到來時立即改進或調整模型。模型調整不需要在一個單獨的計算集群上進行任何模型重新訓練,也不需要重新部署一個新的模型,這使得ml-osps更加簡單,更具成本效益。
像往常一樣,我們將非常感謝這篇博文的反饋。你有沒有想過在這個領域如何改變數據捕獲的方法?什么將有助于調查,是否集成到另一個ML框架,集成到特定的ML特性存儲等。?如果你對此有任何意見,請立即聯系我們。 郁金香聊天 , 郵寄清單 或者你可以直接把你的想法變成 吉拉功能請求 .