【Spark源碼分析】事件總線機制分析

Spark事件總線機制

采用Spark2.11源碼,以下類或方法被@DeveloperApi注解額部分,可能出現不同版本不同實現的情況。

Spark中的事件總線用于接受事件并提交到對應的監聽器中。事件總線在Spark應用啟動時,會在SparkContext中激活spark運行的事件總線(LiveListenerBus)。

LiveListenerBus相關的部分類圖如下:

由于Spark使用scala語言編寫的,所以在類圖上的接口代表的是Traits類的接口功能。

繼承
實現
實現
聚合
聚合
繼承
繼承
繼承
實現
SparkContext
?interface?
SparkListenerEvent
?interface?
SparkListenerInterface
?interface?
SparkListenerBus
?interface?
ListenerBus
LiveListenerBus
AsyncEventQueue
AppStatusListener
ExecutorAllocationListener
?Abstract?
SparkListener
SparkListener相關事件
EventLoggingListener

主體邏輯

啟動應用的時候,在SparkConext中對LiveListenerBus進行實例化,除了內部的監聽器,還將注冊在 spark.extraListeners配置項中指定的監聽器,然后啟動監聽器總線。

LiveListenerBus中使用AsyncEventQueue作為核心,實現將事件異步的分發給已經注冊的SparkListener監聽器們。其中AsyncEventQueue有4類:

LiveListenerBusAsyncEventQueue分為4類,不同的事件分發給各自獨立的線程進行處理,防止在監聽器和事件較多的時候造成積壓問題。

  • eventLog:日志事件隊列
  • executorManagement:執行器管理隊列
  • appStatus:應用程序狀態隊列
  • shared:非內部監聽器共享的隊列

在AsyncEventQueue內部采用LinkedBlockingQueue來存儲事件,并啟動一個常住線程(dispatchThread)進行事件的轉發。

LiveListenerBus
AsyncEventQueue-eventLog
AsyncEventQueue-executorManagement
AsyncEventQueue-appStatus
AsyncEventQueue-shared
addToQueue
addToQueue
addToQueue
addToQueue
start
stop
eventQueue
event4-1
event4-2
listeners
listener4類
listener8類
dispatchThread
eventQueue
event3-1
event3-2
listeners
listener3類
listener7類
dispatchThread
eventQueue
event2-1
event2-2
listeners
listener2類
listener6類
dispatchThread
eventQueue
event1-1
event1-2
listeners
listener1類
listener5類
dispatchThread
events發生源1
listener1
events發生源2
listener2
events發生源3
listener3
events發生源4
listener4

代碼詳解

org.apache.spark.util.ListenerBus Traits類

scala中的Traits類,類似Java中的接口類。與接口相同的部分是可以定義抽象的方法和成員,不用的部分是可以包含具體的方法可以成員。

