flink sink kafka的事務提交現象猜想

現象?

查看flink源碼時 sink kafka有事務提交機制,查看源碼發現是使用兩階段提交策略,而事務提交是checkpoint完成后才執行,那么如果checkpoint設置間隔時間比較長時,事務未提交之前,后端應該消費不到數據,而觀察實際現象為寫入kafka的消費數據可以立馬消費。

測試用例

測試流程

  1. 編寫任務1,設置較長的checkpoint時間,并且指定?CheckpointingMode.EXACTLY_ONCE,輸出輸出到kafka。
  2. 編寫任務2消費任務的結果topic,打印控制臺,驗證結果。
  3. 根據現象查看源碼,分析原因。

測試用例

測試任務1

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(1000*60l, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");//        超時時間,checkpoint沒在時間內完成則丟棄env.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);//最小間隔時間(前一次結束時間,與下一次開始時間間隔)env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
//        當 Flink 任務取消時,保留外部保存的 checkpoint 信息KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test001").setGroupId("my-group")
//                .setStartingOffsets(OffsetsInitializer()).setStartingOffsets(OffsetsInitializer.committedOffsets()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 從文件讀取數據
//        DataStream<SensorReading> dataStream = env.addSource( new SourceTest4.MySensorSource() );DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s;}});Properties properties = new Properties();
// 根據上面的介紹自己計算這邊的超時時間,滿足條件即可properties.setProperty("transaction.timeout.ms","900000");
//        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("192.168.65.128:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("test002").setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();map.sinkTo(sink);// 打印輸出env.execute();

測試任務2

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(1000*150l, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
//        當 Flink 任務取消時,保留外部保存的 checkpoint 信息Properties properties1 = new Properties();
//        properties1.put("isolation.level","read_committed");KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test002").setGroupId("my-group2").setProperties(properties1).setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSource.print(" test2接受數據");// 打印輸出env.execute();

測試結果分析

測試結果:

任務1開啟后,無論是否執行checkpoint,任務checkpoint都可以正常消費數據,與預期不符合。

原因排查

查看kafkaSink 的源碼,找到跟與兩階段提交相關的代碼,1.18源碼中TwoPhaseCommittingSink有重構。kafkasink實現TwoPhaseCommittingSink接口實現,創建Commiter和Writer。

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;Committer<CommT> createCommitter() throws IOException;SimpleVersionedSerializer<CommT> getCommittableSerializer();@PublicEvolvingpublic interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {Collection<CommT> prepareCommit() throws IOException, InterruptedException;}
}--------------------------------------
public class KafkaSink<IN>implements StatefulSink<IN, KafkaWriterState>,TwoPhaseCommittingSink<IN, KafkaCommittable> {private final DeliveryGuarantee deliveryGuarantee;private final KafkaRecordSerializationSchema<IN> recordSerializer;private final Properties kafkaProducerConfig;private final String transactionalIdPrefix;KafkaSink(DeliveryGuarantee deliveryGuarantee,Properties kafkaProducerConfig,String transactionalIdPrefix,KafkaRecordSerializationSchema<IN> recordSerializer) {this.deliveryGuarantee = deliveryGuarantee;this.kafkaProducerConfig = kafkaProducerConfig;this.transactionalIdPrefix = transactionalIdPrefix;this.recordSerializer = recordSerializer;}/*** Create a {@link KafkaSinkBuilder} to construct a new {@link KafkaSink}.** @param <IN> type of incoming records* @return {@link KafkaSinkBuilder}*/public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}
-- 創建Committer@Internal@Overridepublic Committer<KafkaCommittable> createCommitter() throws IOException {return new KafkaCommitter(kafkaProducerConfig);}@Internal@Overridepublic SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {return new KafkaCommittableSerializer();}
-- 創建writer@Internal@Overridepublic KafkaWriter<IN> createWriter(InitContext context) throws IOException {return new KafkaWriter<IN>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,context.asSerializationSchemaInitializationContext(),Collections.emptyList());}@Internal@Overridepublic KafkaWriter<IN> restoreWriter(InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {return new KafkaWriter<>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,context.asSerializationSchemaInitializationContext(),recoveredState);}@Internal@Overridepublic SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {return new KafkaWriterStateSerializer();}@VisibleForTestingprotected Properties getKafkaProducerConfig() {return kafkaProducerConfig;}
}
KafkaWriter和KafkaCommitter源碼,

