6. 基礎設施層
基礎設施層為知識庫創建功能提供底層技術支撐,包括數據存儲、緩存、消息隊列、文檔處理、向量化等核心服務。
6.1 數據存儲服務
6.1.1 MySQL數據庫
文件位置: backend/infra/rdb/mysql.go
// MySQLConfig MySQL配置
type MySQLConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Username string `yaml:"username"`Password string `yaml:"password"`Database string `yaml:"database"`MaxOpenConns int `yaml:"max_open_conns"`MaxIdleConns int `yaml:"max_idle_conns"`MaxLifetime int `yaml:"max_lifetime"`
}// NewMySQLConnection 創建MySQL連接
func NewMySQLConnection(config *MySQLConfig) (*gorm.DB, error) {dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",config.Username, config.Password, config.Host, config.Port, config.Database)db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: logger.Default.LogMode(logger.Info),NamingStrategy: schema.NamingStrategy{SingularTable: true,},})if err != nil {return nil, fmt.Errorf("連接MySQL失敗: %w", err)}sqlDB, err := db.DB()if err != nil {return nil, fmt.Errorf("獲取SQL DB失敗: %w", err)}// 設置連接池參數sqlDB.SetMaxOpenConns(config.MaxOpenConns)sqlDB.SetMaxIdleConns(config.MaxIdleConns)sqlDB.SetConnMaxLifetime(time.Duration(config.MaxLifetime) * time.Second)return db, nil
}
6.1.2 Redis緩存
文件位置: backend/infra/cache/redis.go
// RedisConfig Redis配置
type RedisConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Password string `yaml:"password"`DB int `yaml:"db"`PoolSize int `yaml:"pool_size"`
}// NewRedisClient 創建Redis客戶端
func NewRedisClient(config *RedisConfig) *redis.Client {rdb := redis.NewClient(&redis.Options{Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),Password: config.Password,DB: config.DB,PoolSize: config.PoolSize,})return rdb
}// KnowledgeCacheManager 知識庫緩存管理器
type KnowledgeCacheManager struct {redisClient *redis.ClientlocalCache *cache.Cache
}func (c *KnowledgeCacheManager) SetKnowledge(ctx context.Context, knowledge *model.Knowledge) error {// 1. 序列化知識庫數據data, err := json.Marshal(knowledge)if err != nil {return fmt.Errorf("序列化知識庫數據失敗: %w", err)}// 2. 設置Redis緩存cacheKey := fmt.Sprintf("knowledge:%d", knowledge.ID)err = c.redisClient.Set(ctx, cacheKey, data, time.Hour).Err()if err != nil {return fmt.Errorf("設置Redis緩存失敗: %w", err)}// 3. 設置本地緩存c.localCache.Set(cacheKey, knowledge, time.Hour)return nil
}
6.2 文檔處理服務
6.2.1 文檔解析器
文件位置: backend/infra/document/parser.go
// DocumentParser turns a raw document stream into a ParseResult and reports
// which file types it can handle.
type DocumentParser interface {
	Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error)
	SupportedTypes() []string
}

// ParseResult is the structured output of a parser.
type ParseResult struct {
	Content   string            `json:"content"`
	Metadata  map[string]string `json:"metadata"`
	Sections  []*Section        `json:"sections"`
	WordCount int               `json:"word_count"`
}

// Section is one titled part of a parsed document, with its heading level.
type Section struct {
	Title   string `json:"title"`
	Content string `json:"content"`
	Level   int    `json:"level"`
}

// UniversalDocumentParser dispatches to a concrete parser chosen by file type.
type UniversalDocumentParser struct {
	parsers map[string]DocumentParser
}

