OpenMetadata Custom Connector Development Tutorial
I. Developing a Generic Custom Connector
Official tutorials:
1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors
2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector
(1) Create the custom connector class for your service type
Reference: https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace
1. Create the custom connector
Example: my_csv_connector.py
"""
Custom Database Service that extracts metadata from a CSV file
"""
import csv
import traceback

from pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Iterable, Optional, List, Dict, Any

from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (
    CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseService,
)
from metadata.generated.schema.entity.data.table import (
    Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class InvalidCsvConnectorException(Exception):
    """Sample data is not valid to be ingested"""


class CsvModel(BaseModel):
    name: str
    column_names: List[str]
    column_types: List[str]

    @validator("column_names", "column_types", pre=True)
    def str_to_list(cls, value):
        """Suppose that the internal split is in ;"""
        return value.split(";")


class CsvConnector(Source):
    """
    Custom connector to ingest Database metadata.

    We'll suppose that we can read metadata from a CSV
    with a custom database name from a business_unit connection option.
    """

    # Built-in method
    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata

        # Read the connection configuration
        self.service_connection = config.serviceConnection.__root__.config

        self.source_directory: str = (
            # Path to the CSV file
            self.service_connection.connectionOptions.__root__.get("source_directory")
        )
        if not self.source_directory:
            raise InvalidCsvConnectorException("Missing source_directory connection option")

        self.business_unit: str = (
            # Custom database name
            self.service_connection.connectionOptions.__root__.get("business_unit")
        )
        if not self.business_unit:
            raise InvalidCsvConnectorException("Missing business_unit connection option")

        self.data: Optional[List[CsvModel]] = None

        super().__init__()

    # Built-in factory method
    @classmethod
    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection) -> "CsvConnector":
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: CustomDatabaseConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, CustomDatabaseConnection):
            raise InvalidSourceException(
                f"Expected CustomDatabaseConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    # Static method: parse a single row
    @staticmethod
    def read_row_safe(row: Dict[str, Any]):
        try:
            return CsvModel.parse_obj(row)
        except ValidationError:
            logger.warning(f"Error parsing row {row}. Skipping it.")

    # Preparation: read the file and its data
    def prepare(self):
        # Validate that the file exists
        source_data = Path(self.source_directory)
        if not source_data.exists():
            raise InvalidCsvConnectorException("Source Data path does not exist")

        try:
            with open(source_data, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                # Read the rows
                self.data = [self.read_row_safe(row) for row in reader]
        except Exception as exc:
            logger.error("Unknown error reading the source file")
            raise exc

    def yield_create_request_database_service(self):
        yield Either(
            # Build the database service entity from the workflow source config
            right=self.metadata.get_create_service_from_source(
                entity=DatabaseService, config=self.config
            )
        )

    # Create the business-unit database
    def yield_business_unit_db(self):
        # Pick up the service we just created (if not from the UI)
        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        yield Either(
            right=CreateDatabaseRequest(
                name=self.business_unit,
                service=service_entity.fullyQualifiedName,
            )
        )

    # Create the default schema
    def yield_default_schema(self):
        # Pick up the database we just created (if not from the UI)
        database_entity: Database = self.metadata.get_by_name(
            entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}"
        )
        yield Either(
            right=CreateDatabaseSchemaRequest(
                name="default",
                database=database_entity.fullyQualifiedName,
            )
        )

    # Create the tables from the CSV metadata
    def yield_data(self):
        """
        Iterate over the data list to create tables
        """
        database_schema: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=f"{self.config.serviceName}.{self.business_unit}.default",
        )

        # Error handling example:
        # suppose we have a failure we want to track
        # try:
        #     1/0
        # except Exception:
        #     yield Either(
        #         left=StackTraceError(
        #             name="My Error",
        #             error="Demoing one error",
        #             stackTrace=traceback.format_exc(),
        #         )
        #     )

        # Parse the CSV metadata (column names and types)
        for row in self.data:
            yield Either(
                right=CreateTableRequest(
                    name=row.name,
                    databaseSchema=database_schema.fullyQualifiedName,
                    columns=[
                        Column(
                            name=model_col[0],
                            dataType=model_col[1],
                        )
                        for model_col in zip(row.column_names, row.column_types)
                    ],
                )
            )

    # Iterator: yield the metadata entities
    def _iter(self) -> Iterable[Entity]:
        # Database service
        yield from self.yield_create_request_database_service()
        # Business-unit database
        yield from self.yield_business_unit_db()
        # Default schema
        yield from self.yield_default_schema()
        # Tables
        yield from self.yield_data()

    # Test the database connection
    def test_connection(self) -> None:
        pass

    # Close the connection
    def close(self):
        pass
(2) Package the custom connector into the ingestion image
Project directory:
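A typical layout, inferred from the COPY lines in the Dockerfile below (directory and file names are illustrative):

custom-connector/
  Dockerfile
  setup.py
  sample.csv
  connector/
    __init__.py
    my_csv_connector.py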
Dockerfile:
FROM openmetadata/ingestion:1.3.1

# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow

# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .
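The Dockerfile installs the project with pip install --no-deps ., so it needs a setup.py that packages the connector module. A minimal sketch, assuming the package name custom-csv-connector (name and version are placeholders):

# setup.py - minimal packaging sketch; name/version are placeholders
from setuptools import find_packages, setup

setup(
    name="custom-csv-connector",
    version="0.0.1",
    # ship the "connector" package so it is importable as connector.my_csv_connector
    packages=find_packages(include=["connector", "connector.*"]),
)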
Build the image:
docker build -t om-ingestion:build -f Dockerfile .
(3) Deploy the new ingestion service
docker-compose -f docker-compose-ingestion.yml up -d
docker-compose-ingestion.yml
version: "3.9"
volumes:
  ingestion-volume-dag-airflow:
  ingestion-volume-dags:
  ingestion-volume-tmp:
  es-data:
services:
  ingestion:
    container_name: om_ingestion
    image: om-ingestion:build
    environment:
      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
      DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}
      DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}
      DB_PORT: ${AIRFLOW_DB_PORT:-5432}
      AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}
      DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
      DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}
      # extra connection-string properties for the database
      # EXAMPLE
      # require SSL (only for Postgres)
      # properties: "?sslmode=require"
      DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}
      # To test the lineage backend
      # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
      # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
      AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/api
      AIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQ
    entrypoint: /bin/bash
    command:
      - "/opt/airflow/ingestion_dependency.sh"
    expose:
      - 8080
    ports:
      - "8080:8080"
    networks:
      - app_net_ingestion
    volumes:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp

networks:
  app_net_ingestion:
    ipam:
      driver: default
      config:
        - subnet: "172.16.240.0/24"
(4) Create a Custom service of the matching service type and test the metadata ingestion
Click Save, then add the metadata ingestion pipeline:
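For reference, the same ingestion can be expressed as a YAML workflow definition. This is a minimal sketch: the service name, CSV path, business unit and JWT token are placeholders, and the overall shape follows the standard OpenMetadata workflow config.

source:
  type: customdatabase
  serviceName: csv_demo
  serviceConnection:
    config:
      type: CustomDatabase
      sourcePythonClass: connector.my_csv_connector.CsvConnector
      connectionOptions:
        source_directory: /opt/airflow/sample.csv
        business_unit: demo_business_unit
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://host.docker.internal:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <ingestion-bot JWT>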
II. Developing a Built-in Connector (StreamSets)
Official tutorial: https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector
(1) Define the connection JSON schema (streamSetsConnection.json)
Path: openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json
{"$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json","$schema": "http://json-schema.org/draft-07/schema#","title": "StreamSetsConnection","description": "StreamSets Metadata Pipeline Connection Config","type": "object","javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection","definitions": {"StreamSetsType": {"description": "Service type.","type": "string","enum": ["StreamSets"],"default": "StreamSets"},"basicAuthentication": {"title": "Username Authentication","description": "Login username","type":"object","properties": {"username": {"title": "Username","description": "StreamSets user to authenticate to the API.","type": "string"}},"additionalProperties": false}},"properties": {"type": {"title": "Service Type","description": "Service Type","$ref": "#/definitions/StreamSetsType","default": "StreamSets"},"hostPort": {"expose": true,"title": "Host And Port","description": "Pipeline Service Management/UI URI.","type": "string","format": "uri"},"streamSetsConfig": {"title": "StreamSets Credentials Configuration","description": "We support username authentication","oneOf": [{"$ref": "#/definitions/basicAuthentication"}]},"supportsMetadataExtraction": {"title": "Supports Metadata Extraction","$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"}},"additionalProperties": false,"required": ["hostPort", "streamSetsConfig"]
}
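With this schema in place, a filled-in service connection would look roughly like the following sketch (the hostPort and username are examples; 18630 is the default StreamSets Data Collector port):

serviceConnection:
  config:
    type: StreamSets
    hostPort: http://localhost:18630
    streamSetsConfig:
      username: admin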
(2) Develop the ingestion source code
Path: ingestion/src/metadata/ingestion/source/pipeline/streamsets/*
1. StreamSets API client (client.py)
import logging
import traceback
from typing import Any, Iterable, Optional

import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth

# Set up the logger
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

REQUESTS_TIMEOUT = 60 * 5


def clean_uri(uri: str) -> str:
    """Clean the URI and make sure it starts with http:// or https://"""
    if not uri.startswith(("http://", "https://")):
        return "http://" + uri
    return uri


class StreamSetsClient:
    """Wrapper on top of the StreamSets Data Collector REST API"""

    def __init__(
        self,
        host_port: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        verify: bool = False,
    ):
        self.api_endpoint = clean_uri(host_port) + "/rest"
        self.username = username
        self.password = password
        self.verify = verify
        self.headers = {"Content-Type": "application/json"}

    def get(self, path: str) -> Optional[Any]:
        """GET method wrapper"""
        try:
            res = requests.get(
                f"{self.api_endpoint}/{path}",
                verify=self.verify,
                headers=self.headers,
                timeout=REQUESTS_TIMEOUT,
                auth=HTTPBasicAuth(self.username, self.password),
            )
            res.raise_for_status()
            return res.json()
        except HTTPError as err:
            logger.warning(f"Connection error calling the StreamSets API - {err}")
            raise err
        except ValueError as err:
            logger.warning(f"Cannot pick up the JSON from API response - {err}")
            raise err
        except Exception as err:
            logger.warning(f"Unknown error calling StreamSets API - {err}")
            raise err

    def list_pipelines(self) -> Iterable[dict]:
        """List all pipelines"""
        try:
            return self.get("v1/pipelines")
        except Exception as err:
            logger.error(traceback.format_exc())
            raise err

    def get_pipeline_details(self, pipeline_id: str) -> dict:
        """Get a specific pipeline by ID"""
        return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")

    def test_list_pipeline_detail(self) -> Iterable[dict]:
        """Test API access for listing pipelines"""
        return self.list_pipelines()
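As a quick smoke test of the client outside of OpenMetadata, something like the following can be run (host, username and password below are placeholders):

# Standalone check of StreamSetsClient against a local Data Collector
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient

client = StreamSetsClient(
    host_port="http://localhost:18630",  # default SDC port; adjust as needed
    username="admin",
    password="admin",
)
# Print id and title of every pipeline returned by /rest/v1/pipelines
for pipeline in client.list_pipelines():
    print(pipeline.get("pipelineId"), pipeline.get("title"))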
2. Connection builder and connection test (connection.py)
"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient


def get_connection(connection: StreamSetsConnection) -> StreamSetsClient:
    """Create connection"""
    if isinstance(connection.streamSetsConfig, BasicAuthentication):
        return StreamSetsClient(
            host_port=connection.hostPort,
            username=connection.streamSetsConfig.username,
            # NOTE: the password is hardcoded in this example
            password="95bd7977208bc935cac3656f4a9eea3a",
            verify=False,
        )


def test_connection(
    metadata: OpenMetadata,
    client: StreamSetsClient,
    service_connection: StreamSetsConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test the connection. This can be executed as part of a
    metadata workflow or during an Automation Workflow.
    """

    def custom_executor():
        list(client.list_pipelines())

    test_fn = {"GetPipelines": custom_executor}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
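These helpers can also be exercised manually. A sketch, assuming a locally reachable StreamSets instance and the generated pydantic models shown above (hostPort and username are placeholders):

# Manual check of get_connection; values are placeholders
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)

connection = StreamSetsConnection(
    hostPort="http://localhost:18630",
    streamSetsConfig=BasicAuthentication(username="admin"),
)
client = get_connection(connection)
print(client.test_list_pipeline_detail())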
3. Metadata extractor (metadata.py)
"""
Extract metadata from the StreamSets source
"""
import traceback
from typing import Iterable, List, Optional, Any

from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from pydantic import BaseModel, ValidationError

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class StagesDetails(BaseModel):
    instanceName: str
    label: str
    stageType: str
    stageName: str
    description: str
    inputLanes: List[str]
    outputLanes: List[str]
    downstream_task_names: set[str] = set()


class StreamSetsPipelineDetails(BaseModel):
    """Defines the necessary StreamSets information"""

    uuid: str
    pipelineId: str
    title: str
    name: str
    created: int
    creator: str
    description: str


class StreamsetsSource(PipelineServiceSource):
    """
    Implements the necessary methods to extract
    pipeline metadata from StreamSets
    """

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        logger.info("create..........")
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        logger.info(f"WorkflowSource: {config}")
        connection: StreamSetsConnection = config.serviceConnection.__root__.config
        logger.info(f"StreamSetsConnection: {connection}")
        if not isinstance(connection, StreamSetsConnection):
            raise InvalidSourceException(
                f"Expected StreamSetsConnection, but got {connection}"
            )
        return cls(config, metadata)

    def yield_pipeline(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[CreatePipelineRequest]]:
        logger.info("yield_pipeline.......")
        try:
            connection_url = None
            if self.service_connection.hostPort:
                connection_url = (
                    f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines"
                )
            logger.info(f"pipeline_details:{pipeline_details}")
            logger.info(f"connection_url:{connection_url}")
            pipeline_request = CreatePipelineRequest(
                name=pipeline_details.name,
                displayName=pipeline_details.title,
                sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                tasks=self._get_tasks_from_details(pipeline_details),
                service=self.context.pipeline_service,
            )
            yield Either(right=pipeline_request)
            self.register_record(pipeline_request=pipeline_request)
        except TypeError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=(
                        f"Error building pipeline information from {pipeline_details}."
                        f" There might be version incompatibilities - {err}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
        except ValidationError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Error building pydantic model for {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )
        except Exception as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.name,
                    error=f"Wild error ingesting pipeline {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

    # Parse the pipeline details into Task entities
    def _get_tasks_from_details(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[Task]]:
        logger.info("_get_tasks_from_details.......")
        logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")
        try:
            stages = self.get_stages_by_pipline(pipeline_details)
            return [
                Task(
                    name=stage.instanceName,
                    displayName=stage.label,
                    sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                    taskType=stage.stageType,
                    description=stage.description,
                    downstreamTasks=list(stage.downstream_task_names)
                    if stage.downstream_task_names
                    else [],
                )
                for stage in stages
            ]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}."
            )
        return None

    def yield_pipeline_lineage_details(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[AddLineageRequest]]:
        """
        Convert the connections into a pipeline entity
        :param pipeline_details: pipeline_details object from StreamSets
        :return: Create Pipeline request with tasks
        """
        logger.info("yield_pipeline_lineage_details..........")

    def get_pipelines_list(self) -> Iterable[StreamSetsPipelineDetails]:
        """Get List of all pipelines"""
        logger.info("get_pipelines_list..........")
        if self.connection.list_pipelines() is not None:
            for list_pipeline in self.connection.list_pipelines():
                logger.info(f"pipeline:{list_pipeline}")
                try:
                    yield StreamSetsPipelineDetails(
                        uuid=list_pipeline.get("uuid"),
                        pipelineId=list_pipeline.get("pipelineId"),
                        title=list_pipeline.get("title"),
                        name=list_pipeline.get("name"),
                        created=list_pipeline.get("created"),
                        creator=list_pipeline.get("creator"),
                        description=list_pipeline.get("description"),
                    )
                except (ValueError, KeyError, ValidationError) as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}"
                    )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get pipelines from {list_pipeline} - {err}."
                    )
        else:
            return None

    # Map each input lane to the stages that consume it (downstream relations)
    def get_stages_lane(self, stages: List[dict]) -> dict:
        logger.info("get_stages_lane......")
        input_lane_to_stage_map = {}
        for stage in stages:
            logger.info(f"stage_info:{stage}")
            for input_lane in stage.get("inputLanes", []):
                try:
                    if input_lane_to_stage_map.get(input_lane) is None:
                        input_lane_to_stage_map[input_lane] = set()
                    input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}."
                    )
        logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")
        return input_lane_to_stage_map

    def get_stages_by_pipline(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[StagesDetails]:
        logger.info("get_stages_by_pipline")
        pipeline_detail = self.connection.get_pipeline_details(pipeline_details.pipelineId)
        stages = []
        if pipeline_detail.get("stages"):
            stages = pipeline_detail.get("stages")
        input_lane_to_stage_map = self.get_stages_lane(stages)
        for stage in stages:
            logger.info(f"stage:{stage}")
            try:
                input_lanes = stage.get("inputLanes", [])
                output_lanes = stage.get("outputLanes", [])
                downstream_stage_names = set()
                for output_lane in stage.get("outputLanes", []):
                    if output_lane in input_lane_to_stage_map.keys():
                        for down_stage in input_lane_to_stage_map.get(output_lane, []):
                            downstream_stage_names.add(down_stage)
                yield StagesDetails(
                    instanceName=stage.get("instanceName"),
                    label=stage["uiInfo"].get("label"),
                    stageType=stage["uiInfo"].get("stageType"),
                    stageName=stage.get("stageName"),
                    description=stage["uiInfo"].get("description"),
                    inputLanes=input_lanes,
                    outputLanes=output_lanes,
                    downstream_task_names=downstream_stage_names,
                )
            except (ValueError, KeyError, ValidationError) as err:
                logger.debug(traceback.format_exc())
                logger.warning(f"Cannot create StagesDetails from {stage} - {err}")
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Wild error encountered when trying to get stages from {stage} - {err}."
                )

    def get_pipeline_name(self, pipeline_details: StreamSetsPipelineDetails) -> str:
        return pipeline_details.name

    def yield_pipeline_status(
        self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        pass
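Once the connector is packaged into the ingestion image, it can be driven by a standard pipeline-service workflow definition. A minimal sketch (service name, hostPort and token are placeholders; the shape follows the usual OpenMetadata pipeline workflow config):

source:
  type: streamsets
  serviceName: streamsets_local
  serviceConnection:
    config:
      type: StreamSets
      hostPort: http://localhost:18630
      streamSetsConfig:
        username: admin
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <ingestion-bot JWT>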
(3) Register the connector in the frontend UI source
Path: openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
/*
 *  Copyright 2022 Collate.
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *  http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';

export const getPipelineConfig = (type: PipelineServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type) {
    case PipelineServiceType.Airbyte: {
      schema = airbyteConnection;
      break;
    }
    case PipelineServiceType.Airflow: {
      schema = airflowConnection;
      break;
    }
    case PipelineServiceType.GluePipeline: {
      schema = gluePipelineConnection;
      break;
    }
    case PipelineServiceType.Fivetran: {
      schema = fivetranConnection;
      break;
    }
    case PipelineServiceType.Dagster: {
      schema = dagsterConnection;
      break;
    }
    case PipelineServiceType.Nifi: {
      schema = nifiConnection;
      break;
    }
    case PipelineServiceType.StreamSets: {
      schema = streamSetsConnection;
      break;
    }
    case PipelineServiceType.DomoPipeline: {
      schema = domoPipelineConnection;
      break;
    }
    case PipelineServiceType.CustomPipeline: {
      schema = customPipelineConnection;
      break;
    }
    case PipelineServiceType.DatabricksPipeline: {
      schema = databricksPipelineConnection;
      break;
    }
    case PipelineServiceType.Spline: {
      schema = splineConnection;
      break;
    }
    default:
      break;
  }

  return cloneDeep({ schema, uiSchema });
};
(4) Add the connector's Markdown documentation to the UI
Path: openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md
# StreamSets
In this section, we provide guides and references for using the StreamSets connector.

## Requirements
OpenMetadata supports one connection type for the StreamSets connector:
- **Basic Authentication**: log in to StreamSets with a username. You can find further details on the StreamSets connector in the [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets).

## Connection Details
$$section
### Host and Port $(id="hostPort")
Pipeline Service Management/UI URI. This should be specified as a URI string in the format `scheme://hostname:port`. For example, `http://localhost:8443` or `http://host.docker.internal:8443`.
$$

$$section
### StreamSets Config $(id="streamSetsConfig")
OpenMetadata supports basic authentication (username/password). For more information, see the Requirements section above.
$$

$$section
### Username $(id="username")
Username to connect to StreamSets. This user should be able to send requests to the StreamSets API and access the `resources` endpoint.
$$
(5) Create a Java ClassConverter (optional)
Not needed for this connector.
(6) Rebuild the images from the Dockerfiles
Server Dockerfile:
# Build stage
FROM alpine:3.19 AS build

COPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /
RUN mkdir -p /opt/openmetadata && \
    tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \
    rm openmetadata-*.tar.gz

# Final stage
FROM alpine:3.19

EXPOSE 8585

RUN adduser -D openmetadata && \
    apk update && \
    apk upgrade && \
    apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/Shanghai

COPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /

USER openmetadata
WORKDIR /opt/openmetadata
ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]
Ingestion Dockerfile:
Path: ingestion/Dockerfile
FROM apache/airflow:2.7.3-python3.10
#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \
    && apt-get -qq install -y \
    tzdata \
    alien \
    build-essential \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libaio1 \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    librdkafka-dev \
    libsasl2-dev \
    libsasl2-2 \
    libsasl2-modules \
    libsasl2-modules-gssapi-mit \
    libssl-dev \
    libxml2 \
    libkrb5-dev \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    unzip \
    vim \
    git \
    wget --no-install-recommends \
    # Accept MSSQL ODBC License
    && ACCEPT_EULA=Y apt-get install -y msodbcsql18 \
    && rm -rf /var/lib/apt/lists/*

RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \
    then \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    else \
    wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \
    unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
    fi

ENV LD_LIBRARY_PATH=/instantclient

# Security patches for base image
# monitor no fixed version for
# https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
# https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
# https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
# https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 we are already installed the latest
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \
    && apt-get -qq install -t bullseye-backports -y \
    curl \
    libpcre2-8-0 \
    postgresql-common \
    expat \
    bind9

# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestion

COPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis

# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags

USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai

# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip

WORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."WORKDIR /home/airflow/ingestion# 提供要安裝的引入依賴項的參數。默認為全部提供要安裝的引入依賴項的參數。默認為全部
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \
    then \
    pip install "ibm-db-sa~=0.4"; \
    fi

# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead
# because the psycopg2-binary generates a architecture specific error
# while authenticating connection with the airflow, psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configs

EXPOSE 8080
# This is required as it's responsible to create airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db
(7) Build the service images
From the project root, build the server image:
docker build -t om-server:build -f docker/development/Dockerfile .
From the project root, build the ingestion image:
docker build -t om-ingestion:build -f ingestion/Dockerfile .
(8) Deploy the new services
docker-compose -f docker/development/docker-compose-postgres.yml up -d
(9) Open the UI and create a StreamSets metadata ingestion pipeline