在KafkaWriter中snapshotState方法中發現如果deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE的開啟事務的判斷邏輯。

class KafkaWriter<IN>implements StatefulSink.StatefulSinkWriter<IN, KafkaWriterState>,TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, KafkaCommittable> {
.... 省略代碼  @Overridepublic Collection<KafkaCommittable> prepareCommit() {if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {return Collections.emptyList();}// only return a KafkaCommittable if the current transaction has been written some dataif (currentProducer.hasRecordsInTransaction()) {final List<KafkaCommittable> committables =Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));LOG.debug("Committing {} committables.", committables);return committables;}// otherwise, we commit the empty transaction as is (no-op) and just recycle the producercurrentProducer.commitTransaction();producerPool.add(currentProducer);return Collections.emptyList();}@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
-- 開啟事務判斷        
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {currentProducer = getTransactionalProducer(checkpointId + 1);currentProducer.beginTransaction();}return Collections.singletonList(kafkaWriterState);}
。。。。。
}

?查看?KafkaCommitter的commit()方法發現producer.commitTransaction();操作

/*** Committer implementation for {@link KafkaSink}** <p>The committer is responsible to finalize the Kafka transactions by committing them.*/
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE ="because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"+ "To avoid data loss, the application will restart.";private final Properties kafkaProducerConfig;@Nullable private FlinkKafkaInternalProducer<?, ?> recoveryProducer;KafkaCommitter(Properties kafkaProducerConfig) {this.kafkaProducerConfig = kafkaProducerConfig;}@Overridepublic void commit(Collection<CommitRequest<KafkaCommittable>> requests)throws IOException, InterruptedException {for (CommitRequest<KafkaCommittable> request : requests) {final KafkaCommittable committable = request.getCommittable();final String transactionalId = committable.getTransactionalId();LOG.debug("Committing Kafka transaction {}", transactionalId);Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =committable.getProducer();FlinkKafkaInternalProducer<?, ?> producer;try {producer =recyclable.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject).orElseGet(() -> getRecoveryProducer(committable));--- 事務提交producer.commitTransaction();producer.flush();recyclable.ifPresent(Recyclable::close);} catch (RetriableException e) {LOG.warn("Encountered retriable exception while committing {}.", transactionalId, e);request.retryLater();} catch (ProducerFencedException e) {......}}}
。。。。
}
分析結果