// Parse looks up the parser registered for fileType and delegates to it.
// It returns an error when no parser is registered for fileType, and wraps
// any error produced by the delegate.
func (p *UniversalDocumentParser) Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error) {
	if delegate, ok := p.parsers[fileType]; ok {
		parsed, err := delegate.Parse(ctx, file, fileType)
		if err != nil {
			return nil, fmt.Errorf("解析文檔失敗: %w", err)
		}
		return parsed, nil
	}
	return nil, fmt.Errorf("不支持的文件類型: %s", fileType)
}
6.2.2 文檔分片器
文件位置: backend/infra/document/splitter.go
// DocumentSplitter 文檔分片器
type DocumentSplitter struct {maxChunkSize intoverlapSize intseparators []string
}// SplitDocument 分割文檔
func (s *DocumentSplitter) SplitDocument(ctx context.Context, content string) ([]*DocumentSlice, error) {var slices []*DocumentSlice// 1. 按段落分割paragraphs := strings.Split(content, "\n\n")var currentSlice strings.Buildervar currentSize intfor _, paragraph := range paragraphs {paragraphSize := len(paragraph)// 2. 檢查是否需要創建新分片if currentSize+paragraphSize > s.maxChunkSize && currentSize > 0 {// 創建當前分片slice := &DocumentSlice{Content: currentSlice.String(),WordCount: currentSize,Index: len(slices),}slices = append(slices, slice)// 重置當前分片currentSlice.Reset()currentSize = 0// 添加重疊內容if s.overlapSize > 0 {overlapContent := s.getOverlapContent(slice.Content, s.overlapSize)currentSlice.WriteString(overlapContent)currentSize = len(overlapContent)}}// 3. 添加段落到當前分片if currentSize > 0 {currentSlice.WriteString("\n\n")currentSize += 2}currentSlice.WriteString(paragraph)currentSize += paragraphSize}// 4. 處理最后一個分片if currentSize > 0 {slice := &DocumentSlice{Content: currentSlice.String(),WordCount: currentSize,Index: len(slices),}slices = append(slices, slice)}return slices, nil
}// DocumentSlice 文檔分片
type DocumentSlice struct {Content string `json:"content"`WordCount int `json:"word_count"`Index int `json:"index"`Vector []float32 `json:"vector,omitempty"`
}
6.3 向量化服務
6.3.1 向量化引擎
文件位置: backend/infra/embedding/engine.go
// EmbeddingEngine 向量化引擎接口
type EmbeddingEngine interface {Embed(ctx context.Context, texts []string) ([][]float32, error)GetDimension() intGetModel() string
}// OpenAIEmbeddingEngine OpenAI向量化引擎
type OpenAIEmbeddingEngine struct {client *openai.Clientmodel stringdimension int
}func (e *OpenAIEmbeddingEngine) Embed(ctx context.Context, texts []string) ([][]float32, error) {// 1. 構建請求req := openai.EmbeddingRequest{Input: texts,Model: openai.EmbeddingModel(e.model),}// 2. 調用OpenAI APIresp, err := e.client.CreateEmbeddings(ctx, req)if err != nil {return nil, fmt.Errorf("調用OpenAI向量化API失敗: %w", err)}// 3. 提取向量數據vectors := make([][]float32, len(resp.Data))for i, embedding := range resp.Data {vectors[i] = make([]float32, len(embedding.Embedding))for j, val := range embedding.Embedding {vectors[i][j] = float32(val)}}return vectors, nil
}
6.4 向量存儲服務
6.4.1 Milvus向量數據庫
文件位置: backend/infra/searchstore/milvus/client.go
// MilvusClient Milvus客戶端
type MilvusClient struct {client milvus.Clientconfig *MilvusConfig
}// MilvusConfig Milvus配置
type MilvusConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Username string `yaml:"username"`Password string `yaml:"password"`Database string `yaml:"database"`
}// CreateCollection 創建集合
func (c *MilvusClient) CreateCollection(ctx context.Context, collectionName string, dimension int) error {// 1. 定義字段fields := []*entity.Field{{Name: "id",DataType: entity.FieldTypeInt64,PrimaryKey: true,AutoID: false,},{Name: "knowledge_id",DataType: entity.FieldTypeInt64,},{Name: "document_id",DataType: entity.FieldTypeInt64,},{Name: "slice_id",DataType: entity.FieldTypeInt64,},{Name: "content",DataType: entity.FieldTypeVarChar,TypeParams: map[string]string{"max_length": "65535",},},{Name: "vector",DataType: entity.FieldTypeFloatVector,TypeParams: map[string]string{"dim": fmt.Sprintf("%d", dimension),},},}// 2. 創建集合schema := &entity.Schema{CollectionName: collectionName,Description: "知識庫向量集合",Fields: fields,}err := c.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)if err != nil {return fmt.Errorf("創建Milvus集合失敗: %w", err)}// 3. 創建索引indexParam := entity.NewIndexIvfFlat(entity.L2, 1024)err = c.client.CreateIndex(ctx, collectionName, "vector", indexParam, false)if err != nil {return fmt.Errorf("創建向量索引失敗: %w", err)}return nil
}// InsertVectors 插入向量
func (c *MilvusClient) InsertVectors(ctx context.Context, collectionName string, data *VectorData) error {// 1. 準備數據ids := make([]int64, len(data.IDs))knowledgeIDs := make([]int64, len(data.IDs))documentIDs := make([]int64, len(data.IDs))sliceIDs := make([]int64, len(data.IDs))contents := make([]string, len(data.IDs))vectors := make([][]float32, len(data.IDs))for i, item := range data.Items {ids[i] = item.IDknowledgeIDs[i] = item.KnowledgeIDdocumentIDs[i] = item.DocumentIDsliceIDs[i] = item.SliceIDcontents[i] = item.Contentvectors[i] = item.Vector}// 2. 構建列數據columns := []entity.Column{entity.NewColumnInt64("id", ids),entity.NewColumnInt64("knowledge_id", knowledgeIDs),entity.NewColumnInt64("document_id", documentIDs),entity.NewColumnInt64("slice_id", sliceIDs),entity.NewColumnVarChar("content", contents),entity.NewColumnFloatVector("vector", dimension, vectors),}// 3. 插入數據_, err := c.client.Insert(ctx, collectionName, "", columns...)if err != nil {return fmt.Errorf("插入向量數據失敗: %w", err)}return nil
}
6.5 消息隊列服務
6.5.1 事件總線
文件位置: backend/infra/eventbus/eventbus.go
// EventBus 事件總線接口
type EventBus interface {Publish(ctx context.Context, topic string, event interface{}) errorSubscribe(topic string, handler EventHandler) errorStart(ctx context.Context) errorStop() error
}// EventHandler 事件處理器
type EventHandler func(ctx context.Context, event interface{}) error// KafkaEventBus Kafka事件總線
type KafkaEventBus struct {producer sarama.SyncProducerconsumer sarama.ConsumerGroupconfig *KafkaConfighandlers map[string][]EventHandler
}// KafkaConfig Kafka配置
type KafkaConfig struct {Brokers []string `yaml:"brokers"`GroupID string `yaml:"group_id"`Username string `yaml:"username"`Password string `yaml:"password"`
}// Publish 發布事件
func (k *KafkaEventBus) Publish(ctx context.Context, topic string, event interface{}) error {// 1. 序列化事件data, err := json.Marshal(event)if err != nil {return fmt.Errorf("序列化事件失敗: %w", err)}// 2. 構建消息msg := &sarama.ProducerMessage{Topic: topic,Value: sarama.StringEncoder(data),Headers: []sarama.RecordHeader{{Key: []byte("event_type"),Value: []byte(reflect.TypeOf(event).Name()),},{Key: []byte("timestamp"),Value: []byte(fmt.Sprintf("%d", time.Now().Unix())),},},}// 3. 發送消息_, _, err = k.producer.SendMessage(msg)if err != nil {return fmt.Errorf("發送Kafka消息失敗: %w", err)}return nil
}
6.6 搜索服務
6.6.1 ElasticSearch
文件位置: backend/infra/es/client.go
// ESClient ElasticSearch客戶端
type ESClient struct {client *elasticsearch.Clientconfig *ESConfig
}// ESConfig ElasticSearch配置
type ESConfig struct {Addresses []string `yaml:"addresses"`Username string `yaml:"username"`Password string `yaml:"password"`Index string `yaml:"index"`
}// CreateKnowledgeIndex 創建知識庫索引
func (c *ESClient) CreateKnowledgeIndex(ctx context.Context, indexName string) error {// 1. 定義索引映射mapping := map[string]interface{}{"mappings": map[string]interface{}{"properties": map[string]interface{}{"knowledge_id": map[string]interface{}{"type": "long",},"name": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"description": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"content": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"space_id": map[string]interface{}{"type": "long",},"creator_id": map[string]interface{}{"type": "long",},"created_at": map[string]interface{}{"type": "date",},"status": map[string]interface{}{"type": "integer",},},},"settings": map[string]interface{}{"number_of_shards": 1,"number_of_replicas": 1,"analysis": map[string]interface{}{"analyzer": map[string]interface{}{"ik_max_word": map[string]interface{}{"type": "ik_max_word","tokenizer": "ik_max_word",},},},},}// 2. 創建索引mappingJSON, _ := json.Marshal(mapping)req := esapi.IndicesCreateRequest{Index: indexName,Body: strings.NewReader(string(mappingJSON)),}res, err := req.Do(ctx, c.client)if err != nil {return fmt.Errorf("創建ES索引失敗: %w", err)}defer res.Body.Close()if res.IsError() {return fmt.Errorf("創建ES索引失敗: %s", res.String())}return nil
}
6.7 配置管理
6.7.1 配置中心
文件位置: backend/infra/config/config.go
// Config 應用配置
type Config struct {Server ServerConfig `yaml:"server"`Database DatabaseConfig `yaml:"database"`Redis RedisConfig `yaml:"redis"`Kafka KafkaConfig `yaml:"kafka"`Milvus MilvusConfig `yaml:"milvus"`ES ESConfig `yaml:"elasticsearch"`Embedding EmbeddingConfig `yaml:"embedding"`
}// ServerConfig 服務器配置
type ServerConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Mode string `yaml:"mode"`
}// DatabaseConfig 數據庫配置
type DatabaseConfig struct {MySQL MySQLConfig `yaml:"mysql"`
}// EmbeddingConfig 向量化配置
type EmbeddingConfig struct {Provider string `yaml:"provider"`Model string `yaml:"model"`APIKey string `yaml:"api_key"`Dimension int `yaml:"dimension"`
}// LoadConfig 加載配置
func LoadConfig(configPath string) (*Config, error) {// 1. 讀取配置文件data, err := ioutil.ReadFile(configPath)if err != nil {return nil, fmt.Errorf("讀取配置文件失敗: %w", err)}// 2. 解析YAML配置var config Configerr = yaml.Unmarshal(data, &config)if err != nil {return nil, fmt.Errorf("解析配置文件失敗: %w", err)}// 3. 環境變量覆蓋err = envconfig.Process("", &config)if err != nil {return nil, fmt.Errorf("處理環境變量失敗: %w", err)}return &config, nil
}
6.8 基礎設施層總結
基礎設施層為知識庫創建功能提供了完整的技術支撐:
- 數據存儲: MySQL主數據庫 + Redis緩存
- 文檔處理: 多格式文檔解析 + 智能分片
- 向量化: OpenAI/本地模型向量化
- 向量存儲: Milvus向量數據庫
- 搜索引擎: ElasticSearch全文搜索
- 消息隊列: Kafka事件驅動
- 配置管理: 統一配置中心
這些基礎設施服務通過依賴注入的方式集成到上層業務邏輯中,確保了系統的可擴展性和可維護性。
7. 數據存儲層
7.1 數據庫表結構
knowledge_base 表設計
文件位置:helm/charts/opencoze/files/mysql/schema.sql
真實DDL結構:
-- knowledge_base: one row per knowledge base.
-- Secondary indexes cover creator_id, space_id, status and created_at.
-- created_at / updated_at are stored as millisecond timestamps (per the column comments).
CREATE TABLE IF NOT EXISTS `knowledge_base` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'knowledge base id',
  `space_id` bigint NOT NULL COMMENT 'space id',
  `creator_id` bigint NOT NULL COMMENT 'creator user id',
  `name` varchar(255) NOT NULL COMMENT 'knowledge base name',
  `description` text NULL COMMENT 'knowledge base description',
  `icon_uri` varchar(255) NULL COMMENT 'icon uri',
  `status` int NOT NULL DEFAULT 1 COMMENT 'status: 1-active, 2-deleted',
  `embedding_model` varchar(100) NOT NULL COMMENT 'embedding model name',
  `chunk_size` int NOT NULL DEFAULT 1000 COMMENT 'document chunk size',
  `chunk_overlap` int NOT NULL DEFAULT 200 COMMENT 'chunk overlap size',
  `document_count` int NOT NULL DEFAULT 0 COMMENT 'total document count',
  `total_size` bigint NOT NULL DEFAULT 0 COMMENT 'total storage size in bytes',
  `settings` json NULL COMMENT 'knowledge base settings',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_creator_id` (`creator_id`),
  INDEX `idx_space_id` (`space_id`),
  INDEX `idx_status` (`status`),
  INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_base';
knowledge_document 表設計
真實DDL結構:
-- knowledge_document: one row per document uploaded to a knowledge base.
-- Secondary indexes cover knowledge_base_id, processing_status and content_hash.
-- NOTE(review): knowledge_base_id is a signed bigint while knowledge_base.id is
-- bigint unsigned, and no FOREIGN KEY constraint is declared; confirm intended.
CREATE TABLE IF NOT EXISTS `knowledge_document` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'document id',
  `knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
  `name` varchar(255) NOT NULL COMMENT 'document name',
  `file_type` varchar(50) NOT NULL COMMENT 'file type: pdf, txt, docx, etc',
  `file_size` bigint NOT NULL COMMENT 'file size in bytes',
  `file_path` varchar(500) NOT NULL COMMENT 'file storage path',
  `content_hash` varchar(64) NOT NULL COMMENT 'content hash for deduplication',
  `chunk_count` int NOT NULL DEFAULT 0 COMMENT 'total chunk count',
  `processing_status` int NOT NULL DEFAULT 1 COMMENT 'processing status: 1-pending, 2-processing, 3-completed, 4-failed',
  `error_message` text NULL COMMENT 'error message if processing failed',
  `metadata` json NULL COMMENT 'document metadata',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
  INDEX `idx_processing_status` (`processing_status`),
  INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_document';