package org.apache.spark.utilimport java.util.concurrent.CopyOnWriteArrayListimport scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatalimport com.codahale.metrics.Timerimport org.apache.spark.internal.Logging/*** 事件總線的基類。用來轉發事件到對應的事件監聽器*/
// [ L<:AnyRef]指的是泛型,<:符號是泛型的上限。private[spark]代表作用域,只對spark目錄下可見
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {// (L, Option[Timer])采用的元組式集合private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]// Marked `private[spark]` for access in tests.private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJavaprotected def getTimer(listener: L): Option[Timer] = None/*** 添加監聽器來監聽事件。 該方法是線程安全的,可以在任何線程中調用。*/final def addListener(listener: L): Unit = {listenersPlusTimers.add((listener, getTimer(listener)))}/*** 移除監聽器,它將不會接收任何事件。 該方法是線程安全的,可以在任何線程中調用。*/final def removeListener(listener: L): Unit = {listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>listenersPlusTimers.remove(listenerAndTimer)}}/*** 如果刪除偵聽器時需要進行任何額外的清理,則可以由子類覆蓋它。 特別是AsyncEventQueue可以清理LiveListenerBus中的隊列。*/def removeListenerOnError(listener: L): Unit = {removeListener(listener)}/*** 將事件轉發給所有注冊的偵聽器。 `postToAll` 調用者應該保證在同一線程中為所有事件調用 `postToAll`。*/def postToAll(event: E): Unit = {val iter = listenersPlusTimers.iteratorwhile (iter.hasNext) {val listenerAndMaybeTimer = iter.next()val listener = listenerAndMaybeTimer._1val maybeTimer = listenerAndMaybeTimer._2val maybeTimerContext = if (maybeTimer.isDefined) {maybeTimer.get.time()} else {null}try {doPostEvent(listener, event)if (Thread.interrupted()) {throw new InterruptedException()}} catch {case ie: InterruptedException =>logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +s"Removing that listener.", ie)removeListenerOnError(listener)case NonFatal(e) =>logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)} finally {if (maybeTimerContext != null) {maybeTimerContext.stop()}}}}/*** 將事件發布到指定的偵聽器。 保證所有偵聽器在同一線程中調用“onPostEvent”。*/protected def doPostEvent(listener: L, event: E): Unitprivate[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {val c = implicitly[ClassTag[T]].runtimeClasslisteners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq}}

org.apache.spark.util.ListenerBus.SparkListenerBus

package org.apache.spark.schedulerimport org.apache.spark.util.ListenerBus/*** SparkListenerEvent事件總線繼承ListenerBus類,將SparkListenerEvent事件轉發到SparkListenerInterface中。* SparkListenerInterface是一個trait接口類,里面定義了一些關于spark應用運行周期中的一些事件監聽器。* SparkListenerEvent是定義了一個事件的通用接口類,其他關于Spark應用運行周期過程中的事件均以 case class實現這個接口*/
private[spark] trait SparkListenerBusextends ListenerBus[SparkListenerInterface, SparkListenerEvent] {// 監聽器處理對不同的事件采用不用的處理protected override def doPostEvent(listener: SparkListenerInterface,event: SparkListenerEvent): Unit = {event match {case stageSubmitted: SparkListenerStageSubmitted =>listener.onStageSubmitted(stageSubmitted)case stageCompleted: SparkListenerStageCompleted =>listener.onStageCompleted(stageCompleted)case jobStart: SparkListenerJobStart =>listener.onJobStart(jobStart)case jobEnd: SparkListenerJobEnd =>listener.onJobEnd(jobEnd)case taskStart: SparkListenerTaskStart =>listener.onTaskStart(taskStart)case taskGettingResult: SparkListenerTaskGettingResult =>listener.onTaskGettingResult(taskGettingResult)case taskEnd: SparkListenerTaskEnd =>listener.onTaskEnd(taskEnd)case environmentUpdate: SparkListenerEnvironmentUpdate =>listener.onEnvironmentUpdate(environmentUpdate)case blockManagerAdded: SparkListenerBlockManagerAdded =>listener.onBlockManagerAdded(blockManagerAdded)case blockManagerRemoved: SparkListenerBlockManagerRemoved =>listener.onBlockManagerRemoved(blockManagerRemoved)case unpersistRDD: SparkListenerUnpersistRDD =>listener.onUnpersistRDD(unpersistRDD)case applicationStart: SparkListenerApplicationStart =>listener.onApplicationStart(applicationStart)case applicationEnd: SparkListenerApplicationEnd =>listener.onApplicationEnd(applicationEnd)case metricsUpdate: SparkListenerExecutorMetricsUpdate =>listener.onExecutorMetricsUpdate(metricsUpdate)case executorAdded: SparkListenerExecutorAdded =>listener.onExecutorAdded(executorAdded)case executorRemoved: SparkListenerExecutorRemoved =>listener.onExecutorRemoved(executorRemoved)case executorBlacklisted: SparkListenerExecutorBlacklisted =>listener.onExecutorBlacklisted(executorBlacklisted)case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>listener.onExecutorUnblacklisted(executorUnblacklisted)case nodeBlacklisted: SparkListenerNodeBlacklisted =>listener.onNodeBlacklisted(nodeBlacklisted)case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>listener.onNodeUnblacklisted(nodeUnblacklisted)case blockUpdated: SparkListenerBlockUpdated =>listener.onBlockUpdated(blockUpdated)case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)case _ => listener.onOtherEvent(event)}}}

SparkListener實現了接口SparkListenerInterface,是它的默認實現類。主要對所有的事件回調做了無操作實現。

事件的存儲與轉發隊列

org.apache.spark.scheduler.AsyncEventQueue

package org.apache.spark.schedulerimport java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}import com.codahale.metrics.{Gauge, Timer}import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils/*** 事件的異步隊列。 發布到此隊列的所有事件都將傳遞到單獨線程中的子偵聽器。** 僅當調用 `start()` 方法時才會開始傳遞事件。 當不需要傳遞更多事件時,應該調用“stop()”方法。*/
private class AsyncEventQueue(val name: String,conf: SparkConf,metrics: LiveListenerBusMetrics,bus: LiveListenerBus)extends SparkListenerBuswith Logging {import AsyncEventQueue._// 維護了隊列前文所述的繼承自SparkListenerEvent的樣例類事件,默認長度10000。private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))// 代表未處理的事件個數,從eventQueue彈出的事件不保證處理結束了,所以采用一個單獨的變量對事件進行計數private val eventCount = new AtomicLong()/**丟棄事件的計數器。 */private val droppedEventsCounter = new AtomicLong(0L)/** 上次記錄“droppedEventsCounter”的時間(以毫秒為單位)。 */@volatile private var lastReportTimestamp = 0Lprivate val logDroppedEvent = new AtomicBoolean(false)private var sc: SparkContext = nullprivate val started = new AtomicBoolean(false)private val stopped = new AtomicBoolean(false)private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")// 首先刪除隊列大小計量器,以防它是由從偵聽器總線中刪除的該隊列的先前版本創建的。metrics.metricRegistry.remove(s"queue.$name.size")metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {override def getValue: Int = eventQueue.size()})// 事件轉發的常駐線程,不停的調用dispatch()進行事件轉發private val dispatchThread = new Thread(s"spark-listener-group-$name") {setDaemon(true)override def run(): Unit = Utils.tryOrStopSparkContext(sc) {dispatch()}}private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {var next: SparkListenerEvent = eventQueue.take()while (next != POISON_PILL) {val ctx = processingTime.time()try {// 通過事件總線將事件轉發到所有的注冊的監聽器中。super.postToAll(next)} finally {ctx.stop()}eventCount.decrementAndGet()next = eventQueue.take()}eventCount.decrementAndGet()}override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))}/*** 啟動一個dispatchThread線程將事件分派給監聽器。** @param sc Used to stop the SparkContext in case the async dispatcher fails.*/private[scheduler] def start(sc: SparkContext): Unit = {if (started.compareAndSet(false, true)) {this.sc = scdispatchThread.start()} else {throw new IllegalStateException(s"$name already started!")}}/*** 停止監聽器總線。 它將等待,直到處理完排隊的事件,但新事件將被丟棄。* 插入POISON_PILL,dispatchThread線程讀取到POISON_PIL時就會停止事件的分發*/private[scheduler] def stop(): Unit = {if (!started.get()) {throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")}if (stopped.compareAndSet(false, true)) {eventCount.incrementAndGet()eventQueue.put(POISON_PILL)}if (Thread.currentThread() != dispatchThread) {dispatchThread.join()}}// 向隊列中添加事件,如果隊列滿了,丟棄當前事件并記錄日志。這是個生產者消費者模型,當隊列滿時生產者丟棄事件,但隊列為空時消費者等待生產者。def post(event: SparkListenerEvent): Unit = {if (stopped.get()) {return}eventCount.incrementAndGet()if (eventQueue.offer(event)) {return}// 向eventQueue添加事件失敗后的邏輯eventCount.decrementAndGet()droppedEvents.inc()droppedEventsCounter.incrementAndGet()if (logDroppedEvent.compareAndSet(false, true)) {logError(s"Dropping event from queue $name. " +"This likely means one of the listeners is too slow and cannot keep up with " +"the rate at which tasks are being started by the scheduler.")}logTrace(s"Dropping event $event")val droppedCount = droppedEventsCounter.getif (droppedCount > 0) {// 為了控制日志的輸出頻率。采用1分鐘輸出一次。if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {val prevLastReportTimestamp = lastReportTimestamplastReportTimestamp = System.currentTimeMillis()val previous = new java.util.Date(prevLastReportTimestamp)logWarning(s"Dropped $droppedCount events from $name since $previous.")}}}}/*** For testing only. Wait until there are no more events in the queue.*/def waitUntilEmpty(deadline: Long): Boolean = {while (eventCount.get() != 0) {if (System.currentTimeMillis > deadline) {return false}Thread.sleep(10)}true}override def removeListenerOnError(listener: SparkListenerInterface): Unit = {bus.removeListener(listener)}}private object AsyncEventQueue {val POISON_PILL = new SparkListenerEvent() { }}

spark運行事件總線

org.apache.spark.scheduler.LiveListenerBus

package org.apache.spark.schedulerimport java.util.{List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.DynamicVariableimport com.codahale.metrics.{Counter, MetricRegistry, Timer}import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source/*** SparkListenerEvent事件管理器* 將 SparkListenerEvents 異步傳遞給已注冊的 SparkListener。** 在調用`start()`之前,所有發布的事件都只會被緩沖。 只有在此偵聽器總線啟動后,事件才會實際傳播到所有連接的偵聽器。 當調用 stop() 時,該監聽器總線將停止,停止后它將丟棄更多事件。*/
private[spark] class LiveListenerBus(conf: SparkConf) {import LiveListenerBus._private var sparkContext: SparkContext = _private[spark] val metrics = new LiveListenerBusMetrics(conf)// 表示是否調用了`start()`方法==>總線已啟動private val started = new AtomicBoolean(false)// 表示是否調用了`stop()`方法==>總線已啟動private val stopped = new AtomicBoolean(false)/** 事件放棄計數器 */private val droppedEventsCounter = new AtomicLong(0L)/** 上次記錄“droppedEventsCounter”的時間(以毫秒為單位)。 */@volatile private var lastReportTimestamp = 0Lprivate val queues = new CopyOnWriteArrayList[AsyncEventQueue]()// Visible for testing.@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()/**將偵聽器添加到所有非內部偵聽器共享的隊列中。 */def addToSharedQueue(listener: SparkListenerInterface): Unit = {addToQueue(listener, SHARED_QUEUE)}/** 將監聽器添加到執行器管理隊列中。 */def addToManagementQueue(listener: SparkListenerInterface): Unit = {addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)}/** 將偵聽器添加到應用程序狀態隊列。*/def addToStatusQueue(listener: SparkListenerInterface): Unit = {addToQueue(listener, APP_STATUS_QUEUE)}/** 將監聽器添加到事件日志隊列. */def addToEventLogQueue(listener: SparkListenerInterface): Unit = {addToQueue(listener, EVENT_LOG_QUEUE)}/*** 將偵聽器添加到特定隊列,并根據需要創建新隊列。 * 隊列彼此獨立(每個隊列使用單獨的線程來傳遞事件),允許較慢的偵聽器在一定程度上與其他偵聽器隔離。*/private[spark] def addToQueue(listener: SparkListenerInterface,queue: String): Unit = synchronized {if (stopped.get()) {throw new IllegalStateException("LiveListenerBus is stopped.")}// 先尋找隊列是否存在,如果存在就注冊,不存在就創建新隊列并注冊queues.asScala.find(_.name == queue) match {case Some(queue) =>queue.addListener(listener)case None =>val newQueue = new AsyncEventQueue(queue, conf, metrics, this)newQueue.addListener(listener)if (started.get()) {newQueue.start(sparkContext)}queues.add(newQueue)}}def removeListener(listener: SparkListenerInterface): Unit = synchronized {// 從添加到的所有隊列中刪除偵聽器,并停止已變空的隊列。queues.asScala.filter { queue =>queue.removeListener(listener)queue.listeners.isEmpty()}.foreach { toRemove =>if (started.get() && !stopped.get()) {toRemove.stop()}queues.remove(toRemove)}}/** 將事件轉發到所有的隊列中 */def post(event: SparkListenerEvent): Unit = {if (stopped.get()) {return}metrics.numEventsPosted.inc()// 如果事件緩沖區為空,則意味著總線已啟動,我們可以避免同步并將事件直接發布到隊列中。 這應該是事件總線生命周期中最常見的情況。if (queuedEvents == null) {postToQueues(event)return}// 否則,需要同步檢查總線是否啟動,以確保調用 start() 的線程拾取新事件。synchronized {if (!started.get()) {queuedEvents += eventreturn}}// 如果進行上述檢查時總線已經啟動,則直接發送到隊列。postToQueues(event)}// 遍歷所有隊列進行事件分發private def postToQueues(event: SparkListenerEvent): Unit = {val it = queues.iterator()while (it.hasNext()) {it.next().post(event)}}/*** 啟動每個隊列,并發送queuedEvents中緩存的事件。每個隊列就開始消費之前post的事件并調用postToAll()方法將事件發送給監視器。** 這首先發送在此偵聽器總線啟動之前發布的所有緩沖事件,然后在偵聽器總線仍在運行時異步偵聽任何其他事件。* 這應該只被調用一次。** @param sc Used to stop the SparkContext in case the listener thread dies.*/def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {if (!started.compareAndSet(false, true)) {throw new IllegalStateException("LiveListenerBus already started.")}this.sparkContext = scqueues.asScala.foreach { q =>q.start(sc)queuedEvents.foreach(q.post)}queuedEvents = nullmetricsSystem.registerSource(metrics)}/*** Exposed for testing.*/@throws(classOf[TimeoutException])def waitUntilEmpty(timeoutMillis: Long): Unit = {val deadline = System.currentTimeMillis + timeoutMillisqueues.asScala.foreach { queue =>if (!queue.waitUntilEmpty(deadline)) {throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.")}}}/*** 停止監聽器總線。 它將等待,直到處理完排隊的事件,但在停止后刪除新事件。*/def stop(): Unit = {if (!started.get()) {throw new IllegalStateException(s"Attempted to stop bus that has not yet started!")}if (!stopped.compareAndSet(false, true)) {return}synchronized {queues.asScala.foreach(_.stop())queues.clear()}}// For testing only.private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }}// For testing only.private[spark] def listeners: JList[SparkListenerInterface] = {queues.asScala.flatMap(_.listeners.asScala).asJava}// For testing only.private[scheduler] def activeQueues(): Set[String] = {queues.asScala.map(_.name).toSet}}private[spark] object LiveListenerBus {// Allows for Context to check whether stop() call is made within listener threadval withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)private[scheduler] val SHARED_QUEUE = "shared"private[scheduler] val APP_STATUS_QUEUE = "appStatus"private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
}private[spark] class LiveListenerBusMetrics(conf: SparkConf)extends Source with Logging {override val sourceName: String = "LiveListenerBus"override val metricRegistry: MetricRegistry = new MetricRegistryval numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted"))// Guarded by synchronization.private val perListenerClassTimers = mutable.Map[String, Timer]()def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {synchronized {val className = cls.getNameval maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)perListenerClassTimers.get(className).orElse {if (perListenerClassTimers.size == maxTimed) {logError(s"Not measuring processing time for listener class $className because a " +s"maximum of $maxTimed listener classes are already timed.")None} else {perListenerClassTimers(className) =metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))perListenerClassTimers.get(className)}}}}}

Spark任務啟動時,會在SparkContext中啟動spark運行的事件總線(LiveListenerBus)

  private def setupAndStartListenerBus(): Unit = {try {conf.get(EXTRA_LISTENERS).foreach { classNames =>val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)listeners.foreach { listener =>listenerBus.addToSharedQueue(listener)logInfo(s"Registered listener ${listener.getClass().getName()}")}}} catch {case e: Exception =>try {stop()} finally {throw new SparkException(s"Exception when registering SparkListener", e)}}// 啟動應用的運行事件總線listenerBus.start(this, _env.metricsSystem)_listenerBusStarted = true}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/165145.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/165145.shtml
英文地址,請注明出處:http://en.pswp.cn/news/165145.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

什么是持續集成的自動化測試?

持續集成的自動化測試 如今互聯網軟件的開發、測試和發布&#xff0c;已經形成了一套非常標準的流程&#xff0c;最重要的組成部分就是持續集成&#xff08;Continuous integration&#xff0c;簡稱CI&#xff0c;目前主要的持續集成系統是Jenkins&#xff09;。 那么什么是持…

docker 安裝常用環境

一、 安裝linux&#xff08;完整&#xff09; 目前為止docker hub 還是被封著&#xff0c;用阿里云、騰訊云鏡像找一找版本直接查就行 默認使用latest最新版 #:latest 可以不寫 docker pull centos:latest # 拉取后查看 images docker images #給鏡像設置標簽 # docker tag […

FIB表與快速轉發表工作原理

在一張路由表中&#xff0c;當存在多個路由項可同時匹配目的IP地址時&#xff0c;路由查找進程會選擇掩碼最長的路由項用于轉發&#xff0c;即最長匹配原則。因為掩碼越長&#xff0c;所處的網段范圍就越小&#xff0c;網段的范圍越小&#xff0c;就越能快速的定位到PC機的具體…

【分布式】小白看Ring算法 - 03

相關系列 【分布式】NCCL部署與測試 - 01 【分布式】入門級NCCL多機并行實踐 - 02 【分布式】小白看Ring算法 - 03 【分布式】大模型分布式訓練入門與實踐 - 04 概述 NCCL&#xff08;NVIDIA Collective Communications Library&#xff09;是由NVIDIA開發的一種用于多GPU間…

通過 python 腳本遷移 Redis 數據

背景 需求&#xff1a;需要將的 Redis 數據遷移由云廠商 A 遷移至云廠商 B問題&#xff1a;云版本的 Redis 版本不支持 SYNC、MIGRATE、BGSAVE 等命令&#xff0c;使得許多工具用不了&#xff08;如 redis-port&#xff09; 思路 &#xff08;1&#xff09;從 Redis A 獲取所…

GoLand 2023.2.5(GO語言集成開發工具環境)

GoLand是一款專門為Go語言開發者打造的集成開發環境&#xff08;IDE&#xff09;。它能夠提供一系列功能&#xff0c;如代碼自動完成、語法高亮、代碼格式化、代碼重構、代碼調試等等&#xff0c;使編寫代碼更加高效和舒適。 GoLand的特點包括&#xff1a; 1. 智能代碼補全&a…

json 去除特殊字符換行等符號

由于字符串中有出現了 換行符&#xff0c;導致轉json失敗&#xff0c;報錯&#xff1a;json parse error。 一般來講&#xff0c;直接用string的replace方法就可以了 String str "{\"adrdet\":\"阿歌嘎\n嘎、\",\"date\":\"2023/06/…

Ubuntu安裝CUDA驅動

Ubuntu安裝CUDA驅動 前言官網安裝確認安裝版本安裝CUDA Toolkit 前言 CUDA驅動一般指CUDA Toolkit&#xff0c;可通過Nvidia官網下載安裝。本文介紹安裝方法。 官網 CUDA Toolkit 最新版&#xff1a;CUDA Toolkit Downloads | NVIDIA Developer CUDA Toolkit 最新版文檔&…

NX二次開發UF_CAM_update_list_object_customization 函數介紹

文章作者&#xff1a;里海 來源網站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CAM_update_list_object_customization Defined in: uf_cam.h int UF_CAM_update_list_object_customization(tag_t * object_tags ) overview 概述 This function provids the…

UDP客戶端使用connect與UDP服務器使用send函數和recv函數收發數據

服務器代碼編譯運行 服務器udpconnectToServer.c的代碼如下&#xff1a; #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<arpa/inet.h> #include<sys/socket.h> #include<errno.h> #inclu…

Okhttp 淺析

安全的連接 OkHttpClient: OkHttpClient: 1.線程調度 2.連接池,有則復用,沒有就創建 3.interceptor 4.interceptor 5.監聽工廠 6.是否失敗重試 7.自動修正訪問,如果沒有權限或認證 8是否重定向 followRedirects 9.協議切換時候是否繼續重定向 10.Cookie jar 容器 默認…

Python 的 socket 模塊套接字編程(簡單入門級別)

Python 的 socket 模塊提供了對套接字編程的支持&#xff0c;允許你在網絡上進行數據傳輸。套接字是一個抽象的概念&#xff0c;它允許程序在網絡中的不同節點之間進行通信。 下面是 socket 模塊中一些常用的函數和類&#xff1a; 1. 創建套接字&#xff1a; socket.socket(…

pycharm 創建的django目錄和命令行創建的django再使用pycharm打開的目錄對比截圖 及相關

pytcharm創建django的項目 命令行創建的django 命令行創建項目時 不帶路徑時 (.venv) D:\gbCode>django-admin startproject gbCode 命令行創建項目時 帶路徑時 -- 所以如果有目錄就指定路徑好 (.venv) D:\gbCode>django-admin startproject gbCode d:\gbCode\

洛谷P1219 [USACO1.5] 八皇后【n皇后問題】【深搜+回溯 經典題】【附O(1)方法】

P1219 [USACO1.5] 八皇后 Checker Challenge 前言題目題目描述輸入格式輸出格式樣例 #1樣例輸入 #1樣例輸出 #1 提示題目分析注意事項 代碼深搜回溯打表 后話額外測試用例樣例輸入 #2樣例輸出 #2 王婆賣瓜 題目來源 前言 也是說到做到&#xff0c;來做搜索的題&#xff08;雖…

微機原理_2

一、單項選擇題(本大題共15小題,每小題3分,共45分。在每小題給出的四個備選項中,選出一個正確的答案&#xff0c;請將選定的答案填涂在答題紙的相應位置上。&#xff09; 下列數中最大的數為&#xff08;&#xff09; A. 10010101B B. (126)8 C. 96H D. 100 CPU 執行 OUT 60H,…

Android 9.0 隱藏設置顯示中自動調節亮度

Android 9.0 隱藏設置顯示中自動調節亮度 最近收到郵件需求提到想要隱藏設置顯示中的自動調節亮度&#xff0c;具體修改參照如下&#xff1a; /vendor/mediatek/proprietary/packages/apps/MtkSettings/res/xml/display_settings.xml - <Preference<!--Preferencea…

西門子(Siemens)仿真PLC啟動報錯處理

目錄 一、背景&#xff1a; 二、卸載軟件 三、安裝軟件 三、啟動軟件 四、下載PORTAL項目 五、測試 一、背景&#xff1a; 在啟動S7-PLCSIM Advanced V3.0仿真PLC時報錯&#xff0c;報錯信息為&#xff1a;>>Siemens PLCSIM Virtual Switch<<is misconfigu…

Ubuntu 23.10 服務器版本 ifconfig 查不到網卡 ip(已解決)

文章目錄 1、問題描述2、 解決方案 1、問題描述 服務器&#xff1a;ubuntu 23.10 經常會遇到虛擬機添加僅主機網卡后&#xff0c;通過 ifconfig 無法獲取其網卡 ip 2、 解決方案 修改網卡配置文件&#xff1a; # 進入網卡配置文件目錄 cd /etc/netplan # 備份原始文件 cp …

ArgoWorkflow教程(一)---DevOps 另一選擇?云原生 CICD: ArgoWorkflow 初體驗

來自&#xff1a;探索云原生 https://www.lixueduan.com 原文&#xff1a;https://www.lixueduan.com/posts/devops/argo-workflow/01-deploy-argo-workflows/ 本文主要記錄了如何在 k8s 上快速部署云原生的工作流引擎 ArgoWorkflow。 ArgoWorkflow 是什么 Argo Workflows 是…

網絡安全如何自學?

1.網絡安全是什么 網絡安全可以基于攻擊和防御視角來分類&#xff0c;我們經常聽到的 “紅隊”、“滲透測試” 等就是研究攻擊技術&#xff0c;而“藍隊”、“安全運營”、“安全運維”則研究防御技術。 2.網絡安全市場 一、是市場需求量高&#xff1b; 二、則是發展相對成熟…