發現除了設置checkpoint還需要kafkasink單獨設置.才會實現輸出端的開啟事務,因此在任務1中添加設置setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

 KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("192.168.65.128:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("test002").setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

再次驗證任務任務2依然可以正常消費。這是有一點頭大,不明白為什么?想到既然開啟事務肯定有事務的隔離級別,查詢了kafka的事務隔離級別,有兩種,分別是讀已提交和讀未提交,默認消費事務是讀未提交。

?
kafka的事務隔離級別:
讀已提交(Read?committed):此隔離級別保證消費者只能讀取已經提交的消息。這意味著事務中的消息在提交之前對消費者是不可見的。使用此隔離級別可以避免消費者讀取到未提交的事務消息,確保消費者只讀取到已經持久化的消息。讀未提交(Read Uncommitted):此隔離級別允許消費者讀取未提交的消息。這意味著事務中的消息在提交之前就對消費者可見。使用此隔離級別可以實現更低的延遲,但可能會導致消費者讀取到未提交的事務消息。?

在任務2中添加isolation.level="read_committed",設定讀取消費事務級別為讀已提交,再次測試,發現任務1執行完checkpoint前任務2消費不到數據。而命令行可以及時消費任務1的輸出topic可可以消費到數據。結果與預期相同。

 Properties properties1 = new Properties();properties1.put("isolation.level","read_committed");KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("test002").setGroupId("my-group2").setProperties(properties1)

注意事項

Kafka | Apache Flink

FlinkKafkaProducer?已被棄用并將在 Flink 1.15 中移除,請改用?KafkaSink

官網文檔信息

Kafka | Apache Flink

Kafka Consumer 提交 Offset 的行為配置?#

Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker 的行為。請注意:Flink Kafka Consumer 不依賴于提交的 offset 來實現容錯保證。提交的 offset 只是一種方法,用于公開 consumer 的進度以便進行監控。

配置 offset 提交行為的方法是否相同,取決于是否為 job 啟用了 checkpointing。

  • 禁用 Checkpointing:?如果禁用了 checkpointing,則 Flink Kafka Consumer 依賴于內部使用的 Kafka client 自動定期 offset 提交功能。 因此,要禁用或啟用 offset 的提交,只需將?enable.auto.commit?或者?auto.commit.interval.ms?的Key 值設置為提供的?Properties?配置中的適當值。

  • 啟用 Checkpointing:?如果啟用了 checkpointing,那么當 checkpointing 完成時,Flink Kafka Consumer 將提交的 offset 存儲在 checkpoint 狀態中。 這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態中的 offset 一致。 用戶可以通過調用 consumer 上的?setCommitOffsetsOnCheckpoints(boolean)?方法來禁用或啟用 offset 的提交(默認情況下,這個值是 true )。 注意,在這個場景中,Properties?中的自動定期 offset 提交設置會被完全忽略。

kafkasink支持語義保證

kafkaSink?總共支持三種不同的語義保證(DeliveryGuarantee)。對于?DeliveryGuarantee.AT_LEAST_ONCE?和?DeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必須啟用。默認情況下?KafkaSink?使用?DeliveryGuarantee.NONE。 以下是對不同語義保證的解釋:

  • DeliveryGuarantee.NONE?不提供任何保證:消息有可能會因 Kafka broker 的原因發生丟失或因 Flink 的故障發生重復。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 時會等待 Kafka 緩沖區中的數據全部被 Kafka producer 確認。消息不會因 Kafka broker 端發生的事件而丟失,但可能會在 Flink 重啟時重復,因為 Flink 會重新處理舊數據。
  • DeliveryGuarantee.EXACTLY_ONCE: 該模式下,Kafka sink 會將所有數據通過在 checkpoint 時提交的事務寫入。因此,如果 consumer 只讀取已提交的數據(參見 Kafka consumer 配置?isolation.level),在 Flink 發生重啟時不會發生數據重復。然而這會使數據在 checkpoint 完成時才會可見,因此請按需調整 checkpoint 的間隔。請確認事務 ID 的前綴(transactionIdPrefix)對不同的應用是唯一的,以保證不同作業的事務 不會互相影響!此外,強烈建議將 Kafka 的事務超時時間調整至遠大于 checkpoint 最大間隔 + 最大重啟時間,否則 Kafka 對未提交事務的過期處理會導致數據丟失。

推薦查看1.14版本和1.18版本結合起來看,在一些細節處理上有差異。

Kafka | Apache Flink

其他源碼簡介

如果查看1.18版本源碼不太好理解兩階段提交,可以查看1.14.5的源碼,發現FlinkKafkaProducer被標記廢除請改用?KafkaSink,并將在 Flink 1.15 中移除, 在1.14.5中TwoPhaseCommitSinkFunction為抽象類,有明確定開啟事務、預提交和提交的抽象方法,比較好理解。

?

?查看1.14.5版本的KafkaSink 的依賴,發現沒有直接使用TwoPhaseCommitSinkFunction,但是查看源碼可以看到使用了commiter和kafkawriter對象

?

public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> {   public static <IN> KafkaSinkBuilder<IN> builder() {return new KafkaSinkBuilder<>();}
-- KafkaWriter 中會判斷是否需要開啟事務@Overridepublic SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(InitContext context, List<KafkaWriterState> states) throws IOException {final Supplier<MetricGroup> metricGroupSupplier =() -> context.metricGroup().addGroup("user");return new KafkaWriter<>(deliveryGuarantee,kafkaProducerConfig,transactionalIdPrefix,context,recordSerializer,new InitContextInitializationContextAdapter(context.getUserCodeClassLoader(), metricGroupSupplier),states);}-- 事務提交在kafkaCommitter@Overridepublic Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {return Optional.of(new KafkaCommitter(kafkaProducerConfig));}@Overridepublic Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter()throws IOException {return Optional.empty();}...
}

KafkaWriter源碼

@Overridepublic List<KafkaCommittable> prepareCommit(boolean flush) {if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) {currentProducer.flush();}if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {final List<KafkaCommittable> committables =Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));LOG.debug("Committing {} committables, final commit={}.", committables, flush);return committables;}return Collections.emptyList();}
-- 快照狀態開啟事務@Overridepublic List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {currentProducer = getTransactionalProducer(checkpointId + 1);currentProducer.beginTransaction();}return ImmutableList.of(kafkaWriterState);}