knowledge_chunk 表設計
真實DDL結構:
-- knowledge_chunk: one row per chunk of a document, including its text and,
-- optionally, its embedding stored as JSON.
-- Secondary indexes cover knowledge_base_id, document_id and content_hash.
-- NOTE(review): knowledge_base_id / document_id are signed bigint while the
-- referenced id columns are bigint unsigned, and no FOREIGN KEY constraint is
-- declared; confirm intended.
CREATE TABLE IF NOT EXISTS `knowledge_chunk` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'chunk id',
  `knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',
  `document_id` bigint NOT NULL COMMENT 'document id',
  `chunk_index` int NOT NULL COMMENT 'chunk index in document',
  `content` text NOT NULL COMMENT 'chunk content',
  `content_hash` varchar(64) NOT NULL COMMENT 'content hash',
  `token_count` int NOT NULL DEFAULT 0 COMMENT 'token count',
  `embedding_vector` json NULL COMMENT 'embedding vector data',
  `metadata` json NULL COMMENT 'chunk metadata',
  `created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',
  `updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',
  PRIMARY KEY (`id`),
  INDEX `idx_knowledge_base_id` (`knowledge_base_id`),
  INDEX `idx_document_id` (`document_id`),
  INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_chunk';
表結構特點:
- 關聯設計:knowledge_base、knowledge_document和knowledge_chunk通過 `knowledge_base_id`、`document_id` 字段進行邏輯關聯(上述DDL中並未聲明外鍵約束),支持關聯查詢
- 空間隔離:通過 `space_id` 實現多租戶數據隔離
- JSON存儲:`settings`、`metadata` 和 `embedding_vector` 使用JSON類型,支持復雜結構數據
- 狀態管理:knowledge_document表包含處理狀態字段,支持異步處理流程
- 索引優化:在關鍵查詢字段上建立索引,優化查詢性能
- 字符集:使用 `utf8mb4_0900_ai_ci` 排序規則,支持完整的Unicode字符集
- 向量存儲:支持嵌入向量的JSON存儲,便於語義搜索
- 去重機制:通過content_hash實現內容去重
knowledge_base字段詳解:
- `id`:自增主鍵,唯一標識每個知識庫
- `space_id`:工作空間ID,實現租戶級別的數據隔離
- `creator_id`:創建者用戶ID,用於權限控制和查詢優化
- `name`:知識庫名稱
- `description`:知識庫描述信息
- `icon_uri`:知識庫圖標URI
- `status`:知識庫狀態(1-活躍,2-已刪除)
- `embedding_model`:嵌入模型名稱
- `chunk_size`:文檔分塊大小
- `chunk_overlap`:分塊重疊大小
- `document_count`:文檔總數
- `total_size`:總存儲大小(字節)
- `settings`:知識庫設置,JSON格式
- `created_at` / `updated_at`:毫秒級時間戳,記錄創建和更新時間
knowledge_document字段詳解:
- `id`:自增主鍵,唯一標識每個文檔
- `knowledge_base_id`:關聯的知識庫ID
- `name`:文檔名稱
- `file_type`:文件類型(pdf、txt、docx等)
- `file_size`:文件大小(字節)
- `file_path`:文件存儲路徑
- `content_hash`:內容哈希,用於去重
- `chunk_count`:分塊總數
- `processing_status`:處理狀態(1-待處理,2-處理中,3-已完成,4-失敗)
- `error_message`:處理失敗時的錯誤信息
- `metadata`:文檔元數據,JSON格式
- `created_at` / `updated_at`:毫秒級時間戳,記錄創建和更新時間
knowledge_chunk字段詳解:
- `id`:自增主鍵,唯一標識每個分塊
- `knowledge_base_id`:關聯的知識庫ID
- `document_id`:關聯的文檔ID
- `chunk_index`:在文檔中的分塊索引
- `content`:分塊內容
- `content_hash`:內容哈希
- `token_count`:令牌數量
- `embedding_vector`:嵌入向量數據,JSON格式
- `metadata`:分塊元數據,JSON格式
- `created_at` / `updated_at`:毫秒級時間戳,記錄創建和更新時間
7.2 ElasticSearch 索引架構
coze_resource 統一索引
索引設計理念:
Coze平臺採用統一索引策略,將所有資源類型(插件、工作流、知識庫、提示詞、數據庫等)存儲在同一個 `coze_resource` 索引中,通過 `res_type` 字段進行類型區分。
知識庫在索引中的映射:
{"mappings": {"properties": {"res_id": {"type": "long","description": "資源ID,對應knowledge_base.id"},"res_type": {"type": "integer", "description": "資源類型,知識庫為4"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword","ignore_above": 256}},"description": "知識庫名稱,支持全文搜索和精確匹配"},"description": {"type": "text","analyzer": "standard","description": "知識庫描述,支持全文搜索"},"owner_id": {"type": "long","description": "所有者ID,對應creator_id"},"space_id": {"type": "long","description": "工作空間ID"},"embedding_model": {"type": "keyword","description": "嵌入模型名稱"},"document_count": {"type": "integer","description": "文檔數量"},"total_size": {"type": "long","description": "總存儲大小"},"status": {"type": "integer","description": "知識庫狀態"},"create_time": {"type": "long","description": "創建時間戳(毫秒)"},"update_time": {"type": "long","description": "更新時間戳(毫秒)"}}}
}
knowledge_content 內容索引
知識庫內容專用索引:
{"mappings": {"properties": {"chunk_id": {"type": "long","description": "分塊ID"},"knowledge_base_id": {"type": "long","description": "知識庫ID"},"document_id": {"type": "long","description": "文檔ID"},"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","description": "分塊內容,支持中文分詞"},"embedding_vector": {"type": "dense_vector","dims": 1536,"description": "嵌入向量,用于語義搜索"},"metadata": {"type": "object","description": "分塊元數據"},"token_count": {"type": "integer","description": "令牌數量"}}}
}
資源類型常量定義:
// Resource type codes stored in the res_type field of the coze_resource index.
// The values are not contiguous (3 and 5 are not defined here).
const (
	// ResTypePlugin identifies a plugin resource.
	ResTypePlugin = 1
	// ResTypeWorkflow identifies a workflow resource.
	ResTypeWorkflow = 2
	// ResTypeKnowledge identifies a knowledge base resource.
	ResTypeKnowledge = 4
	// ResTypePrompt identifies a prompt resource.
	ResTypePrompt = 6
	// ResTypeDatabase identifies a database resource.
	ResTypeDatabase = 7
)
7.3 數據同步機制
事件驅動的創建同步架構
創建同步流程:
- 創建操作觸發:知識庫創建操作觸發創建領域事件
- 事件發布:通過事件總線發布 `ResourceDomainEvent` 創建事件
- 事件處理:`resourceHandlerImpl` 監聽並處理創建事件
- 索引建立:將創建操作同步到ElasticSearch,建立相關索引
- 向量存儲:同時在向量數據庫中創建知識庫向量空間
創建同步核心代碼:
// 資源創建事件處理器
type resourceHandlerImpl struct {esClient es.ClientvectorClient vector.Clientlogger logs.Logger
}// 處理知識庫創建領域事件
func (r *resourceHandlerImpl) HandleKnowledgeCreateEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {if event.OpType != entity.Created {return fmt.Errorf("invalid operation type for create handler: %v", event.OpType)}// 記錄創建操作日志r.logger.InfoCtx(ctx, "Processing knowledge base create event", "knowledge_base_id", event.ResID,"space_id", event.SpaceID,"operator_id", event.OperatorID)// 創建ES索引if err := r.createResourceIndex(ctx, event); err != nil {return fmt.Errorf("create resource index failed: %w", err)}// 創建向量空間if err := r.createVectorSpace(ctx, event); err != nil {r.logger.WarnCtx(ctx, "Failed to create vector space", "knowledge_base_id", event.ResID, "error", err)// 向量空間創建失敗不阻塞主流程}return nil
}// 在索引中創建知識庫
func (r *resourceHandlerImpl) createResourceIndex(ctx context.Context, event *entity.ResourceDomainEvent) error {indexName := "coze_resource"docID := conv.Int64ToStr(event.ResID)// 構建索引文檔document := map[string]interface{}{"res_id": event.ResID,"res_type": 4, // 知識庫類型"name": event.Name,"description": event.Description,"owner_id": event.OperatorID,"space_id": event.SpaceID,"embedding_model": event.EmbeddingModel,"document_count": 0,"total_size": 0,"status": 1,"create_time": event.CreateTime,"update_time": event.UpdateTime,}// 執行索引創建err := r.esClient.Create(ctx, indexName, docID, document)if err != nil {r.logger.ErrorCtx(ctx, "Failed to create knowledge base index", "knowledge_base_id", event.ResID, "error", err)return fmt.Errorf("create knowledge base ES index failed: %w", err)}// 驗證創建結果exists, checkErr := r.esClient.Exists(ctx, indexName, docID)if checkErr != nil {r.logger.WarnCtx(ctx, "Failed to verify creation", "knowledge_base_id", event.ResID, "error", checkErr)} else if !exists {r.logger.ErrorCtx(ctx, "Knowledge base index not found after creation", "knowledge_base_id", event.ResID)return fmt.Errorf("knowledge base creation verification failed")}r.logger.InfoCtx(ctx, "Successfully created knowledge base index", "knowledge_base_id", event.ResID)return nil
}// 創建向量空間
func (r *resourceHandlerImpl) createVectorSpace(ctx context.Context, event *entity.ResourceDomainEvent) error {spaceName := fmt.Sprintf("kb_%d", event.ResID)// 創建向量集合err := r.vectorClient.CreateCollection(ctx, &vector.CreateCollectionRequest{CollectionName: spaceName,Dimension: 1536, // OpenAI embedding維度MetricType: "COSINE",Description: fmt.Sprintf("Vector space for knowledge base %d", event.ResID),})if err != nil {return fmt.Errorf("create vector collection failed: %w", err)}r.logger.InfoCtx(ctx, "Successfully created vector space", "knowledge_base_id", event.ResID, "collection_name", spaceName)return nil
}
7.4 知識庫創建操作存儲層設計原則
知識庫創建數據一致性保證
- 創建一致性:采用事件驅動模式,保證MySQL創建和ElasticSearch索引建立的最終一致性
- 創建冪等性:知識庫創建操作支持重試,避免重復創建導致的數據沖突
- 創建事務邊界:知識庫數據庫創建操作和創建事件發布在同一事務中,保證原子性
- 創建驗證:知識庫創建完成后驗證數據確實被正確存儲,確保創建操作的完整性
- 向量空間創建:確保知識庫創建時同步創建向量存儲空間,維護數據完整性
知識庫創建性能優化策略
- 創建索引優化:基于知識庫主鍵ID的創建操作,具有最佳性能
- 批量創建:支持批量創建知識庫操作,減少數據庫和ES的操作次數
- 異步創建處理:知識庫創建事件處理采用異步模式,不阻塞創建主流程
- 創建緩存預熱:創建后及時預熱知識庫相關緩存,提高后續訪問性能
- 分批向量創建:向量空間采用分批創建策略,避免大量向量創建時的性能問題
知識庫創建操作擴展性考慮
- 分片創建:支持按 `space_id` 進行分片創建,提高大規模知識庫創建的效率
- 創建隊列:使用消息隊列處理知識庫創建事件,支持高並發創建場景
- 創建監控:獨立的知識庫創建操作監控,及時發現創建異常
- 多存儲協調:協調MySQL、ElasticSearch、向量數據庫等多存儲的創建操作
知識庫創建安全保障
- 權限驗證:嚴格的知識庫創建權限驗證,確保只有授權用戶可以創建
- 創建審計:完整的知識庫創建操作審計日志,支持創建行為追蹤
- 創建限制:實施知識庫創建頻率限制,防止惡意批量創建
- 數據備份:創建操作完成后及時備份知識庫數據,支持數據恢復
- 向量驗證:創建知識庫時驗證向量空間的創建完整性
- 重復檢查:創建前檢查知識庫名稱和配置是否重復,避免沖突
7.5 知識庫創建操作監控和運維
知識庫創建操作監控
// KnowledgeCreateMetrics groups the counters and timings tracked for
// knowledge base creation.
type KnowledgeCreateMetrics struct {
	KnowledgeCreateSuccessCount int64         // number of successful creations
	KnowledgeCreateFailureCount int64         // number of failed creations
	KnowledgeCreateLatency      time.Duration // latency of a creation operation
	LastKnowledgeCreateTime     time.Time     // time of the last creation
	KnowledgeIndexCreateCount   int64         // number of index creations
	KnowledgeCreateEventCount   int64         // number of creation events handled
	VectorSpaceCreateCount      int64         // number of vector spaces created
	KnowledgeCreateQueueSize    int64         // size of the creation queue
	KnowledgeCreateRateLimit    int64         // number of times the rate limit was hit
	KnowledgeDuplicateCount     int64         // number of duplicate-creation detections
	DocumentProcessingCount     int64         // number of documents processed
	EmbeddingGenerationCount    int64         // number of embeddings generated
}

// collectKnowledgeCreateMetrics records the outcome of one creation attempt.
// On failure it increments the failure counter and logs the error; on success
// it increments the success counter, overwrites the stored latency with this
// attempt's latency, stamps the last-creation time and logs.
//
// NOTE(review): `metrics` and `log` are not declared in this block. If
// `metrics` is shared package-level state, these plain increments and
// assignments are unsynchronized — confirm whether concurrent handlers call this.
func (r *resourceHandlerImpl) collectKnowledgeCreateMetrics(ctx context.Context, startTime time.Time, knowledgeID int64, err error) {
	latency := time.Since(startTime)
	if err != nil {
		metrics.KnowledgeCreateFailureCount++
		log.ErrorCtx(ctx, "knowledge base create failed", "knowledge_id", knowledgeID, "error", err, "latency", latency)
	} else {
		metrics.KnowledgeCreateSuccessCount++
		metrics.KnowledgeCreateLatency = latency
		metrics.LastKnowledgeCreateTime = time.Now()
		log.InfoCtx(ctx, "knowledge base create succeeded", "knowledge_id", knowledgeID, "latency", latency)
	}
}

// knowledgeCreateHealthCheck returns the first failing check, in order:
// database ping, Elasticsearch ping, vector database ping, creation queue size
// (> 1000), vector space creation errors (> 10), rate limit hits (> 100) and
// document processing queue size (> 5000). It returns nil when all pass.
//
// NOTE(review): r.db is not among the resourceHandlerImpl fields shown
// elsewhere in this document — confirm the struct definition.
func (r *resourceHandlerImpl) knowledgeCreateHealthCheck(ctx context.Context) error {
	// Check the database connection.
	if err := r.db.Ping(); err != nil {
		return fmt.Errorf("database connection failed: %w", err)
	}
	// Check the Elasticsearch connection.
	if _, err := r.esClient.Ping(ctx); err != nil {
		return fmt.Errorf("elasticsearch connection failed: %w", err)
	}
	// Check the vector database connection.
	if err := r.vectorClient.Ping(ctx); err != nil {
		return fmt.Errorf("vector database connection failed: %w", err)
	}
	// Check the knowledge base creation queue.
	if queueSize := r.getKnowledgeCreateQueueSize(); queueSize > 1000 {
		return fmt.Errorf("knowledge create queue size too large: %d", queueSize)
	}
	// Check vector space creation errors.
	if vectorErrors := r.getVectorSpaceCreateErrors(); len(vectorErrors) > 10 {
		return fmt.Errorf("too many vector space create errors: %d", len(vectorErrors))
	}
	// Check rate limit hits.
	if rateLimitHits := r.getCreateRateLimitHits(); rateLimitHits > 100 {
		return fmt.Errorf("too many rate limit hits: %d", rateLimitHits)
	}
	// Check the document processing queue.
	if docQueueSize := r.getDocumentProcessingQueueSize(); docQueueSize > 5000 {
		return fmt.Errorf("document processing queue size too large: %d", docQueueSize)
	}
	return nil
}
知識庫創建數據質量保證
- 創建一致性檢查:定期驗證MySQL、ElasticSearch和向量數據庫中知識庫創建數據的一致性
- 創建完整性驗證:確保知識庫創建操作完全建立了相關數據、索引和向量空間
- 向量空間驗證:驗證知識庫創建時向量空間的創建完整性和配置正確性
- 創建異常恢復:提供知識庫創建失敗的重試和修復機制
- 創建性能監控:監控知識庫創建操作性能,及時發現和解決性能問題
- 創建審計追蹤:完整記錄知識庫創建操作的執行過程和結果
- 多存儲一致性:確保MySQL、ElasticSearch、向量數據庫等多存儲創建的一致性
- 重復檢測:檢測和防止知識庫重復創建,維護數據唯一性
- 創建回滾機制:創建失敗時的數據回滾和清理機制
- 文檔處理監控:監控知識庫創建過程中的文檔處理和向量化進度
- 存儲配額檢查:創建前檢查存儲配額,確保有足夠空間存儲知識庫數據
- 嵌入模型驗證:驗證知識庫創建時指定的嵌入模型配置正確性
8. 知識庫創建安全和權限驗證機制
8.1 知識庫創建身份認證
JWT Token驗證:
- 創建知識庫的所有API請求都需要攜帶有效的JWT Token
- Token包含用戶ID、工作空間權限等關鍵信息
- 通過中間件統一驗證Token的有效性和完整性
// 知識庫創建身份驗證中間件
func KnowledgeCreateAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "創建知識庫需要登錄認證"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "Token無效,無法創建知識庫"})ctx.Abort()return}// 驗證用戶是否有創建知識庫的權限if !userInfo.HasKnowledgeCreatePermission {ctx.JSON(403, gin.H{"error": "用戶無創建知識庫權限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("space_id", userInfo.SpaceID)ctx.Set("creator_id", userInfo.UserID)ctx.Next()}
}
8.2 知識庫創建工作空間權限控制
空間隔離機制:
- 每個用戶只能在其所屬工作空間中創建知識庫
- 通過 `space_id` 字段實現知識庫創建權限隔離
- 在知識庫創建操作中強制驗證空間權限
// validateKnowledgeCreateSpacePermission checks that the caller may create a
// knowledge base in req.SpaceID.
//
// It reads the caller's space ID from ctx under the key "space_id" and fails
// when: the value is absent or not an int64; it differs from req.SpaceID; the
// space configuration cannot be loaded or disallows creation; the space has
// reached MaxKnowledgeCount; or its storage usage has reached MaxStorageQuota.
func (s *KnowledgeApplicationService) validateKnowledgeCreateSpacePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
	// Comma-ok form: a missing or mistyped value must be an error, not a panic.
	userSpaceID, ok := ctx.Value("space_id").(int64)
	if !ok {
		return errors.New("上下文中缺少有效的工作空間ID")
	}

	// The requested space must be the caller's own space.
	if req.SpaceID != userSpaceID {
		return errors.New("無權限在該工作空間創建知識庫")
	}

	// The space must allow knowledge base creation.
	spaceConfig, err := s.spaceService.GetSpaceConfig(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("獲取工作空間配置失敗: %w", err)
	}
	if !spaceConfig.AllowKnowledgeCreation {
		return errors.New("該工作空間不允許創建知識庫")
	}

	// Enforce the per-space knowledge base count limit.
	knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("獲取工作空間知識庫數量失敗: %w", err)
	}
	if knowledgeCount >= spaceConfig.MaxKnowledgeCount {
		return fmt.Errorf("工作空間知識庫數量已達上限: %d", spaceConfig.MaxKnowledgeCount)
	}

	// Enforce the per-space storage quota; the message reports it in GB.
	storageUsage, err := s.getSpaceStorageUsage(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("獲取工作空間存儲使用量失敗: %w", err)
	}
	if storageUsage >= spaceConfig.MaxStorageQuota {
		return fmt.Errorf("工作空間存儲配額已滿: %d GB", spaceConfig.MaxStorageQuota/1024/1024/1024)
	}
	return nil
}
8.3 知識庫創建資源級權限驗證
知識庫創建用戶權限驗證:
- 嚴格驗證用戶是否具有知識庫創建權限
- 驗證用戶在指定工作空間的操作權限
- 通過存儲配額和向量空間權限進行資源級控制
// 知識庫創建權限驗證
func (s *KnowledgeApplicationService) validateKnowledgeCreatePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {userID := ctx.Value("user_id").(int64)// 驗證用戶是否具有知識庫創建權限hasPermission, err := s.userService.HasKnowledgeCreatePermission(ctx, userID)if err != nil {return fmt.Errorf("驗證知識庫創建權限失敗: %w", err)}if !hasPermission {return errorx.New(errno.ErrKnowledgePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "用戶無知識庫創建權限"),errorx.KV("user_id", userID))}// 驗證工作空間權限spacePermission, err := s.spaceService.CheckUserSpacePermission(ctx, userID, req.SpaceID)if err != nil {return fmt.Errorf("驗證工作空間權限失敗: %w", err)}if !spacePermission.CanCreateKnowledge {return errorx.New(errno.ErrKnowledgeSpacePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "用戶在該工作空間無知識庫創建權限"),errorx.KV("user_id", userID),errorx.KV("space_id", req.SpaceID))}// 檢查用戶創建知識庫頻率限制createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))if err != nil {return fmt.Errorf("檢查知識庫創建頻率失敗: %w", err)}if createCount >= 20 { // 24小時內最多創建20個知識庫return errorx.New(errno.ErrKnowledgeCreateRateLimitCode, errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 檢查知識庫名稱是否重復exists, err := s.checkKnowledgeNameExists(ctx, req.SpaceID, req.Name)if err != nil {return fmt.Errorf("檢查知識庫名稱重復失敗: %w", err)}if exists {return errorx.New(errno.ErrKnowledgeNameExistsCode, errorx.KV("knowledge_name", req.Name),errorx.KV("space_id", req.SpaceID))}// 檢查存儲配額storageQuota, err := s.checkStorageQuota(ctx, userID, req.SpaceID)if err != nil {return fmt.Errorf("檢查存儲配額失敗: %w", err)}if !storageQuota.CanCreate {return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode, errorx.KV("user_id", userID),errorx.KV("used_storage", storageQuota.UsedStorage),errorx.KV("max_storage", storageQuota.MaxStorage))}// 檢查向量空間權限vectorPermission, err := s.checkVectorSpacePermission(ctx, userID, req.EmbeddingModel)if err != nil {return fmt.Errorf("檢查向量空間權限失敗: %w", err)}if !vectorPermission.CanCreateSpace 
{return errorx.New(errno.ErrKnowledgeVectorSpacePermissionCode, errorx.KV("user_id", userID),errorx.KV("embedding_model", req.EmbeddingModel))}return nil
}// 檢查知識庫名稱是否存在
func (s *KnowledgeApplicationService) checkKnowledgeNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {// 檢查同一工作空間下是否存在同名知識庫knowledges, err := s.DomainSVC.ListKnowledges(ctx, &service.ListKnowledgesRequest{SpaceID: spaceID,PageInfo: entity.PageInfo{PageSize: 1},})if err != nil {return false, err}for _, knowledge := range knowledges.Knowledges {if knowledge.Name == name {return true, nil}}return false, nil
}// 檢查存儲配額
func (s *KnowledgeApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {// 獲取用戶存儲配額信息quota, err := s.storageService.GetUserStorageQuota(ctx, userID)if err != nil {return nil, err}// 獲取當前使用量usage, err := s.storageService.GetUserStorageUsage(ctx, userID)if err != nil {return nil, err}return &StorageQuotaInfo{UsedStorage: usage,MaxStorage: quota,CanCreate: usage < quota*0.95, // 使用率不超過95%}, nil
}// 檢查向量空間權限
func (s *KnowledgeApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {// 檢查用戶是否有權限使用指定的嵌入模型modelPermission, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)if err != nil {return nil, err}// 檢查向量空間創建配額spaceCount, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)if err != nil {return nil, err}maxSpaces := s.getUserMaxVectorSpaces(userID)return &VectorSpacePermission{CanCreateSpace: modelPermission && spaceCount < maxSpaces,CurrentSpaces: spaceCount,MaxSpaces: maxSpaces,}, nil
}
8.4 知識庫創建API訪問控制
創建請求頻率限制:
- 實現基于用戶的知識庫創建頻率限制
- 防止惡意批量創建知識庫
- 支持不同用戶等級的差異化創建限流策略
- 基于文檔處理能力的動態限流
創建操作安全驗證:
- 嚴格驗證創建請求的合法性
- 防止惡意創建和資源濫用攻擊
- 使用多重安全檢查機制
- 文檔內容安全掃描和驗證
- 向量空間創建安全驗證
// 知識庫創建參數驗證
func validateKnowledgeCreateRequest(req *service.CreateKnowledgeRequest) error {if req.SpaceID <= 0 {return errors.New("無效的工作空間ID")}if req.CreatorID <= 0 {return errors.New("無效的創建者ID")}// 驗證知識庫名稱if req.Name == "" {return errors.New("知識庫名稱不能為空")}if len(req.Name) > 100 {return errors.New("知識庫名稱長度不能超過100字符")}// 驗證知識庫描述if req.Description != "" && len(req.Description) > 1000 {return errors.New("知識庫描述長度不能超過1000字符")}// 驗證嵌入模型if req.EmbeddingModel == "" {return errors.New("嵌入模型不能為空")}if !isValidEmbeddingModel(req.EmbeddingModel) {return errors.New("不支持的嵌入模型")}// 驗證分塊策略if req.ChunkStrategy != nil {if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {return errors.New("分塊大小必須在1-8192之間")}if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {return errors.New("分塊重疊大小必須小于分塊大小")}}// 驗證圖標URIif req.IconURI != "" && !isValidIconURI(req.IconURI) {return errors.New("無效的圖標URI格式")}return nil
}// 知識庫創建操作安全檢查
// validateKnowledgeCreateSafety runs the operational checks before a
// knowledge base is created, in this order: the 24-hour creation rate limit,
// availability of req.EmbeddingModel, the user's storage quota, vector
// database health, and document processor health.
//
// It returns nil when every check passes. A context without an int64
// "user_id" value is reported as an error.
//
// NOTE(review): checkUserStorageQuota and the CanCreateKnowledge field are not
// defined in the visible part of this file (the visible helper is
// checkStorageQuota, which fills a CanCreate field) — confirm they exist.
func (s *KnowledgeApplicationService) validateKnowledgeCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {
	// Comma-ok assertion: a missing or mistyped "user_id" must not panic the
	// request goroutine.
	userID, ok := ctx.Value("user_id").(int64)
	if !ok {
		return fmt.Errorf("檢查知識庫創建安全性失敗: 上下文中缺少有效的user_id")
	}

	// Rate limit: at most 20 knowledge bases within the last 24 hours.
	createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
	if err != nil {
		return fmt.Errorf("檢查知識庫創建頻率失敗: %w", err)
	}
	if createCount >= 20 {
		return errorx.New(errno.ErrKnowledgeCreateRateLimitCode,
			errorx.KV("user_id", userID),
			errorx.KV("create_count", createCount))
	}

	// Embedding model availability.
	modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)
	if err != nil {
		return fmt.Errorf("檢查嵌入模型可用性失敗: %w", err)
	}
	if !modelAvailable {
		return errors.New("嵌入模型當前不可用")
	}

	// User storage quota.
	storageQuota, err := s.checkUserStorageQuota(ctx, userID)
	if err != nil {
		return fmt.Errorf("檢查用戶存儲配額失敗: %w", err)
	}
	if !storageQuota.CanCreateKnowledge {
		return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode,
			errorx.KV("user_id", userID),
			errorx.KV("used_storage", storageQuota.UsedStorage),
			errorx.KV("max_storage", storageQuota.MaxStorage))
	}

	// Vector database health.
	vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)
	if err != nil {
		return fmt.Errorf("檢查向量數據庫健康狀態失敗: %w", err)
	}
	if !vectorDBHealthy {
		return errors.New("向量數據庫當前不可用,無法創建知識庫")
	}

	// Document processing service health.
	docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)
	if err != nil {
		return fmt.Errorf("檢查文檔處理服務狀態失敗: %w", err)
	}
	if !docProcessorHealthy {
		return errors.New("文檔處理服務當前不可用,無法創建知識庫")
	}
	return nil
}

// Check embedding model availability.
// checkEmbeddingModelAvailable reports whether modelName can be used right
// now. Names outside the hard-coded list of supported models yield
// (false, nil) without contacting the embedding service; for a supported name
// the answer is whatever the embedding service's IsModelAvailable returns.
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {
	switch modelName {
	case "text-embedding-ada-002",
		"text-embedding-3-small",
		"text-embedding-3-large",
		"bge-large-zh-v1.5",
		"bge-base-zh-v1.5":
		// Supported model: defer to the live availability check.
		return s.embeddingService.IsModelAvailable(ctx, modelName)
	default:
		return false, nil
	}
}

// Check vector database health.
// checkVectorDatabaseHealth asks the vector service for a health check using
// a VectorDBHealthCheck with a 5-second Timeout and returns its verdict.
//
// The returned error is always nil: a failed probe is logged at warn level and
// reported as unhealthy (false) instead of being propagated.
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {
	probe := &VectorDBHealthCheck{Timeout: 5 * time.Second}

	healthy, err := s.vectorService.HealthCheck(ctx, probe)
	if err == nil {
		return healthy, nil
	}

	logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)
	return false, nil
}

// Check document processing service health.
// checkDocumentProcessorHealth reports whether the document processing queue
// looks healthy. The queue counts as unhealthy when its status cannot be
// fetched or when more than 10000 jobs are pending; both cases are logged at
// warn level.
//
// The returned error is always nil.
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {
	status, err := s.documentService.GetQueueStatus(ctx)
	switch {
	case err != nil:
		logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)
		return false, nil
	case status.PendingJobs > 10000:
		// Too much backlog: treat the service as unhealthy.
		logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", status.PendingJobs)
		return false, nil
	}
	return true, nil
}

// Get the user's storage usage.
// getUserStorageUsage sums, over every plugin returned by ListUserPlugins for
// userID, the lengths of the plugin's Manifest and OpenapiDoc (each counted
// only when non-nil). A listing failure is wrapped and returned with 0.
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {
	userPlugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)
	if err != nil {
		return 0, fmt.Errorf("獲取用戶插件列表失敗: %w", err)
	}

	var usedBytes int64
	for _, p := range userPlugins {
		if p.Manifest != nil {
			usedBytes += int64(len(p.Manifest))
		}
		if p.OpenapiDoc != nil {
			usedBytes += int64(len(p.OpenapiDoc))
		}
	}
	return usedBytes, nil
}

// Get the user's maximum storage quota.
// getMaxStorageQuota returns the storage quota in bytes. This simplified
// version ignores userID and always returns 100 MiB; the original author notes
// that the value should eventually come from the user's configuration.
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {
	const (
		mebibyte     = 1024 * 1024
		defaultQuota = 100 * mebibyte
	)
	return defaultQuota
}

// URL format validation.
// isValidURL reports whether urlStr parses with url.Parse and carries both a
// non-empty scheme and a non-empty host. Relative references, scheme-only
// strings and opaque URLs such as "mailto:..." are therefore rejected.
func isValidURL(urlStr string) bool {
	parsed, err := url.Parse(urlStr)
	if err != nil {
		return false
	}
	return parsed.Scheme != "" && parsed.Host != ""
}

// Plugin type validation.
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}
9. 知識庫創建錯誤處理和日志記錄
9.1 知識庫創建分層錯誤處理機制
知識庫創建錯誤分類體系:
// KnowledgeCreateErrorType classifies errors raised while creating a
// knowledge base. Values are grouped by numeric range: business errors start
// at 1000, system errors at 2000 and network errors at 3000.
type KnowledgeCreateErrorType int

// Each group lives in its own const block. iota keeps counting for the whole
// of a single block, so writing "iota + 2000" part-way through one shared
// block would start the system group at 2015 instead of 2000.

// Business errors (1000-range).
const (
	ErrKnowledgeCreateBusiness KnowledgeCreateErrorType = iota + 1000
	ErrKnowledgeNameExists
	ErrKnowledgePermissionDenied
	ErrKnowledgeCreateRateLimit
	ErrKnowledgeInvalidParameters
	ErrKnowledgeEmbeddingModelNotSupported
	ErrKnowledgeStorageQuotaExceeded
	ErrKnowledgeDocumentProcessingFailed
	ErrKnowledgeInvalidFileType
	ErrKnowledgeFileSizeExceeded
	ErrKnowledgeInvalidChunkSize
	ErrKnowledgeInvalidIconURI
	ErrKnowledgeInvalidSpaceID
	ErrKnowledgeDuplicateName
	ErrKnowledgeVectorSpaceCreateFailed
)

// System errors (2000-range).
const (
	ErrKnowledgeCreateSystem KnowledgeCreateErrorType = iota + 2000
	ErrKnowledgeDatabaseConnection
	ErrKnowledgeElasticSearchTimeout
	ErrKnowledgeServiceUnavailable
	ErrKnowledgeCreateEventPublishFailed
	ErrKnowledgeIndexCreateFailed
	ErrKnowledgeTransactionRollbackFailed
	ErrKnowledgeVectorStoreTimeout
	ErrKnowledgeIDGenerationFailed
	ErrKnowledgeEmbeddingServiceFailed
	ErrKnowledgeContentIndexFailed
)

// Network errors (3000-range).
const (
	ErrKnowledgeCreateNetwork KnowledgeCreateErrorType = iota + 3000
	ErrKnowledgeCreateRequestTimeout
	ErrKnowledgeCreateConnectionRefused
	ErrKnowledgeCreateServiceDown
	ErrKnowledgeCreateESConnectionFailed
	ErrKnowledgeVectorDBConnectionFailed
	ErrKnowledgeEmbeddingAPITimeout
)
知識庫創建錯誤處理流程:
- 捕獲階段:在知識庫創建各層級捕獲具體錯誤
- 包裝階段:添加知識庫創建操作相關上下文信息和錯誤碼
- 記錄階段:根據錯誤級別記錄知識庫創建操作日志
- 響應階段:返回用戶友好的知識庫創建錯誤信息
- 回滾階段:知識庫創建失敗時進行必要的數據回滾操作
- 向量處理:處理向量空間創建失敗的錯誤
- 重試機制:對於可重試的創建錯誤提供重試建議
- 用戶指導:為常見創建錯誤提供解決方案指導
9.2 知識庫創建統一錯誤響應格式
// KnowledgeCreateErrorResponse is the JSON body returned when knowledge base
// creation fails. Fields tagged omitempty are left out of the encoded JSON
// when they hold their zero value.
type KnowledgeCreateErrorResponse struct {
	Code               int               `json:"code"`                          // error code reported to the client
	Message            string            `json:"message"`                       // user-facing error message
	Details            string            `json:"details,omitempty"`             // optional extra detail
	TraceID            string            `json:"trace_id"`                      // trace identifier of the request
	KnowledgeID        int64             `json:"knowledge_id,omitempty"`        // knowledge base ID, when one is known
	Operation          string            `json:"operation"`                     // operation name, e.g. "create_knowledge"
	CanRetry           bool              `json:"can_retry"`                     // whether retrying may succeed
	DocumentsProcessed int               `json:"documents_processed,omitempty"` // number of documents processed
	DocumentsFailed    int               `json:"documents_failed,omitempty"`    // number of documents that failed
	ValidationErrors   []string          `json:"validation_errors,omitempty"`   // validation failure messages
	SuggestedFix       string            `json:"suggested_fix,omitempty"`       // remediation hint for the user
	FieldErrors        map[string]string `json:"field_errors,omitempty"`        // per-field error messages
	VectorSpaceStatus  string            `json:"vector_space_status,omitempty"` // vector space status, when relevant
	EmbeddingModel     string            `json:"embedding_model,omitempty"`     // embedding model involved, when relevant
}

// Knowledge base creation error-handling middleware.
func KnowledgeCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if err := recover(); err != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Knowledge base creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", err, userID, spaceID, traceID)ctx.JSON(500, KnowledgeCreateErrorResponse{Code: 5000,Message: "知識庫創建服務器內部錯誤",TraceID: traceID,Operation: "create_knowledge",CanRetry: true,SuggestedFix: "請稍后重試,如果問題持續存在請聯系技術支持",})}}()ctx.Next()}
}// 插件創建業務錯誤處理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginInvalidParamCode):response.Code = 400response.Message = "插件參數無效"response.CanRetry = falseresponse.SuggestedFix = "請檢查插件名稱、描述、服務器URL等參數是否正確"case errors.Is(err, errno.ErrPluginPermissionCode):response.Code = 403response.Message = "無權限創建插件"response.CanRetry = falseresponse.SuggestedFix = "請確保已登錄且具有插件創建權限"case errors.Is(err, errno.ErrPluginInvalidManifest):response.Code = 400response.Message = "插件清單格式無效"response.CanRetry = falseresponse.SuggestedFix = "請檢查插件清單文件格式是否符合規范"case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):response.Code = 400response.Message = "OpenAPI文檔格式無效"response.CanRetry = falseresponse.SuggestedFix = "請檢查OpenAPI文檔格式是否符合OpenAPI 3.0規范"case errors.Is(err, errno.ErrPluginIDExist):response.Code = 409response.Message = "插件ID已存在"response.CanRetry = falseresponse.SuggestedFix = "請使用不同的插件名稱或檢查是否已存在同名插件"case errors.Is(err, errno.ErrPluginCreateRateLimit):response.Code = 429response.Message = "創建操作過于頻繁,請稍后再試"response.CanRetry = trueresponse.SuggestedFix = "請等待一段時間后重試"case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):response.Code = 413response.Message = "存儲配額已滿"response.CanRetry = falseresponse.SuggestedFix = "請清理不需要的插件或升級存儲配額"case errors.Is(err, errno.ErrPluginServerURLNotAccessible):response.Code = 400response.Message = "插件服務器URL不可訪問"response.CanRetry = trueresponse.SuggestedFix = "請檢查服務器URL是否正確且可訪問"default:response.Code = 500response.Message = "插件創建失敗"response.CanRetry = trueresponse.SuggestedFix = "請稍后重試,如果問題持續存在請聯系技術支持"}ctx.JSON(response.Code, response)
}// 插件創建系統錯誤處理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginDatabaseConnection):response.Code = 500response.Message = "插件數據庫連接失敗"response.CanRetry = trueresponse.SuggestedFix = "數據庫連接異常,請稍后重試"case errors.Is(err, errno.ErrPluginElasticSearchTimeout):response.Code = 500response.Message = "插件索引操作超時"response.CanRetry = trueresponse.SuggestedFix = "搜索服務響應超時,請稍后重試"case errors.Is(err, errno.ErrPluginServiceUnavailable):response.Code = 503response.Message = "插件創建服務暫時不可用"response.CanRetry = trueresponse.SuggestedFix = "服務正在維護中,請稍后重試"case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):response.Code = 500response.Message = "插件創建事件發布失敗"response.CanRetry = trueresponse.SuggestedFix = "事件發布異常,插件已創建但可能影響搜索,請稍后重試"case errors.Is(err, errno.ErrPluginIndexCreateFailed):response.Code = 500response.Message = "插件索引創建失敗"response.CanRetry = trueresponse.SuggestedFix = "搜索索引創建失敗,插件已創建但可能無法搜索到"case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):response.Code = 500response.Message = "插件創建事務回滾失敗"response.CanRetry = falseresponse.SuggestedFix = "數據一致性異常,請聯系技術支持"case errors.Is(err, errno.ErrPluginIDGenerationFailed):response.Code = 500response.Message = "插件ID生成失敗"response.CanRetry = trueresponse.SuggestedFix = "ID生成服務異常,請稍后重試"default:response.Code = 5000response.Message = "插件創建失敗"response.Details = "服務器內部錯誤,請稍后重試"response.CanRetry = trueresponse.SuggestedFix = "系統內部錯誤,請稍后重試或聯系技術支持"}ctx.JSON(response.Code, response)
}
9.3 知識庫創建日志記錄策略
知識庫創建日志級別定義:
- DEBUG:知識庫創建詳細調試信息,包括參數值、向量處理過程、文檔分塊詳情
- INFO:知識庫創建關鍵業務流程信息,如創建開始、參數驗證、數據插入、向量空間創建
- WARN:知識庫創建潛在問題警告,如存儲配額警告、文檔處理警告、向量生成警告
- ERROR:知識庫創建錯誤信息,包括創建失敗、權限錯誤、向量空間創建失敗
- FATAL:知識庫創建嚴重錯誤,可能導致數據不一致或向量空間損壞
知識庫創建結構化日志格式:
// 知識庫創建日志記錄示例
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {traceID := generateTraceID()ctx = context.WithValue(ctx, "trace_id", traceID)userID := ctxutil.GetUIDFromCtx(ctx)// 記錄知識庫創建開始logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s", userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s", duration.Milliseconds(), traceID)}()// 記錄關鍵步驟logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s", req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)// 權限驗證日志logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s", userID, req.GetSpaceID(), traceID)// 存儲配額檢查日志logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)// 向量空間創建日志logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s", req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)// 數據庫創建操作日志logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s", req.GetName(), traceID)// ElasticSearch索引創建日志logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s", knowledgeID, traceID)// 事件發布日志logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s", knowledgeID, traceID)return resp, nil
}// 知識庫創建操作審計日志
func (s *KnowledgeApplicationService) logKnowledgeCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {userID := ctx.Value("user_id").(int64)spaceID := ctx.Value("space_id").(int64)traceID := ctx.Value("trace_id").(string)auditLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"user_id": userID,"space_id": spaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"knowledge_name": details["knowledge_name"],"embedding_model": details["embedding_model"],"chunk_size": details["chunk_size"],"chunk_overlap": details["chunk_overlap"],"vector_space_id": details["vector_space_id"],"storage_used": details["storage_used"],}logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}// 文檔處理日志記錄
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)docLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"document_id": documentID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"file_name": details["file_name"],"file_size": details["file_size"],"chunk_count": details["chunk_count"],"vector_count": details["vector_count"],"processing_time": details["processing_time"],}logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}// 向量空間操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)vectorLog := map[string]interface{}{"operation": operation,"vector_space_id": vectorSpaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"embedding_model": details["embedding_model"],"dimensions": details["dimensions"],"vector_count": details["vector_count"],"index_type": details["index_type"],}logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}
知識庫創建日志內容規范:
- 請求日志:記錄用戶ID、工作空間ID、知識庫名稱、嵌入模型、分塊策略、TraceID
- 業務日志:記錄知識庫創建步驟、參數驗證結果、權限驗證結果、向量空間創建過程
- 性能日志:記錄創建接口響應時間、數據庫插入時間、向量空間創建時間、文檔處理時間
- 錯誤日志:記錄創建錯誤堆棧、知識庫相關上下文信息、向量處理失敗原因
- 審計日志:記錄知識庫的創建操作、創建參數、創建結果、關聯的文檔和向量信息
- 安全日志:記錄創建頻率、權限驗證、存儲配額檢查、可疑創建行為
- 文檔處理日志:記錄文檔上傳、分塊處理、向量生成、索引創建等詳細過程
- 向量空間日志:記錄向量空間創建、配置、索引構建、查詢性能等信息
9.4 知識庫創建監控和告警
知識庫創建關鍵指標監控:
- 創建性能:知識庫創建響應時間、創建成功率、創建QPS、創建吞吐量
- 資源使用:數據庫連接數、向量空間創建延遲、內存使用率、文檔處理隊列長度
- 業務指標:知識庫創建成功率、創建頻率分布、不同嵌入模型使用比例、用戶創建活躍度
- 安全指標:權限驗證通過率、惡意創建嘗試次數、創建頻率限制觸發次數、存儲配額檢查失敗率
- 質量指標:向量空間創建成功率、文檔處理成功率、嵌入模型響應率、索引創建成功率
- 存儲指標:存儲使用量、向量數量、文檔數量、索引大小、存儲增長率
- 向量處理指標:向量生成延遲、向量維度分布、嵌入模型調用次數、向量相似度計算性能
知識庫創建告警策略:
- 創建失敗率告警:當知識庫創建失敗率超過3%時觸發告警
- 性能告警:當知識庫創建響應時間超過10秒時觸發告警
- 資源告警:當數據庫連接數超過80%或向量數據庫連接異常時觸發告警
- 安全告警:當檢測到異常創建行為或存儲配額濫用時立即觸發告警
- 數據一致性告警:當MySQL、ES和向量數據庫創建狀態不一致時觸發告警
- 配額告警:當用戶存儲使用量超過90%時觸發告警
- 向量服務告警:當嵌入模型服務不可用或響應超時時觸發告警
- 文檔處理告警:當文檔處理隊列積壓超過閾值時觸發告警
// KnowledgeCreateMetrics collects the counters and latencies monitored for
// knowledge base creation.
type KnowledgeCreateMetrics struct {
	CreateSuccessCount           int64         // number of successful creations
	CreateFailureCount           int64         // number of failed creations
	CreateLatency                time.Duration // creation latency
	PermissionDeniedCount        int64         // number of permission denials
	RateLimitCount               int64         // number of rate-limit rejections
	ParameterValidationFailCount int64         // number of parameter validation failures
	VectorSpaceCreateLatency     time.Duration // vector space creation latency
	VectorSpaceCreateFailCount   int64         // number of vector space creation failures
	DocumentProcessingLatency    time.Duration // document processing latency
	EmbeddingGenerationLatency   time.Duration // embedding generation latency
	EmbeddingModelFailCount      int64         // number of embedding model call failures
	StorageQuotaExceededCount    int64         // number of storage-quota rejections
	IndexCreateLatency           time.Duration // index creation latency
	IndexCreateFailCount         int64         // number of index creation failures
	EventPublishLatency          time.Duration // event publication latency
	DatabaseInsertLatency        time.Duration // database insert latency
	VectorDatabaseLatency        time.Duration // vector database operation latency
	TotalStorageUsed             int64         // total storage used
	TotalVectorCount             int64         // total number of vectors
	TotalDocumentCount           int64         // total number of documents
}

// Knowledge base creation metrics reporting.
func (s *KnowledgeApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, knowledgeID int64, req *knowledgeAPI.CreateDatasetRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根據錯誤類型分類統計switch {case errors.Is(err, errno.ErrKnowledgePermissionCode):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):metrics.RateLimitCount++case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):metrics.StorageQuotaExceededCount++case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):metrics.VectorSpaceCreateFailCount++case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):metrics.EmbeddingModelFailCount++case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):metrics.IndexCreateFailCount++}logs.CtxErrorf(ctx, "Knowledge %s failed, knowledgeName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms", operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 記錄知識庫類型統計embeddingModel := req.GetEmbeddingModel()chunkSize := req.GetChunkSize()logs.CtxInfof(ctx, "Knowledge %s succeeded, knowledgeID=%d, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms", operation, knowledgeID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())}// 上報到監控系統s.metricsReporter.Report(ctx, "knowledge_create", map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"knowledge_name": req.GetName(),"embedding_model": req.GetEmbeddingModel(),"chunk_size": req.GetChunkSize(),"chunk_overlap": req.GetChunkOverlap(),"space_id": req.GetSpaceID(),"success": err == nil,"latency_ms": latency.Milliseconds(),"error_type": getKnowledgeCreateErrorType(err),"vector_dimensions": 
getModelDimensions(req.GetEmbeddingModel()),"storage_used": getStorageUsed(ctx, req.GetSpaceID()),})
}// 獲取知識庫創建錯誤類型
func getKnowledgeCreateErrorType(err error) string {if err == nil {return "none"}// 基于知識庫錯誤碼定義switch {case errors.Is(err, errno.ErrKnowledgePermissionCode):return "permission_denied"case errors.Is(err, errno.ErrKnowledgeNameExistsCode):return "knowledge_exists"case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):return "invalid_parameters"case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):return "storage_quota_exceeded"case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):return "vector_space_create_failed"case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):return "embedding_model_failed"case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):return "index_create_failed"case errors.Is(err, errno.ErrKnowledgeDocumentProcessingFailedCode):return "document_processing_failed"case errors.Is(err, errno.ErrKnowledgeVectorDatabaseTimeoutCode):return "vector_database_timeout"case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):return "rate_limit_exceeded"default:return "system_error"}
}// 知識庫創建告警檢查
// checkCreateAlerts inspects metrics and sends an alert through the alert
// manager for every threshold that is exceeded:
//
//   - failure rate above 3%, evaluated only once more than 100 creations
//     (successes plus failures) have been recorded — warning
//   - creation latency above 10 seconds — warning
//   - more than 10 storage-quota rejections — critical
//   - more than 5 vector space creation failures — critical
//   - more than 20 embedding model failures — warning
func (s *KnowledgeApplicationService) checkCreateAlerts(ctx context.Context, metrics *KnowledgeCreateMetrics) {
	// raise sends a single alert; the checks below share it.
	raise := func(level, alertType, message string, data map[string]interface{}) {
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   level,
			Type:    alertType,
			Message: message,
			Metrics: data,
		})
	}

	// Failure rate.
	if totalCreates := metrics.CreateSuccessCount + metrics.CreateFailureCount; totalCreates > 100 {
		if failureRate := float64(metrics.CreateFailureCount) / float64(totalCreates); failureRate > 0.03 {
			raise("warning", "knowledge_create_failure_rate",
				fmt.Sprintf("知識庫創建失敗率過高: %.2f%%", failureRate*100),
				map[string]interface{}{
					"failure_rate":  failureRate,
					"total_creates": totalCreates,
				})
		}
	}

	// Latency.
	if metrics.CreateLatency > 10*time.Second {
		raise("warning", "knowledge_create_latency",
			fmt.Sprintf("知識庫創建延遲過高: %dms", metrics.CreateLatency.Milliseconds()),
			map[string]interface{}{
				"latency_ms": metrics.CreateLatency.Milliseconds(),
			})
	}

	// Storage quota rejections.
	if metrics.StorageQuotaExceededCount > 10 {
		raise("critical", "knowledge_storage_quota_exceeded",
			fmt.Sprintf("存儲配額超限次數過多: %d", metrics.StorageQuotaExceededCount),
			map[string]interface{}{
				"quota_exceeded_count": metrics.StorageQuotaExceededCount,
			})
	}

	// Vector space creation failures.
	if metrics.VectorSpaceCreateFailCount > 5 {
		raise("critical", "knowledge_vector_space_create_failed",
			fmt.Sprintf("向量空間創建失敗次數過多: %d", metrics.VectorSpaceCreateFailCount),
			map[string]interface{}{
				"vector_space_fail_count": metrics.VectorSpaceCreateFailCount,
			})
	}

	// Embedding model failures.
	if metrics.EmbeddingModelFailCount > 20 {
		raise("warning", "knowledge_embedding_model_failed",
			fmt.Sprintf("嵌入模型調用失敗次數過多: %d", metrics.EmbeddingModelFailCount),
			map[string]interface{}{
				"embedding_fail_count": metrics.EmbeddingModelFailCount,
			})
	}
}