OpenMetadata 1.3.1 Custom Connector Development Tutorial

I. Developing a Generic Custom Connector

Official documentation links:

1. https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors

2. https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector

(1) Create the custom connector class for the service type

Reference: https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace

1. Create the custom connector

Example: my_csv_connector.py

"""
Custom Database Service that extracts metadata from a CSV file.
"""
import csv
import traceback

from pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (DatabaseService,
)
from metadata.generated.schema.entity.data.table import (Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class InvalidCsvConnectorException(Exception):
    """Sample data is not valid to be ingested"""


class CsvModel(BaseModel):
    name: str
    column_names: List[str]
    column_types: List[str]

    @validator("column_names", "column_types", pre=True)
    def str_to_list(cls, value):
        """Suppose that the internal split is in ;"""
        return value.split(";")


class CsvConnector(Source):
    """
    Custom connector to ingest Database metadata.

    We'll suppose that we can read metadata from a CSV
    with a custom database name from a business_unit connection option.
    """

    # Built-in constructor
    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata

        # Read the connection configuration
        self.service_connection = config.serviceConnection.__root__.config

        self.source_directory: str = (
            # Path to the CSV file
            self.service_connection.connectionOptions.__root__.get("source_directory")
        )
        if not self.source_directory:
            raise InvalidCsvConnectorException(
                "Missing source_directory connection option"
            )

        self.business_unit: str = (
            # Custom database name
            self.service_connection.connectionOptions.__root__.get("business_unit")
        )
        if not self.business_unit:
            raise InvalidCsvConnectorException(
                "Missing business_unit connection option"
            )

        self.data: Optional[List[CsvModel]] = None

        super().__init__()

    # Built-in factory method
    @classmethod
    def create(
        cls, config_dict: dict, metadata_config: OpenMetadataConnection
    ) -> "CsvConnector":
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: CustomDatabaseConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, CustomDatabaseConnection):
            raise InvalidSourceException(
                f"Expected CustomDatabaseConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    # Static method: safely parse a single CSV row
    @staticmethod
    def read_row_safe(row: Dict[str, Any]):
        try:
            return CsvModel.parse_obj(row)
        except ValidationError:
            logger.warning(f"Error parsing row {row}. Skipping it.")

    # Preparation: read the file and load its rows
    def prepare(self):
        # Validate that the file exists
        source_data = Path(self.source_directory)
        if not source_data.exists():
            raise InvalidCsvConnectorException("Source Data path does not exist")

        try:
            with open(source_data, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                self.data = [self.read_row_safe(row) for row in reader]
        except Exception as exc:
            logger.error("Unknown error reading the source file")
            raise exc

    def yield_create_request_database_service(self):
        yield Either(
            # Create the database service entity from the workflow config
            right=self.metadata.get_create_service_from_source(
                entity=DatabaseService, config=self.config
            )
        )

    # Create the business database
    def yield_business_unit_db(self):
        # Pick up the service we just created (if not UI)
        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        yield Either(
            right=CreateDatabaseRequest(
                name=self.business_unit,
                service=service_entity.fullyQualifiedName,
            )
        )

    # Create the default schema
    def yield_default_schema(self):
        # Pick up the database we just created (if not UI)
        database_entity: Database = self.metadata.get_by_name(
            entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}"
        )
        yield Either(
            right=CreateDatabaseSchemaRequest(
                name="default",
                database=database_entity.fullyQualifiedName,
            )
        )

    # Create the tables from the CSV rows
    def yield_data(self):
        """
        Iterate over the data list to create tables
        """
        database_schema: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=f"{self.config.serviceName}.{self.business_unit}.default",
        )

        # Error handling example: suppose we had a failure to track
        # try:
        #     1/0
        # except Exception:
        #     yield Either(
        #         left=StackTraceError(
        #             name="My Error",
        #             error="Demoing one error",
        #             stackTrace=traceback.format_exc(),
        #         )
        #     )

        # Parse the CSV metadata (column names and types)
        for row in self.data:
            yield Either(
                right=CreateTableRequest(
                    name=row.name,
                    databaseSchema=database_schema.fullyQualifiedName,
                    columns=[
                        Column(
                            name=model_col[0],
                            dataType=model_col[1],
                        )
                        for model_col in zip(row.column_names, row.column_types)
                    ],
                )
            )

    # Iterator: yield all metadata entities in order
    def _iter(self) -> Iterable[Entity]:
        # Database service
        yield from self.yield_create_request_database_service()
        # Business database
        yield from self.yield_business_unit_db()
        # Business schema
        yield from self.yield_default_schema()
        # Business data (tables)
        yield from self.yield_data()

    # Test the connection
    def test_connection(self) -> None:
        pass

    # Close the connection
    def close(self):
        pass

(2) Package the custom connector into the ingestion image

Project layout: a connector package containing my_csv_connector.py, plus the setup.py and sample.csv files that the Dockerfile below copies into the image.

Dockerfile:

FROM openmetadata/ingestion:1.3.1

# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow

# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .

Build the image:

docker build -t om-ingestion:build -f Dockerfile .

(3) Deploy the new ingestion service

docker-compose -f docker-compose-ingestion.yml up -d

docker-compose-ingestion.yml:

version: "3.9"
volumes:
  ingestion-volume-dag-airflow:
  ingestion-volume-dags:
  ingestion-volume-tmp:
  es-data:
services:
  ingestion:
    container_name: om_ingestion
    image: om-ingestion:build
    environment:
      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
      DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}
      DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}
      DB_PORT: ${AIRFLOW_DB_PORT:-5432}
      AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}
      DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
      DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}
      # extra connection-string properties for the database
      # EXAMPLE
      # require SSL (only for Postgres)
      # properties: "?sslmode=require"
      DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}
      # To test the lineage backend
      # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
      # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
      AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/api
      AIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQ
    entrypoint: /bin/bash
    command:
      - "/opt/airflow/ingestion_dependency.sh"
    expose:
      - 8080
    ports:
      - "8080:8080"
    networks:
      - app_net_ingestion
    volumes:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp

networks:
  app_net_ingestion:
    ipam:
      driver: default
      config:
        - subnet: "172.16.240.0/24"

(4) Create a custom service of the matching service type in the UI and test the metadata ingestion


Click save, then add and deploy a metadata ingestion pipeline for the new service.

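The UI generates and schedules the ingestion workflow for you. For reference, the same ingestion can also be run from Python against the packaged connector; the sketch below mirrors the workflow configuration documented for custom connectors, with the service name, CSV path, business unit and JWT token as placeholder values to replace with your own.

from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_output_handler import print_status

# All values below are illustrative; sourcePythonClass must match the class
# packaged into the ingestion image (connector/my_csv_connector.py above).
config = {
    "source": {
        "type": "customDatabase",
        "serviceName": "csv_demo",
        "serviceConnection": {
            "config": {
                "type": "CustomDatabase",
                "sourcePythonClass": "connector.my_csv_connector.CsvConnector",
                "connectionOptions": {
                    "source_directory": "/opt/airflow/sample.csv",
                    "business_unit": "demo_business_unit",
                },
            }
        },
        "sourceConfig": {"config": {}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://host.docker.internal:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<ingestion-bot-jwt>"},
        }
    },
}

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.raise_from_status()
print_status(workflow)
workflow.stop()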

II. Developing a Built-in Connector (StreamSets)

Official documentation link: https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector

(1) Define the connection JSON Schema template (streamSetsConnection.json)

Path: openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json

{
  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "StreamSetsConnection",
  "description": "StreamSets Metadata Pipeline Connection Config",
  "type": "object",
  "javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection",
  "definitions": {
    "StreamSetsType": {
      "description": "Service type.",
      "type": "string",
      "enum": ["StreamSets"],
      "default": "StreamSets"
    },
    "basicAuthentication": {
      "title": "Username Authentication",
      "description": "Login username",
      "type": "object",
      "properties": {
        "username": {
          "title": "Username",
          "description": "StreamSets user to authenticate to the API.",
          "type": "string"
        }
      },
      "additionalProperties": false
    }
  },
  "properties": {
    "type": {
      "title": "Service Type",
      "description": "Service Type",
      "$ref": "#/definitions/StreamSetsType",
      "default": "StreamSets"
    },
    "hostPort": {
      "expose": true,
      "title": "Host And Port",
      "description": "Pipeline Service Management/UI URI.",
      "type": "string",
      "format": "uri"
    },
    "streamSetsConfig": {
      "title": "StreamSets Credentials Configuration",
      "description": "We support username authentication",
      "oneOf": [
        {
          "$ref": "#/definitions/basicAuthentication"
        }
      ]
    },
    "supportsMetadataExtraction": {
      "title": "Supports Metadata Extraction",
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["hostPort", "streamSetsConfig"]
}
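
Once the schema is in place and code generation has run, the build produces the matching Pydantic models that the Python source imports later (BasicAuthentication and StreamSetsConnection in connection.py below). As an assumed sanity check of the generated field names, the connection object can be built directly; hostPort and username are placeholder values, 18630 being the Data Collector default port.

from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)

# Illustrative values only
connection = StreamSetsConnection(
    hostPort="http://localhost:18630",
    streamSetsConfig=BasicAuthentication(username="admin"),
)
print(connection.json(exclude_none=True))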

(2) Develop the ingestion source code

Path: ingestion/src/metadata/ingestion/source/pipeline/streamsets/*


1. StreamSets API client (client.py)

import logging
import traceback
from typing import Any, Iterable, Optional

import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth

# Set up the logger
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

REQUESTS_TIMEOUT = 60 * 5


def clean_uri(uri: str) -> str:
    """Clean the URI and make sure it starts with http:// or https://"""
    if not uri.startswith(("http://", "https://")):
        return "http://" + uri
    return uri


class StreamSetsClient:
    """Wrapper on top of the StreamSets Data Collector REST API"""

    def __init__(
        self,
        host_port: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify: bool = False,
    ):
        self.api_endpoint = clean_uri(host_port) + "/rest"
        self.username = username
        self.password = password
        self.verify = verify
        self.headers = {"Content-Type": "application/json"}

    def get(self, path: str) -> Optional[Any]:
        """GET method wrapper"""
        try:
            res = requests.get(
                f"{self.api_endpoint}/{path}",
                verify=self.verify,
                headers=self.headers,
                timeout=REQUESTS_TIMEOUT,
                auth=HTTPBasicAuth(self.username, self.password),
            )
            res.raise_for_status()
            return res.json()
        except HTTPError as err:
            logger.warning(f"Connection error calling the StreamSets API - {err}")
            raise err
        except ValueError as err:
            logger.warning(f"Cannot pick up the JSON from API response - {err}")
            raise err
        except Exception as err:
            logger.warning(f"Unknown error calling StreamSets API - {err}")
            raise err

    def list_pipelines(self) -> Iterable[dict]:
        """List all pipelines"""
        try:
            return self.get("v1/pipelines")
        except Exception as err:
            logger.error(traceback.format_exc())
            raise err

    def get_pipeline_details(self, pipeline_id: str) -> dict:
        """Get a specific pipeline by ID"""
        return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")

    def test_list_pipeline_detail(self) -> Iterable[dict]:
        """Test API access for listing pipelines"""
        return self.list_pipelines()
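
A quick way to exercise this client against a running Data Collector, outside of any ingestion workflow. Host, username and password here are placeholders for your own environment.

from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient

client = StreamSetsClient(
    host_port="http://localhost:18630",  # placeholder Data Collector URL
    username="admin",
    password="admin",
    verify=False,
)

# List the visible pipelines, then fetch the detail of the first one
pipelines = client.list_pipelines()
for pipeline in pipelines:
    print(pipeline.get("pipelineId"), pipeline.get("title"))

if pipelines:
    detail = client.get_pipeline_details(pipelines[0]["pipelineId"])
    print(len(detail.get("stages", [])), "stages")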

2. Connection builder and connection test (connection.py)

"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient


def get_connection(connection: StreamSetsConnection) -> StreamSetsClient:
    """Create connection"""
    if isinstance(connection.streamSetsConfig, BasicAuthentication):
        return StreamSetsClient(
            host_port=connection.hostPort,
            username=connection.streamSetsConfig.username,
            # NOTE: the password is hard-coded here because the connection
            # schema above only carries the username.
            password="95bd7977208bc935cac3656f4a9eea3a",
            verify=False,
        )


def test_connection(
    metadata: OpenMetadata,
    client: StreamSetsClient,
    service_connection: StreamSetsConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test the connection. This can be executed either as part of a metadata
    workflow or an automation workflow.
    """

    def custom_executor():
        list(client.list_pipelines())

    test_fn = {"GetPipelines": custom_executor}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )

3. Metadata extractor (metadata.py)

"""
Extract metadata from the StreamSets source
"""
import traceback
from typing import Any, Iterable, List, Optional

from pydantic import BaseModel, ValidationError

from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class StagesDetails(BaseModel):
    instanceName: str
    label: str
    stageType: str
    stageName: str
    description: str
    inputLanes: List[str]
    outputLanes: List[str]
    downstream_task_names: set[str] = set()


class StreamSetsPipelineDetails(BaseModel):
    """Defines the necessary StreamSets information"""

    uuid: str
    pipelineId: str
    title: str
    name: str
    created: int
    creator: str
    description: str


class StreamsetsSource(PipelineServiceSource):
    """
    Implements the necessary methods to extract
    pipeline metadata from the StreamSets REST API
    """

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        logger.info("create..........")
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        logger.info(f"WorkflowSource: {config}")
        connection: StreamSetsConnection = config.serviceConnection.__root__.config
        logger.info(f"StreamSetsConnection: {connection}")
        if not isinstance(connection, StreamSetsConnection):
            raise InvalidSourceException(
                f"Expected StreamSetsConnection, but got {connection}"
            )
        return cls(config, metadata)

    def yield_pipeline(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[CreatePipelineRequest]]:
        logger.info("yield_pipeline.......")
        try:
            connection_url = None
            if self.service_connection.hostPort:
                connection_url = (
                    f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines"
                )
            logger.info(f"pipeline_details:{pipeline_details}")
            logger.info(f"connection_url:{connection_url}")
            pipeline_request = CreatePipelineRequest(
                name=pipeline_details.name,
                displayName=pipeline_details.title,
                sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                tasks=self._get_tasks_from_details(pipeline_details),
                service=self.context.pipeline_service,
            )
            yield Either(right=pipeline_request)
            self.register_record(pipeline_request=pipeline_request)
        except TypeError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=(
                        f"Error building DAG information from {pipeline_details}. There might be Airflow version"
                        f" incompatibilities - {err}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
        except ValidationError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Error building pydantic model for {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )
        except Exception as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Wild error ingesting pipeline {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

    # Parse the pipeline details into a list of tasks
    def _get_tasks_from_details(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[Task]]:
        logger.info("_get_tasks_from_details.......")
        logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")
        try:
            stages = self.get_stages_by_pipline(pipeline_details)
            return [
                Task(
                    name=stage.instanceName,
                    displayName=stage.label,
                    sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                    taskType=stage.stageType,
                    description=stage.description,
                    downstreamTasks=list(stage.downstream_task_names)
                    if stage.downstream_task_names
                    else [],
                )
                for stage in stages
            ]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}."
            )
        return None

    def yield_pipeline_lineage_details(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[AddLineageRequest]]:
        """
        Convert the connections into a pipeline lineage entity
        :param pipeline_details: pipeline_details object from StreamSets
        :return: a create pipeline request with tasks (not implemented yet)
        """
        logger.info("yield_pipeline_lineage_details..........")

    def get_pipelines_list(self) -> Optional[List[StreamSetsPipelineDetails]]:
        """Get List of all pipelines"""
        logger.info("get_pipelines_list..........")
        if self.connection.list_pipelines() is not None:
            for list_pipeline in self.connection.list_pipelines():
                logger.info(f"pipeline:{list_pipeline}")
                try:
                    yield StreamSetsPipelineDetails(
                        uuid=list_pipeline.get("uuid"),
                        pipelineId=list_pipeline.get("pipelineId"),
                        title=list_pipeline.get("title"),
                        name=list_pipeline.get("name"),
                        created=list_pipeline.get("created"),
                        creator=list_pipeline.get("creator"),
                        description=list_pipeline.get("description"),
                    )
                except (ValueError, KeyError, ValidationError) as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}"
                    )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get pipelines from Process Group {list_pipeline} - {err}."
                    )
        else:
            return None

    # Map each input lane to the stages that consume it (upstream/downstream links)
    def get_stages_lane(self, stages: Optional[List[StagesDetails]]) -> dict:
        logger.info("get_stages_lane......")
        input_lane_to_stage_map = {}
        for stage in stages:
            logger.info(f"stage_info:{stage}")
            for input_lane in stage.get("inputLanes", []):
                try:
                    if input_lane_to_stage_map.get(input_lane) is None:
                        input_lane_to_stage_map[input_lane] = set()
                        input_lane_to_stage_map[input_lane].add(
                            stage.get("instanceName")
                        )
                    else:
                        input_lane_to_stage_map[input_lane].add(
                            stage.get("instanceName")
                        )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}."
                    )
        logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")
        return input_lane_to_stage_map

    def get_stages_by_pipline(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[StagesDetails]]:
        logger.info("get_stages_by_pipline")
        pipeline_detail = self.connection.get_pipeline_details(
            pipeline_details.pipelineId
        )
        stages = []
        if pipeline_detail.get("stages"):
            stages = pipeline_detail.get("stages")
        input_lane_to_stage_map = self.get_stages_lane(stages)
        for stage in stages:
            logger.info(f"stage:{stage}")
            try:
                input_lanes = stage.get("inputLanes", [])
                output_lanes = stage.get("outputLanes", [])
                downstream_stage_names = set()
                for output_lane in stage.get("outputLanes", []):
                    if output_lane in input_lane_to_stage_map.keys():
                        for down_stage in input_lane_to_stage_map.get(output_lane, []):
                            downstream_stage_names.add(down_stage)
                yield StagesDetails(
                    instanceName=stage.get("instanceName"),
                    label=stage["uiInfo"].get("label"),
                    stageType=stage["uiInfo"].get("stageType"),
                    stageName=stage.get("stageName"),
                    description=stage["uiInfo"].get("description"),
                    inputLanes=input_lanes,
                    outputLanes=output_lanes,
                    downstream_task_names=downstream_stage_names,
                )
            except (ValueError, KeyError, ValidationError) as err:
                logger.debug(traceback.format_exc())
                logger.warning(f"Cannot create StagesDetails from {stage} - {err}")
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Wild error encountered when trying to get pipelines from Process Group {stage} - {err}."
                )

    def get_pipeline_name(self, pipeline_details: StreamSetsPipelineDetails) -> str:
        return pipeline_details.name

    def yield_pipeline_status(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        pass

(3) Update the frontend UI source to register the connector

Path: openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts

/*
 *  Copyright 2022 Collate.
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *  http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';

export const getPipelineConfig = (type: PipelineServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type) {
    case PipelineServiceType.Airbyte: {
      schema = airbyteConnection;
      break;
    }
    case PipelineServiceType.Airflow: {
      schema = airflowConnection;
      break;
    }
    case PipelineServiceType.GluePipeline: {
      schema = gluePipelineConnection;
      break;
    }
    case PipelineServiceType.Fivetran: {
      schema = fivetranConnection;
      break;
    }
    case PipelineServiceType.Dagster: {
      schema = dagsterConnection;
      break;
    }
    case PipelineServiceType.Nifi: {
      schema = nifiConnection;
      break;
    }
    case PipelineServiceType.StreamSets: {
      schema = streamSetsConnection;
      break;
    }
    case PipelineServiceType.DomoPipeline: {
      schema = domoPipelineConnection;
      break;
    }
    case PipelineServiceType.CustomPipeline: {
      schema = customPipelineConnection;
      break;
    }
    case PipelineServiceType.DatabricksPipeline: {
      schema = databricksPipelineConnection;
      break;
    }
    case PipelineServiceType.Spline: {
      schema = splineConnection;
      break;
    }
    default:
      break;
  }

  return cloneDeep({ schema, uiSchema });
};

(4) Add the connector's Markdown documentation to the UI

Path: openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md

# StreamSets
In this section, we provide guides and references to use the StreamSets connector.

## Requirements
The StreamSets connector supports one connection type:
- **Basic Authentication**: log in to StreamSets with a username.

You can find further information on the StreamSets connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets).

## Connection Details
$$section
### Host and Port $(id="hostPort")
Pipeline Service Management/UI URI. This should be specified as a URI string in the format `scheme://hostname:port`, e.g. `http://localhost:8443` or `http://host.docker.internal:8443`.
$$

$$section
### StreamSets Config $(id="StreamSetsConfig")
OpenMetadata supports basic authentication (username/password). See the Requirements section above for details.
$$

$$section
### Username $(id="username")
Username to connect to StreamSets. This user should be able to send requests to the StreamSets API and access the resources endpoint.
$$

(5) Create a Java ClassConverter (optional)

(6) Rebuild the images from the Dockerfiles

Server Dockerfile:

# Build stage
FROM alpine:3.19 AS build

COPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /
RUN mkdir -p /opt/openmetadata && \
    tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \
    rm openmetadata-*.tar.gz

# Final stage
FROM alpine:3.19

EXPOSE 8585

RUN adduser -D openmetadata && \
    apk update && \
    apk upgrade && \
    apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/Shanghai

COPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /

USER openmetadata
WORKDIR /opt/openmetadata
ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]

Ingestion Dockerfile:

Path: ingestion/Dockerfile

FROM apache/airflow:2.7.3-python3.10
#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \
    && apt-get -qq install -y \
    tzdata \
    alien \
    build-essential \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libaio1 \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    librdkafka-dev \
    libsasl2-dev \
    libsasl2-2 \
    libsasl2-modules \
    libsasl2-modules-gssapi-mit \
    libssl-dev \
    libxml2 \
    libkrb5-dev \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    unzip \
    vim \
    git \
    wget --no-install-recommends \
    # Accept MSSQL ODBC License
    && ACCEPT_EULA=Y apt-get install -y msodbcsql18 \
    && rm -rf /var/lib/apt/lists/*

RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \
    then \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    else \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    fi

ENV LD_LIBRARY_PATH=/instantclient

# Security patches for base image
# monitor no fixed version for
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 we are already installed the latest
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \
    && apt-get -qq install -t bullseye-backports -y \
    curl \
    libpcre2-8-0 \
    postgresql-common \
    expat \
    bind9

# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestion
COPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis

# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags

USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai

# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip

WORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."

WORKDIR /home/airflow/ingestion

# Argument to choose which ingestion dependencies to install. Defaults to all.
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \then \pip install "ibm-db-sa~=0.4"; \fi# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead 
# because the psycopg2-binary generates a architecture specific error 
# while authenticating connection with the airflow, psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configs

EXPOSE 8080
# This is required as it's responsible to create airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db

(7) Build the service images

From the repository root, build the server image:

docker build -t om-server:build -f docker/development/Dockerfile .

From the repository root, build the ingestion image:

docker build -t om-ingestion:build -f ingestion/Dockerfile .

(8) Deploy the new services

docker-compose -f docker/development/docker-compose-postgres.yml up -d

(9) Open the UI and create a StreamSets metadata ingestion