1.14.5 版本TwoPhaseCommitSinkFunction是一個抽象類 在1.18 中是接口

/*** Flink Sink to produce data into a Kafka topic. By default producer will use {@link* FlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link* FlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.** @deprecated Please use {@link org.apache.flink.connector.kafka.sink.KafkaSink}.*/
@Deprecated
@PublicEvolving
public class FlinkKafkaProducer<IN>extends TwoPhaseCommitSinkFunction<IN,FlinkKafkaProducer.KafkaTransactionState,FlinkKafkaProducer.KafkaTransactionContext> {。。。}
-- 1.14 版本TwoPhaseCommitSinkFunction 為抽象類@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>implements CheckpointedFunction, CheckpointListener { }-- 1.18 版本
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;Committer<CommT> createCommitter() throws IOException;SimpleVersionedSerializer<CommT> getCommittableSerializer();@PublicEvolvingpublic interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {Collection<CommT> prepareCommit() throws IOException, InterruptedException;}
}

?

FlinkKafkaProducer繼承TwoPhaseCommitSinkFunction,會重寫其中的方法,查看重寫開啟事務的方法
  -- FlinkKafkaProducer 中重寫beginTransaction 方法@Overrideprotected FlinkKafkaProducer.KafkaTransactionState beginTransaction()throws FlinkKafkaException {switch (semantic) {case EXACTLY_ONCE:FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
-- 開啟kafka的procder的事務producer.beginTransaction();return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);case AT_LEAST_ONCE:case NONE:// Do not create new producer on each beginTransaction() if it is not necessaryfinal FlinkKafkaProducer.KafkaTransactionState currentTransaction =currentTransaction();if (currentTransaction != null && currentTransaction.producer != null) {return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);}return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));default:throw new UnsupportedOperationException("Not implemented semantic");}}

只有當FlinkKafkaProducer.Semantic 為EXACTLY_ONCE時才會開啟事務,查看其構造方法

 public FlinkKafkaProducer(String topicId,SerializationSchema<IN> serializationSchema,Properties producerConfig,@Nullable FlinkKafkaPartitioner<IN> customPartitioner,FlinkKafkaProducer.Semantic semantic,int kafkaProducersPoolSize) {this(topicId,null,null,new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),producerConfig,semantic,kafkaProducersPoolSize);}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62732.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62732.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62732.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

leetcode 3224. 使差值相等的最少數組改動次數

題目鏈接&#xff1a;3224. 使差值相等的最少數組改動次數 題目&#xff1a; 給你一個長度為 n 的整數數組 nums &#xff0c;n 是偶數 &#xff0c;同時給你一個整數 k 。 你可以對數組進行一些操作。每次操作中&#xff0c;你可以將數組中任一元素替換為 0 到 k 之間的任一…

