CSI-external-provisioner

main()

This Go code implements a CSI (Container Storage Interface) provisioner, used to dynamically provision persistent volumes in a Kubernetes cluster. It involves several components and steps; the key parts are explained below:

  1. Initialization and configuration
  • Command-line flags and environment variables: the code uses the flag package to handle command-line arguments such as feature-gates and kubeconfig, and reads further configuration from environment variables such as NODE_NAME.
  • Logging and metrics: klog is used for logging, and Prometheus metric collectors are configured.
  2. Kubernetes client configuration
  • Building the kubeconfig: depending on whether the master or kubeconfig flag is provided, the code uses either the in-cluster configuration (rest.InClusterConfig) or an external configuration (clientcmd.BuildConfigFromFlags).
  • Client creation: the configuration is used to create a Kubernetes client (kubernetes.NewForConfig) and a CSI snapshot client (snapclientset.NewForConfig). A minimal sketch of this pattern follows the summary below.
  3. CSI driver connection and validation
  • Connecting to the CSI driver: the code connects to the CSI driver over gRPC and performs a basic probe (ctrl.Probe) to confirm the driver is available.
  • Fetching the driver name and capabilities: the driver name (ctrl.GetDriverName) and capabilities (ctrl.GetDriverCapabilities) are retrieved from the CSI driver.
  4. Topology and node information
  • Topology support: if the CSI driver supports topology, informers are created to watch Node and CSINode objects.
  • Node deployment: if node deployment is enabled (--enable-node-deployment), node information is fetched and a node-deployment object is configured.
  5. Provisioner and controller creation
  • Provisioner creation: the gathered configuration and clients are used to create the CSI provisioner object, which implements the Provisioner interface.
  • Capacity controller: if the capacity feature is enabled (--enable-capacity), a capacity controller is created to publish storage capacity information.
  6. HTTP server and metrics
  • HTTP server: if a metrics address (--metrics-address) or HTTP endpoint (--http-endpoint) is specified, an HTTP server is started to expose metrics and, optionally, debug endpoints (such as pprof).
  7. Informers and queues
  • Informers and queues: informers are created for the relevant resources to watch for changes to Kubernetes objects, and work queues are used to process the resulting events.
  8. Run
  • Starting informers and controllers: the informer factories and controllers are started, and the program begins watching and handling events.

Summary
This code is a full-featured CSI provisioner implementation that integrates the Kubernetes client, the CSI driver, metrics collection, topology awareness, capacity management, and other components. Its modular structure lets it provision and manage persistent volumes efficiently in a Kubernetes cluster.
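Before the full listing, here is a minimal sketch of the client-bootstrap pattern from step 2, assuming standard client-go imports. The buildClient helper and the hard-coded QPS/burst values are illustrative only; the real main() takes them from flags.

package main

import (
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

// buildClient prefers an explicit master/kubeconfig (useful for local debugging)
// and falls back to the in-cluster service-account configuration.
func buildClient(master, kubeconfig string, qps float32, burst int) (kubernetes.Interface, error) {
    var (
        config *rest.Config
        err    error
    )
    if master != "" || kubeconfig != "" {
        config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
    } else {
        config, err = rest.InClusterConfig()
    }
    if err != nil {
        return nil, err
    }
    // Rate-limit calls to the API server, as the provisioner does via --kube-api-qps/--kube-api-burst.
    config.QPS = qps
    config.Burst = burst
    return kubernetes.NewForConfig(config)
}

func main() {
    client, err := buildClient("", "", 5, 10)
    if err != nil {
        klog.Fatalf("Failed to create client: %v", err)
    }
    _ = client
}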

func main() {var config *rest.Configvar err errorflag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))klog.InitFlags(nil)flag.CommandLine.AddGoFlagSet(goflag.CommandLine)flag.Set("logtostderr", "true")flag.Parse()ctx := context.Background()if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {klog.Fatal(err)}node := os.Getenv("NODE_NAME")if *enableNodeDeployment && node == "" {klog.Fatal("The NODE_NAME environment variable must be set when using --enable-node-deployment.")}if *showVersion {fmt.Println(os.Args[0], version)os.Exit(0)}klog.Infof("Version: %s", version)if *metricsAddress != "" && *httpEndpoint != "" {klog.Error("only one of `--metrics-address` and `--http-endpoint` can be set.")os.Exit(1)}addr := *metricsAddressif addr == "" {addr = *httpEndpoint}// get the KUBECONFIG from env if specified (useful for local/debug cluster)kubeconfigEnv := os.Getenv("KUBECONFIG")if kubeconfigEnv != "" {klog.Infof("Found KUBECONFIG environment variable set, using that..")kubeconfig = &kubeconfigEnv}if *master != "" || *kubeconfig != "" {klog.Infof("Either master or kubeconfig specified. building kube config from that..")config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)} else {klog.Infof("Building kube configs for running in cluster...")config, err = rest.InClusterConfig()}if err != nil {klog.Fatalf("Failed to create config: %v", err)}config.QPS = *kubeAPIQPSconfig.Burst = *kubeAPIBurstclientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create client: %v", err)}// snapclientset.NewForConfig creates a new Clientset for  VolumesnapshotV1ClientsnapClient, err := snapclientset.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create snapshot client: %v", err)}var gatewayClient gatewayclientset.Interfaceif utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {// gatewayclientset.NewForConfig creates a new Clientset for GatewayClientgatewayClient, err = gatewayclientset.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create gateway client: %v", err)}}metricsManager := metrics.NewCSIMetricsManagerWithOptions("", /* driverName */// Will be provided via default gatherer.metrics.WithProcessStartTime(false),metrics.WithSubsystem(metrics.SubsystemSidecar),)grpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)if err != nil {klog.Error(err.Error())os.Exit(1)}err = ctrl.Probe(ctx, grpcClient, *operationTimeout)if err != nil {klog.Error(err.Error())os.Exit(1)}// Autodetect provisioner nameprovisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)if err != nil {klog.Fatalf("Error getting CSI driver name: %s", err)}klog.V(2).Infof("Detected CSI driver %s", provisionerName)metricsManager.SetDriverName(provisionerName)translator := csitrans.New()supportsMigrationFromInTreePluginName := ""if translator.IsMigratedCSIDriverByName(provisionerName) {supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)if err != nil {klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)}klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)// Create a new connection with the metrics manager with migrated labelmetricsManager = 
metrics.NewCSIMetricsManagerWithOptions(provisionerName,// Will be provided via default gatherer.metrics.WithProcessStartTime(false),metrics.WithMigration())migratedGrpcClient, err := ctrl.Connect(ctx, *csiEndpoint, metricsManager)if err != nil {klog.Error(err.Error())os.Exit(1)}grpcClient.Close()grpcClient = migratedGrpcClienterr = ctrl.Probe(ctx, grpcClient, *operationTimeout)if err != nil {klog.Error(err.Error())os.Exit(1)}}// Prepare http endpoint for metrics + leader election healthzmux := http.NewServeMux()gatherers := prometheus.Gatherers{// For workqueue and leader election metrics, set up via the anonymous imports of:// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/leaderelection/metrics.go//// Also to happens to include Go runtime and process metrics:// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.goL46-L49legacyregistry.DefaultGatherer,// For CSI operations.metricsManager.GetRegistry(),}pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)if err != nil {klog.Fatalf("Error getting CSI driver capabilities: %s", err)}// Generate a unique ID for this provisionertimeStamp := time.Now().UnixNano() / int64(time.Millisecond)identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerNameif *enableNodeDeployment {identity = identity + "-" + node}factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity// -------------------------------// Listers// Create informer to prevent hit the API server for all resource requestscLister := factory.Storage().V1().StorageClasses().Lister()claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()var vaLister storagelistersv1.VolumeAttachmentListerif controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")vaLister = factory.Storage().V1().VolumeAttachments().Lister()} else {klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")}var nodeDeployment *ctrl.NodeDeploymentif *enableNodeDeployment {nodeDeployment = &ctrl.NodeDeployment{NodeName:         node,ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),ImmediateBinding: *nodeDeploymentImmediateBinding,BaseDelay:        *nodeDeploymentBaseDelay,MaxDelay:         *nodeDeploymentMaxDelay,}nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)if err != nil {klog.Fatalf("Failed to get node info from CSI driver: %v", err)}nodeDeployment.NodeInfo = *nodeInfo}var nodeLister listersv1.NodeListervar csiNodeLister storagelistersv1.CSINodeListerif ctrl.SupportsTopology(pluginCapabilities) {if nodeDeployment != nil {// Avoid watching in favor of fake, static objects. 
This is particularly relevant for// Node objects, which can generate significant traffic.csiNode := &storagev1.CSINode{ObjectMeta: metav1.ObjectMeta{Name: nodeDeployment.NodeName,},Spec: storagev1.CSINodeSpec{Drivers: []storagev1.CSINodeDriver{{Name:   provisionerName,NodeID: nodeDeployment.NodeInfo.NodeId,},},},}node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeDeployment.NodeName,},}if nodeDeployment.NodeInfo.AccessibleTopology != nil {for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key)}node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments}klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode)// We make those fake objects available to the topology code via informers which// never change.stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour)csiNodes := stoppedFactory.Storage().V1().CSINodes()nodes := stoppedFactory.Core().V1().Nodes()csiNodes.Informer().GetStore().Add(csiNode)nodes.Informer().GetStore().Add(node)csiNodeLister = csiNodes.Lister()nodeLister = nodes.Lister()} else {csiNodeLister = factory.Storage().V1().CSINodes().Lister()nodeLister = factory.Core().V1().Nodes().Lister()}}var referenceGrantLister referenceGrantv1beta1.ReferenceGrantListervar gatewayFactory gatewayInformers.SharedInformerFactoryif utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {gatewayFactory = gatewayInformers.NewSharedInformerFactory(gatewayClient, ctrl.ResyncPeriodOfReferenceGrantInformer)referenceGrants := gatewayFactory.Gateway().V1beta1().ReferenceGrants()referenceGrantLister = referenceGrants.Lister()}// -------------------------------// PersistentVolumeClaims informerrateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()// Setup optionsprovisionerOptions := []func(*controller.ProvisionController) error{controller.LeaderElection(false), // Always disable leader election in provisioner lib. 
Leader election should be done here in the CSI provisioner level instead.controller.FailedProvisionThreshold(0),controller.FailedDeleteThreshold(0),controller.RateLimiter(rateLimiter),controller.Threadiness(int(*workerThreads)),controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),controller.ClaimsInformer(claimInformer),controller.NodesLister(nodeLister),}if utilfeature.DefaultFeatureGate.Enabled(features.HonorPVReclaimPolicy) {provisionerOptions = append(provisionerOptions, controller.AddFinalizer(true))}if supportsMigrationFromInTreePluginName != "" {provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))}// Create the provisioner: it implements the Provisioner interface expected by// the controllercsiProvisioner := ctrl.NewCSIProvisioner(clientset,*operationTimeout,identity,*volumeNamePrefix,*volumeNameUUIDLength,grpcClient,snapClient,provisionerName,pluginCapabilities,controllerCapabilities,supportsMigrationFromInTreePluginName,*strictTopology,*immediateTopology,translator,scLister,csiNodeLister,nodeLister,claimLister,vaLister,referenceGrantLister,*extraCreateMetadata,*defaultFSType,nodeDeployment,*controllerPublishReadOnly,*preventVolumeModeConversion,)var capacityController *capacity.Controllerif *enableCapacity {// Publishing storage capacity information uses its own client// with separate rate limiting.config.QPS = *kubeAPICapacityQPSconfig.Burst = *kubeAPICapacityBurstclientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create client: %v", err)}namespace := os.Getenv("NAMESPACE")if namespace == "" {klog.Fatal("need NAMESPACE env variable for CSIStorageCapacity objects")}var controller *metav1.OwnerReferenceif *capacityOwnerrefLevel >= 0 {podName := os.Getenv("POD_NAME")if podName == "" {klog.Fatal("need POD_NAME env variable to determine CSIStorageCapacity owner")}var err errorcontroller, err = owner.Lookup(config, namespace, podName,schema.GroupVersionKind{Group:   "",Version: "v1",Kind:    "Pod",}, *capacityOwnerrefLevel)if err != nil {klog.Fatalf("look up owner(s) of pod: %v", err)}klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name)}var topologyInformer topology.Informerif nodeDeployment == nil {topologyInformer = topology.NewNodeTopology(provisionerName,clientset,factory.Core().V1().Nodes(),factory.Storage().V1().CSINodes(),workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"),)} else {var segment topology.Segmentif nodeDeployment.NodeInfo.AccessibleTopology != nil {for key, value := range nodeDeployment.NodeInfo.AccessibleTopology.Segments {segment = append(segment, topology.SegmentEntry{Key: key, Value: value})}}klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment)topologyInformer = topology.NewFixedNodeTopology(&segment)}go topologyInformer.RunWorker(ctx)managedByID := "external-provisioner"if *enableNodeDeployment {managedByID = getNameWithMaxLength(managedByID, node, validation.DNS1035LabelMaxLength)}// We only need objects from our own namespace. The normal factory would give// us an informer for the entire cluster. 
We can further restrict the// watch to just those objects with the right labels.factoryForNamespace = informers.NewSharedInformerFactoryWithOptions(clientset,ctrl.ResyncPeriodOfCsiNodeInformer,informers.WithNamespace(namespace),informers.WithTweakListOptions(func(lo *metav1.ListOptions) {lo.LabelSelector = labels.Set{capacity.DriverNameLabel: provisionerName,capacity.ManagedByLabel:  managedByID,}.AsSelector().String()}),)// We use the V1 CSIStorageCapacity API if available.clientFactory := capacity.NewV1ClientFactory(clientset)cInformer := factoryForNamespace.Storage().V1().CSIStorageCapacities()// This invalid object is used in a v1 Create call to determine// based on the resulting error whether the v1 API is supported.invalidCapacity := &storagev1.CSIStorageCapacity{ObjectMeta: metav1.ObjectMeta{Name: "%123-invalid-name",},}createdCapacity, err := clientset.StorageV1().CSIStorageCapacities(namespace).Create(ctx, invalidCapacity, metav1.CreateOptions{})switch {case err == nil:klog.Fatalf("creating an invalid v1.CSIStorageCapacity didn't fail as expected, got: %s", createdCapacity)case apierrors.IsNotFound(err):// We need to bridge between the v1beta1 API on the// server and the v1 API expected by the capacity code.klog.Info("using the CSIStorageCapacity v1beta1 API")clientFactory = capacity.NewV1beta1ClientFactory(clientset)cInformer = capacity.NewV1beta1InformerBridge(factoryForNamespace.Storage().V1beta1().CSIStorageCapacities())case apierrors.IsInvalid(err):klog.Info("using the CSIStorageCapacity v1 API")default:klog.Fatalf("unexpected error when checking for the V1 CSIStorageCapacity API: %v", err)}capacityController = capacity.NewCentralCapacityController(csi.NewControllerClient(grpcClient),provisionerName,clientFactory,// Metrics for the queue is available in the default registry.workqueue.NewNamedRateLimitingQueue(rateLimiter, "csistoragecapacity"),controller,managedByID,namespace,topologyInformer,factory.Storage().V1().StorageClasses(),cInformer,*capacityPollInterval,*capacityImmediateBinding,*operationTimeout,)legacyregistry.CustomMustRegister(capacityController)// Wrap Provision and Delete to detect when it is time to refresh capacity.csiProvisioner = capacity.NewProvisionWrapper(csiProvisioner, capacityController)}if addr != "" {// Start HTTP server, regardless whether we are the leader or not.// Register provisioner metrics manually to be able to add multiplexer in front of itm := libmetrics.New("controller")reg := prometheus.NewRegistry()reg.MustRegister([]prometheus.Collector{m.PersistentVolumeClaimProvisionTotal,m.PersistentVolumeClaimProvisionFailedTotal,m.PersistentVolumeClaimProvisionDurationSeconds,m.PersistentVolumeDeleteTotal,m.PersistentVolumeDeleteFailedTotal,m.PersistentVolumeDeleteDurationSeconds,}...)provisionerOptions = append(provisionerOptions, controller.MetricsInstance(m))gatherers = append(gatherers, reg)// This is similar to k8s.io/component-base/metrics HandlerWithReset// except that we gather from multiple sources. This is necessary// because both CSI metrics manager and component-base manage// their own registry. 
Probably could be avoided by making// CSI metrics manager a bit more flexible.mux.Handle(*metricsPath,promhttp.InstrumentMetricHandler(reg,promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))if *enableProfile {klog.InfoS("Starting profiling", "endpoint", httpEndpoint)mux.HandleFunc("/debug/pprof/", pprof.Index)mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)mux.HandleFunc("/debug/pprof/profile", pprof.Profile)mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)mux.HandleFunc("/debug/pprof/trace", pprof.Trace)}go func() {klog.Infof("ServeMux listening at %q", addr)err := http.ListenAndServe(addr, mux)if err != nil {klog.Fatalf("Failed to start HTTP server at specified address (%q) and metrics path (%q): %s", addr, *metricsPath, err)}}()}logger := klog.FromContext(ctx)provisionController = controller.NewProvisionController(logger,clientset,provisionerName,csiProvisioner,provisionerOptions...,)csiClaimController := ctrl.NewCloningProtectionController(clientset,claimLister,claimInformer,claimQueue,controllerCapabilities,)run := func(ctx context.Context) {factory.Start(ctx.Done())if factoryForNamespace != nil {// Starting is enough, the capacity controller will// wait for sync.factoryForNamespace.Start(ctx.Done())}cacheSyncResult := factory.WaitForCacheSync(ctx.Done())for _, v := range cacheSyncResult {if !v {klog.Fatalf("Failed to sync Informers!")}}if utilfeature.DefaultFeatureGate.Enabled(features.CrossNamespaceVolumeDataSource) {if gatewayFactory != nil {gatewayFactory.Start(ctx.Done())}gatewayCacheSyncResult := gatewayFactory.WaitForCacheSync(ctx.Done())for _, v := range gatewayCacheSyncResult {if !v {klog.Fatalf("Failed to sync Informers for gateway!")}}}if capacityController != nil {go capacityController.Run(ctx, int(*capacityThreads))}if csiClaimController != nil {go csiClaimController.Run(ctx, int(*finalizerThreads))}provisionController.Run(ctx)}if !*enableLeaderElection {run(ctx)} else {// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller// to preserve backwards compatibilitylockName := strings.Replace(provisionerName, "/", "-", -1)// create a new clientset for leader electionleClientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create leaderelection client: %v", err)}le := leaderelection.NewLeaderElection(leClientset, lockName, run)if *httpEndpoint != "" {le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)}if *leaderElectionNamespace != "" {le.WithNamespace(*leaderElectionNamespace)}le.WithLeaseDuration(*leaderElectionLeaseDuration)le.WithRenewDeadline(*leaderElectionRenewDeadline)le.WithRetryPeriod(*leaderElectionRetryPeriod)le.WithIdentity(identity)if err := le.Run(); err != nil {klog.Fatalf("failed to initialize leader election: %v", err)}}
}

NewProvisionController()

  1. Get the hostname and generate a unique ID
    • os.Hostname() returns the current hostname; if this fails, an error is logged and the program exits.
    • The hostname is combined with a UUID to produce a unique ID, so that multiple processes running on the same host do not collide.
  2. Initialize the event recorder
    • record.NewBroadcaster() creates an event broadcaster, which is configured for structured logging and for recording events.
    • An eventRecorder is created for emitting events.
  3. Create and initialize the ProvisionController instance
    • The ProvisionController struct is initialized with the client, provisioner name, provisioner implementation, ID, component name, event recorder, and other fields.
    • A series of defaults is set, such as the resync period, exponential backoff on error, thread count, and failure thresholds.
    • Metrics-related configuration is initialized.
  4. Process the option functions
    • Each option function passed in is invoked to configure the ProvisionController instance. If an option function fails, an error is logged and the program exits.
  5. Initialize the rate limiter and work queues
    • A rate limiter is created from the configuration and used to initialize the claimQueue and volumeQueue work queues (see the standalone sketch after this list).
  6. Initialize informers and event handlers
    • informers.NewSharedInformerFactory creates a shared informer factory.
    • Event handlers and indexers are registered for PersistentVolumeClaims (PVCs), PersistentVolumes (PVs), and StorageClasses.
    • The informers watch for changes to these Kubernetes resources and trigger the corresponding handlers.
  7. Initialize the VolumeStore
    • Depending on the configuration, either NewVolumeStoreQueue or NewBackoffStore is used to initialize volumeStore, which handles creating and saving PVs.
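Step 5's queue setup, shown standalone with a toy usage loop. This is a sketch assuming the classic (non-generic) client-go workqueue API, not a copy of the function below:

package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Combine per-item exponential backoff with an overall token bucket,
    // mirroring the default chosen when exponentialBackOffOnError is enabled.
    rateLimiter := workqueue.NewMaxOfRateLimiter(
        workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )

    // Two named queues, one for PVCs and one for PVs, sharing the same limiter.
    claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
    volumeQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
    _ = volumeQueue

    claimQueue.Add("default/my-pvc")
    key, _ := claimQueue.Get()
    fmt.Println("processing", key)
    claimQueue.Done(key)
}
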
// NewProvisionController creates a new provision controller using
// the given configuration parameters and with private (non-shared) informers.
func NewProvisionController(logger klog.Logger,client kubernetes.Interface,provisionerName string,provisioner Provisioner,options ...func(*ProvisionController) error,
) *ProvisionController {id, err := os.Hostname()if err != nil {logger.Error(err, "Error getting hostname")klog.FlushAndExit(klog.ExitFlushTimeout, 1)}// add a uniquifier so that two processes on the same host don't accidentally both become activeid = id + "_" + string(uuid.NewUUID())component := provisionerName + "_" + id// TODO: Once the following PR is merged, change to use StartLogging and StartRecordingToSinkWithContext// https://github.com/kubernetes/kubernetes/pull/120729v1.AddToScheme(scheme.Scheme)broadcaster := record.NewBroadcaster()broadcaster.StartStructuredLogging(0)broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)})eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})controller := &ProvisionController{client:                    client,provisionerName:           provisionerName,provisioner:               provisioner,id:                        id,component:                 component,eventRecorder:             eventRecorder,resyncPeriod:              DefaultResyncPeriod,exponentialBackOffOnError: DefaultExponentialBackOffOnError,threadiness:               DefaultThreadiness,failedProvisionThreshold:  DefaultFailedProvisionThreshold,failedDeleteThreshold:     DefaultFailedDeleteThreshold,leaderElection:            DefaultLeaderElection,leaderElectionNamespace:   getInClusterNamespace(),leaseDuration:             DefaultLeaseDuration,renewDeadline:             DefaultRenewDeadline,retryPeriod:               DefaultRetryPeriod,metrics:                   metrics.New(controllerSubsystem),metricsPort:               DefaultMetricsPort,metricsAddress:            DefaultMetricsAddress,metricsPath:               DefaultMetricsPath,addFinalizer:              DefaultAddFinalizer,hasRun:                    false,hasRunLock:                &sync.Mutex{},}for _, option := range options {err := option(controller)if err != nil {logger.Error(err, "Error processing controller options")klog.FlushAndExit(klog.ExitFlushTimeout, 1)}}var rateLimiter workqueue.RateLimiterif controller.rateLimiter != nil {// rateLimiter set via parameter takes precedencerateLimiter = controller.rateLimiter} else if controller.exponentialBackOffOnError {rateLimiter = workqueue.NewMaxOfRateLimiter(workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},)} else {rateLimiter = workqueue.NewMaxOfRateLimiter(workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},)}controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)// ----------------------// PersistentVolumeClaimsclaimHandler := cache.ResourceEventHandlerFuncs{AddFunc:    func(obj interface{}) { controller.enqueueClaim(obj) },UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },DeleteFunc: func(obj interface{}) {// NOOP. 
The claim is either in claimsInProgress and in the queue, so it will be processed as usual// or it's not in claimsInProgress and then we don't care},}if controller.claimInformer != nil {controller.claimInformer.AddEventHandlerWithResyncPeriod(claimHandler, controller.resyncPeriod)} else {controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()controller.claimInformer.AddEventHandler(claimHandler)}err = controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {uid, err := getObjectUID(obj)if err != nil {return nil, err}return []string{uid}, nil}})if err != nil {logger.Error(err, "Error setting indexer for pvc informer", "indexer", uidIndex)klog.FlushAndExit(klog.ExitFlushTimeout, 1)}controller.claimsIndexer = controller.claimInformer.GetIndexer()// -----------------// PersistentVolumesvolumeHandler := cache.ResourceEventHandlerFuncs{AddFunc:    func(obj interface{}) { controller.enqueueVolume(obj) },UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },}if controller.volumeInformer != nil {controller.volumeInformer.AddEventHandlerWithResyncPeriod(volumeHandler, controller.resyncPeriod)} else {controller.volumeInformer = informer.Core().V1().PersistentVolumes().Informer()controller.volumeInformer.AddEventHandler(volumeHandler)}controller.volumes = controller.volumeInformer.GetStore()// --------------// StorageClasses// no resource event handler needed for StorageClassesif controller.classInformer == nil {controller.classInformer = informer.Storage().V1().StorageClasses().Informer()}controller.classes = controller.classInformer.GetStore()if controller.createProvisionerPVLimiter != nil {logger.V(2).Info("Using saving PVs to API server in background")controller.volumeStore = NewVolumeStoreQueue(client, controller.createProvisionerPVLimiter, controller.claimsIndexer, controller.eventRecorder)} else {if controller.createProvisionedPVBackoff == nil {// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.if controller.createProvisionedPVInterval == 0 {controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval}if controller.createProvisionedPVRetryCount == 0 {controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount}controller.createProvisionedPVBackoff = &wait.Backoff{Duration: controller.createProvisionedPVInterval,Factor:   1, // linear backoffSteps:    controller.createProvisionedPVRetryCount,// Cap:      controller.createProvisionedPVInterval,}}logger.V(2).Info("Using blocking saving PVs to API server")controller.volumeStore = NewBackoffStore(client, controller.eventRecorder, controller.createProvisionedPVBackoff, controller)}return controller
}

syncClaim()

  1. Decide whether provisioning should happen:
    • ctrl.shouldProvision(ctx, claim) is called to decide whether this PVC needs to be provisioned. If it returns an error, the provisioning statistics are updated and the error is returned.
    • If should is true, provisioning proceeds.
  2. Provisioning:
    • The start time of the provisioning operation is recorded.
    • A logger is obtained from the context.
    • ctrl.provisionClaimOperation(ctx, claim) performs the provisioning and returns a status and a possible error.
    • The provisioning statistics are updated with the error and the start time.
  3. Handle the result of provisioning:
    • If there is no error, or the status is ProvisioningFinished, provisioning is either complete or not needed. The handling depends on the error:
      • If there is no error, a log entry is written and the PVC is removed from claimsInProgress.
      • If the error is errStopProvision, a log entry is written and the error is set to nil (otherwise the caller would requeue the claim).
      • For any other error, a log entry is written.
    • If the status is ProvisioningInBackground, provisioning is still running in the background; a log entry is written and the PVC is added to claimsInProgress.
    • If the status is ProvisioningNoChange, nothing is modified and claimsInProgress is left unchanged.
  4. Return the error:
    • If provisioning was not needed, or it finished without an error that must be surfaced, nil is returned.
    • Otherwise, the error from the provisioning operation is returned.

The logic here revolves around the provisioning state of a PVC: internal state (such as claimsInProgress) is updated according to the outcome and the relevant log entries are written. This is how the ProvisionController manages the provisioning of many PVCs and ensures each one is handled correctly. The worker/requeue loop that calls syncClaim is sketched below, followed by the function itself.
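A minimal sketch of the caller side referenced in step 3, assuming the classic client-go workqueue API; processNextItem is a hypothetical name, not part of the library:

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

// processNextItem pops one key, runs the sync function, and requeues with
// backoff on error. Translating errStopProvision to nil inside syncClaim is
// precisely what keeps a claim from landing in AddRateLimited here.
func processNextItem(ctx context.Context, queue workqueue.RateLimitingInterface, sync func(context.Context, interface{}) error) bool {
    key, quit := queue.Get()
    if quit {
        return false
    }
    defer queue.Done(key)

    if err := sync(ctx, key); err != nil {
        queue.AddRateLimited(key) // retry later with exponential backoff
        return true
    }
    queue.Forget(key) // reset the backoff for this key
    return true
}

func main() {
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "claims")
    queue.Add("default/my-pvc")
    go func() {
        time.Sleep(100 * time.Millisecond)
        queue.ShutDown()
    }()
    for processNextItem(context.Background(), queue, func(ctx context.Context, key interface{}) error {
        fmt.Println("syncing", key)
        return nil
    }) {
    }
}
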
func (ctrl *ProvisionController) syncClaim(ctx context.Context, obj interface{}) error {
    claim, ok := obj.(*v1.PersistentVolumeClaim)
    if !ok {
        return fmt.Errorf("expected claim but got %+v", obj)
    }

    should, err := ctrl.shouldProvision(ctx, claim)
    if err != nil {
        ctrl.updateProvisionStats(claim, err, time.Time{})
        return err
    } else if should {
        startTime := time.Now()

        logger := klog.FromContext(ctx)
        status, err := ctrl.provisionClaimOperation(ctx, claim)
        ctrl.updateProvisionStats(claim, err, startTime)
        if err == nil || status == ProvisioningFinished {
            // Provisioning is 100% finished / not in progress.
            switch err {
            case nil:
                logger.V(5).Info("Claim processing succeeded, removing PVC from claims in progress", "claimUID", claim.UID)
            case errStopProvision:
                logger.V(5).Info("Stop provisioning, removing PVC from claims in progress", "claimUID", claim.UID)
                // Our caller would requeue if we pass on this special error; return nil instead.
                err = nil
            default:
                logger.V(2).Info("Final error received, removing PVC from claims in progress", "claimUID", claim.UID)
            }
            ctrl.claimsInProgress.Delete(string(claim.UID))
            return err
        }
        if status == ProvisioningInBackground {
            // Provisioning is in progress in background.
            logger.V(2).Info("Temporary error received, adding PVC to claims in progress", "claimUID", claim.UID)
            ctrl.claimsInProgress.Store(string(claim.UID), claim)
        } else {
            // status == ProvisioningNoChange.
            // Don't change claimsInProgress:
            // - the claim is already there if previous status was ProvisioningInBackground.
            // - the claim is not there if previous status was ProvisioningFinished.
        }
        return err
    }
    return nil
}
shouldProvision()
  1. Check whether the PVC already names a volume
    • If claim.Spec.VolumeName is not empty, the PVC is already bound to a specific volume and no provisioning is needed. The method returns false, nil.
  2. Check whether the provisioner implements the Qualifier interface
    • A type assertion, ctrl.provisioner.(Qualifier), checks whether ctrl.provisioner implements the Qualifier interface.
    • If it does and the Qualifier's ShouldProvision method returns false, no provisioning is needed. The method returns false, nil.
  3. Read the PVC's annotations to determine the provisioner
    • The annStorageProvisioner annotation is read first.
    • If it is absent, the annBetaStorageProvisioner annotation is tried.
    • These two annotations name the provisioner responsible for provisioning the volume.
  4. Check whether the provisioner found is known
    • If a provisioner annotation was found and that provisioner is known to this controller (checked via ctrl.knownProvisioner(provisioner)), proceed to the next step.
  5. Check the StorageClass's VolumeBindingMode
    • util.GetPersistentVolumeClaimClass(claim) returns the StorageClass the PVC belongs to.
    • ctrl.getStorageClass(claimClass) fetches the details of that StorageClass.
    • The StorageClass's VolumeBindingMode is then checked. If it is storage.VolumeBindingWaitForFirstConsumer (delayed binding), the PVC's annotations must also contain annSelectedNode:
      • If annSelectedNode is present and non-empty, a node has been selected and provisioning can proceed. The method returns true, nil.
      • If it is missing or empty, no provisioning is done. The method returns false, nil.
  6. Provision by default
    • If the StorageClass's VolumeBindingMode is not delayed binding, or no VolumeBindingMode is set, provisioning is required by default. The method returns true, nil.
  7. No provisioner found
    • If no provisioner annotation is found on the PVC, no provisioning is done. The method returns false, nil.

In short, shouldProvision decides whether a volume should be provisioned for a PVC by examining the PVC's attributes and annotations together with the configuration of the associated StorageClass: whether a volume is already assigned, whether the provisioning preconditions are met, whether delayed binding is in use, and so on. The delayed-binding check is sketched below, followed by the actual function.
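A minimal standalone sketch of the checks in steps 1 and 5-6. The readyForProvisioning helper is hypothetical; the annotation key mirrors the volume.kubernetes.io/selected-node annotation set by the scheduler:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    storagev1 "k8s.io/api/storage/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const annSelectedNode = "volume.kubernetes.io/selected-node"

func readyForProvisioning(claim *v1.PersistentVolumeClaim, class *storagev1.StorageClass) bool {
    if claim.Spec.VolumeName != "" {
        return false // already bound to a volume, nothing to provision
    }
    if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
        // Delayed binding: wait until the scheduler has recorded a node.
        node, ok := claim.Annotations[annSelectedNode]
        return ok && node != ""
    }
    return true // immediate binding: provision right away
}

func main() {
    mode := storagev1.VolumeBindingWaitForFirstConsumer
    class := &storagev1.StorageClass{VolumeBindingMode: &mode}
    claim := &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
            Annotations: map[string]string{annSelectedNode: "node-1"},
        },
    }
    fmt.Println(readyForProvisioning(claim, class)) // true: a node is already selected
}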

// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) (bool, error) {
    if claim.Spec.VolumeName != "" {
        return false, nil
    }

    if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
        if !qualifier.ShouldProvision(ctx, claim) {
            return false, nil
        }
    }

    provisioner, found := claim.Annotations[annStorageProvisioner]
    if !found {
        provisioner, found = claim.Annotations[annBetaStorageProvisioner]
    }

    if found {
        if ctrl.knownProvisioner(provisioner) {
            claimClass := util.GetPersistentVolumeClaimClass(claim)
            class, err := ctrl.getStorageClass(claimClass)
            if err != nil {
                return false, err
            }
            if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
                // When claim is in delay binding mode, annSelectedNode is
                // required to provision volume.
                // Though PV controller set annStorageProvisioner only when
                // annSelectedNode is set, but provisioner may remove
                // annSelectedNode to notify scheduler to reschedule again.
                if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
                    return true, nil
                }
                return false, nil
            }
            return true, nil
        }
    }

    return false, nil
}
provisionClaimOperation()
  1. Get the PVC's storage class
    • util.GetPersistentVolumeClaimClass(claim) returns the PVC's StorageClass name.
  2. Logging
    • Kubernetes's klog library is used for logging, including details of the PVC and StorageClass.
  3. Check whether the PV already exists
    • ctrl.getProvisionedVolumeNameForClaim(claim) computes the expected PV name, and the code checks whether that PV already exists in ctrl.volumes. If it does, the volume has already been provisioned and the function returns ProvisioningFinished and errStopProvision.
  4. Get a reference to the PVC
    • ref.GetReference(scheme.Scheme, claim) obtains a reference to the PVC so later steps can point back to the claim object.
  5. Check whether provisioning is allowed
    • ctrl.canProvision(ctx, claim) checks whether this ProvisionController can handle the provisioning request for this PVC. If it cannot, an event is recorded and an error is returned.
  6. Get the StorageClass
    • ctrl.getStorageClass(claimClass) fetches the StorageClass named by the PVC. If this fails, or the StorageClass's provisioner is not supported by this ProvisionController, an error is logged and returned.
  7. Get the selected node
    • If the PVC's annotations name a selected node (annSelectedNode or annAlphaSelectedNode), the code tries to fetch that node. If the node does not exist, ctrl.provisionVolumeErrorHandling handles the error.
  8. Prepare the provisioning options
    • A ProvisionOptions object is created containing the StorageClass, PV name, PVC object, and selected node.
  9. Record a normal event
    • ctrl.eventRecorder.Event records a normal event indicating that the external provisioner is provisioning a volume for the PVC.
  10. Call the provisioner
    • ctrl.provisioner.Provision(ctx, options) attempts to provision the volume. If it fails, the error is handled according to its type. (A toy Provisioner implementation is sketched after the summary below.)
  11. Set the PV's claim reference and finalizer
    • On success, the PV's ClaimRef is set to the PVC reference and, if configured, a finalizer is added.
  12. Update the PV's metadata and storage class
    • The PV's annotations and StorageClassName are updated.
  13. Store and add the PV
    • ctrl.volumeStore.StoreVolume saves the PV, and the PV is added to ctrl.volumes.
  14. Return the result
    • If everything succeeds, the function returns ProvisioningFinished and nil, indicating provisioning completed.

This function covers the whole flow: checking whether the PV already exists, actually provisioning the volume, and then updating internal state and recording events. It is a key part of the Kubernetes dynamic-provisioning pipeline, ensuring that each PVC is processed correctly and assigned storage.
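The Provision call in step 10 dispatches to whatever object implements the controller library's Provisioner interface. Below is a hypothetical, minimal hostPath-backed implementation to illustrate that contract; it is not part of the external-provisioner, and the import's major version (v9 here) is an assumption that may need adjusting to match your go.mod.

package main

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/sig-storage-lib-external-provisioner/v9/controller"
)

// hostPathProvisioner is a toy Provisioner: it fulfils the interface that
// provisionClaimOperation() calls into, returning a hostPath-backed PV.
type hostPathProvisioner struct {
    root string
}

// Compile-time check that the interface is satisfied.
var _ controller.Provisioner = &hostPathProvisioner{}

func (p *hostPathProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
    capacity := options.PVC.Spec.Resources.Requests[v1.ResourceStorage]
    pv := &v1.PersistentVolume{
        ObjectMeta: metav1.ObjectMeta{Name: options.PVName},
        Spec: v1.PersistentVolumeSpec{
            AccessModes: options.PVC.Spec.AccessModes,
            Capacity:    v1.ResourceList{v1.ResourceStorage: capacity},
            PersistentVolumeSource: v1.PersistentVolumeSource{
                HostPath: &v1.HostPathVolumeSource{Path: p.root + "/" + options.PVName},
            },
        },
    }
    // ProvisioningFinished tells syncClaim() that the operation is complete.
    return pv, controller.ProvisioningFinished, nil
}

func (p *hostPathProvisioner) Delete(ctx context.Context, pv *v1.PersistentVolume) error {
    // A real provisioner would remove the backing storage here.
    return nil
}

func main() {
    fmt.Println("toy provisioner rooted at:", (&hostPathProvisioner{root: "/tmp/pv"}).root)
}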

func (ctrl *ProvisionController) provisionClaimOperation(ctx context.Context, claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
    // Most code here is identical to that found in controller.go of kube's PV controller...
    claimClass := util.GetPersistentVolumeClaimClass(claim)
    logger := klog.LoggerWithValues(klog.FromContext(ctx), "PVC", klog.KObj(claim), "StorageClass", claimClass)
    logger.V(4).Info("Started")

    //  A previous doProvisionClaim may just have finished while we were waiting for
    //  the locks. Check that PV (with deterministic name) hasn't been provisioned
    //  yet.
    pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
    _, exists, err := ctrl.volumes.GetByKey(pvName)
    if err == nil && exists {
        // Volume has been already provisioned, nothing to do.
        logger.V(4).Info("PersistentVolume already exists, skipping", "PV", pvName)
        return ProvisioningFinished, errStopProvision
    }

    // Prepare a claimRef to the claim early (to fail before a volume is
    // provisioned)
    claimRef, err := ref.GetReference(scheme.Scheme, claim)
    if err != nil {
        logger.Error(err, "Unexpected error getting claim reference")
        return ProvisioningNoChange, err
    }

    // Check if this provisioner can provision this claim.
    if err = ctrl.canProvision(ctx, claim); err != nil {
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
        logger.Error(err, "Failed to provision volume")
        return ProvisioningFinished, errStopProvision
    }

    // For any issues getting fields from StorageClass (including reclaimPolicy & mountOptions),
    // retry the claim because the storageClass can be fixed/(re)created independently of the claim
    class, err := ctrl.getStorageClass(claimClass)
    if err != nil {
        logger.Error(err, "Error getting claim's StorageClass's fields")
        return ProvisioningFinished, err
    }
    if !ctrl.knownProvisioner(class.Provisioner) {
        // class.Provisioner has either changed since shouldProvision() or
        // annDynamicallyProvisioned contains different provisioner than
        // class.Provisioner.
        logger.Error(nil, "Unknown provisioner requested in claim's StorageClass", "provisioner", class.Provisioner)
        return ProvisioningFinished, errStopProvision
    }

    var selectedNode *v1.Node
    // Get SelectedNode
    if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
        if ctrl.nodeLister != nil {
            selectedNode, err = ctrl.nodeLister.Get(nodeName)
        } else {
            selectedNode, err = ctrl.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
        }
        if err != nil {
            // if node does not exist, reschedule and remove volume.kubernetes.io/selected-node annotation
            if apierrs.IsNotFound(err) {
                ctx2 := klog.NewContext(ctx, logger)
                return ctrl.provisionVolumeErrorHandling(ctx2, ProvisioningReschedule, err, claim)
            }
            err = fmt.Errorf("failed to get target node: %v", err)
            ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
            return ProvisioningNoChange, err
        }
    }

    options := ProvisionOptions{
        StorageClass: class,
        PVName:       pvName,
        PVC:          claim,
        SelectedNode: selectedNode,
    }

    ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", klog.KObj(claim)))

    volume, result, err := ctrl.provisioner.Provision(ctx, options)
    if err != nil {
        if ierr, ok := err.(*IgnoredError); ok {
            // Provision ignored, do nothing and hope another provisioner will provision it.
            logger.V(4).Info("Volume provision ignored", "reason", ierr)
            return ProvisioningFinished, errStopProvision
        }
        ctx2 := klog.NewContext(ctx, logger)
        err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
        return ctrl.provisionVolumeErrorHandling(ctx2, result, err, claim)
    }

    logger.V(4).Info("Volume is provisioned", "PV", volume.Name)

    // Set ClaimRef and the PV controller will bind and set annBoundByController for us
    volume.Spec.ClaimRef = claimRef

    // Add external provisioner finalizer if it doesn't already have it
    if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
        volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
    }

    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, class.Provisioner)
    volume.Spec.StorageClassName = claimClass

    logger.V(4).Info("Succeeded")

    if err := ctrl.volumeStore.StoreVolume(logger, claim, volume); err != nil {
        return ProvisioningFinished, err
    }
    if err = ctrl.volumes.Add(volume); err != nil {
        utilruntime.HandleError(err)
    }
    return ProvisioningFinished, nil
}