Y3編輯器文檔4:觸發器1(對話、裝備、特效、行為樹、排行榜、不同步問題)

文章目錄 一、觸發器簡介1.1 觸發器界面1.2 ECA語句編輯及快捷鍵1.3 參數設置1.4 變量設置1.5 實體觸發器1.6 函數庫與觸發器復用 二、觸發器的多層結構2.1 子觸發器&#xff08;在游戲內對新的事件進行注冊&#xff09;2.2 觸發器變量作用域2.3 復合條件2.4 循環2.5 計時器2.6…

前端WebSocket應用——聊天實時通信的基本配置

使用 WebSocket 實現實時通信的 Vue 應用 前言1. WebSocketService 類 1.1 類屬性1.2 構造函數和連接初始化1.3 WebSocket 連接1.4 事件處理方法1.5 發送和關閉 WebSocket 消息1.6 狀態查詢與回調注冊1.7 完整代碼 2. 在 Vue 組件中使用 WebSocketService 2.1 定義 WebSocket …

【開源】A065—基于SpringBoot的庫存管理系統的設計與實現

&#x1f64a;作者簡介&#xff1a;在校研究生&#xff0c;擁有計算機專業的研究生開發團隊&#xff0c;分享技術代碼幫助學生學習&#xff0c;獨立完成自己的網站項目。 代碼可以查看項目鏈接獲取??&#xff0c;記得注明來意哦~&#x1f339; 贈送計算機畢業設計600個選題ex…

基于python實現自動化的驗證碼識別:探索與實踐

基于python實現自動化的驗證碼識別&#xff1a;探索與實踐 一、驗證碼的類型及特點&#xff08;一&#xff09;圖像驗證碼&#xff08;二&#xff09;短信驗證碼&#xff08;三&#xff09;語音驗證碼 二、驗證碼識別的方法*&#xff08;一&#xff09;傳統圖像處理方法&#x…

Vue vs. React:兩大前端框架的深度對比與分析(一)

前言 在當今快速發展的前端領域中&#xff0c;Vue和React作為兩個備受矚目的前端框架&#xff0c;已經成為許多開發者的首選。這兩個框架憑借其出色的設計和強大的功能&#xff0c;在構建現代化、高效性能的Web應用方面扮演著重要角色。 Vue和React都以其獨特的特點吸引了眾多開…

windows安裝使用conda

在Windows系統上安裝和使用Conda的詳細步驟如下&#xff1a; 一、下載Conda安裝包 訪問Conda的官方網站Anaconda | The Operating System for AI&#xff0c;點擊“Downloads”按鈕。在下載頁面&#xff0c;選擇適合您系統的安裝包。通常&#xff0c;對于Windows系統&#xf…

websocket 服務 pinia 全局配置

websocket 方法類 // stores/webSocketStore.ts import { defineStore } from "pinia";interface WebSocketStoreState {ws: WebSocket | null; // WebSocket 實例callbacks: ((message: string) > void)[]; // 消息回調函數列表connected: boolean; // 連接狀態…

Ariba Procurement: Administration_Cloud Basics

# SAP Ariba Procurement: Administration_Cloud Basics 認識Ariba Cloud SAP Ariba Procurement 是一個云計算平臺… The Ariba Cloud 平臺需要簡單理解的概念: Datacenter數據中心:SAP Ariba在世界各地有許多數據中心。這些數據中心構成了Ariba云的基本物理基礎設施。 …

vulnhub靶場【shenron】--1

前言 靶機&#xff1a;shenron-1 攻擊&#xff1a;kali 都采用虛擬機&#xff0c;網卡為橋接模式 主機發現 使用arp-scan -l或netdiscover -r 192.168.1.1/24掃描 信息收集 使用nmap掃描端口 網站信息探測 查看頁面&#xff0c;發現是apache2的默認界面&#xff0c;查看…

等保2.0數據庫測評之SQL server數據庫測評

一、SQL server數據庫介紹 SQL server美國Microsoft公司推出的一種關系型數據庫系統。SQL Server是一個可擴展的、高性能的、為分布式客戶機/服務器計算所設計的數據庫管理系統。 本次安裝環境為Windows10專業版操作系統&#xff0c;數據庫版本為Microsoft SQL Server 2019 (…

無人機之報警器的工作原理!

一、電量監測技術 電量監測是無人機電量指示和報警功能的基礎。通過實時監測無人機的電池電量&#xff0c;系統能夠準確判斷電池的剩余使用時間&#xff0c;并在電量不足時發出報警。電量監測技術通常包括以下幾個方面&#xff1a; 電壓檢測&#xff1a;無人機電池內部通常配…

【pyspark學習從入門到精通23】機器學習庫_6

目錄 分割連續變量 標準化連續變量 分類 分割連續變量 我們經常處理高度非線性的連續特征&#xff0c;而且只用一個系數很難擬合到我們的模型中。 在這種情況下&#xff0c;可能很難只通過一個系數來解釋這樣一個特征與目標之間的關系。有時&#xff0c;將值劃分到離散的桶中…

解密時序數據庫的未來:TDengine Open Day技術沙龍精彩回顧

在數字化時代&#xff0c;開源已成為推動技術創新和知識共享的核心力量&#xff0c;尤其在數據領域&#xff0c;開源技術的涌現不僅促進了行業的快速發展&#xff0c;也讓更多的開發者和技術愛好者得以參與其中。隨著物聯網、工業互聯網等技術的廣泛應用&#xff0c;時序數據庫…

QT 使用共享內存 實現進程間通訊

QSharedMemory&#xff1a;如果兩個進程運行在同一臺機器上&#xff0c;且對性能要求非常高&#xff08;如實時數據共享、圖像渲染等&#xff09;&#xff0c;建議使用共享內存。 優點&#xff1a; 高性能&#xff1a; 共享內存是進程間通信的最快方式之一&#xff0c;因為數…

在Scala中對隱式轉換格式與作用

隱式對象 格式&#xff1a;implicit object 作用&#xff1a;給函數的默認參數提供隱式值 object Scala12______10 { // case class DataBase(driver: String, url: String) // // implicit object mySql extends DataBase("mysql", "localhost:300") //…

go build command

文章目錄 1.簡介2.格式3.選項4.示例5.小結參考文獻 1.簡介 go build 是 Go 語言工具鏈中的一個命令&#xff0c;它用于編譯 Go 源代碼并生成可執行文件。 2.格式 go build [-o output] [build flags] [packages]可選的 -o 選項強制 build 將生成的可執行文件或對象寫入指定的…

OpenCV實驗:圖片加水印

第二篇&#xff1a;圖片添加水印&#xff08;加 logo&#xff09; 1. 實驗原理 水印原理&#xff1a; 圖片添加水印是圖像疊加的一種應用&#xff0c;分為透明水印和不透明水印。水印的實現通常依賴于像素值操作&#xff0c;將水印圖片融合到目標圖片中&#xff0c;常用的方法…

WinDbg 中使用 !process 命令

PROCESS 81a979d0 SessionId: 0 Cid: 0210 Peb: 7ffda000 ParentCid: 063cDirBase: 145b9000 ObjectTable: e12fed70 HandleCount: 53.Image: Dbgview.exe 1. PROCESS 81a979d0 意義&#xff1a;PROCESS 是該進程對象的內核地址。用途&#xff1a;可以使用這個地址獲…

深入解析下oracle的number底層存儲格式

oracle數據庫中&#xff0c;number數據類型用來存儲數值數據&#xff0c;它既可以存儲負數數值&#xff0c;也可以存儲正數數值。相對于其他類型數據&#xff0c;number格式的數據底層存儲格式要復雜得多。今天我們就詳細探究下oracle的number底層存儲格式。 一、環境搭建 1.…