?
總體功能:
這段程序的作用是:
從指定的S3桶中讀取所有對象的元數據(文件名、大小、最后修改時間、存儲類型、ETag等),并把這些信息寫入到Elasticsearch(ES)中,建立索引,以便后續可以快速搜索和檢索S3中的文件信息。
各模塊詳細解析:
1. readConfig
-
從本地的
config.ini
配置文件中讀取 S3 和 Elasticsearch 的連接信息,比如 S3的桶名、訪問秘鑰,ES的地址、賬號密碼、索引名等。
2. getS3ETag
-
給定一個文件名(Key),向S3發送
HeadObject
請求,單獨獲取該文件的ETag
(一般用于文件完整性校驗或者去重標識)。 -
特別處理了返回的ETag,把多余的引號去掉。
3. fetchS3Files
-
核心邏輯
-
通過 S3 的
ListObjectsV2Paginator
,分頁地遍歷桶中的所有對象。 -
對每一個對象:
-
讀取文件名(
Key
)、大小、最后修改時間、存儲類型。 -
調用
getS3ETag
補充獲取 ETag。 -
把這些元數據組織成一個
JSON
格式的文檔。 -
通過
esapi.IndexRequest
把文檔寫入到Elasticsearch的指定索引中。
-
4. main
-
先讀取配置文件。
-
初始化 S3 客戶端(帶自定義 Endpoint,例如MinIO、私有S3等),并提供靜態訪問密鑰。
-
初始化 Elasticsearch 客戶端(支持跳過TLS證書驗證,適合開發環境)。
-
調用
fetchS3Files
,正式開始批量導入S3文件元數據到ES。
特別注意:
-
InsecureSkipVerify: true
表示 跳過SSL證書校驗,這個設置在生產環境是不安全的,一般僅用于開發測試。 -
每導入一個S3對象,都會立即
Refresh
索引,確保數據立刻可搜索,但也會增加ES壓力,批量模式可以更高效。 -
如果S3里文件特別多,可以考慮加并發處理或限制速率,否則會慢。
總結一句話:
這段程序實現了 自動同步S3文件列表到Elasticsearch索引,方便對存儲桶中的文件進行快速搜索和查詢
package mainimport ("bytes""context""encoding/json""fmt""github.com/aws/aws-sdk-go-v2/aws""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/credentials""github.com/aws/aws-sdk-go-v2/service/s3""github.com/elastic/go-elasticsearch/v8""github.com/elastic/go-elasticsearch/v8/esapi""gopkg.in/ini.v1""log""crypto/tls""net/http""time" )type S3Config struct {BucketName stringAccessKey stringSecretKey stringEndpointURL string }type ESConfig struct {Host stringUser stringPass stringIndexName stringSearchType string }func readConfig() (S3Config, ESConfig) {cfg, err := ini.Load("config.ini")if err != nil {log.Fatalf("無法讀取配置文件: %v", err)}s3Cfg := S3Config{BucketName: cfg.Section("s3").Key("bucket_name").String(),AccessKey: cfg.Section("s3").Key("access_key").String(),SecretKey: cfg.Section("s3").Key("secret_key").String(),EndpointURL: cfg.Section("s3").Key("endpoint_url").String(),}esCfg := ESConfig{Host: cfg.Section("elasticsearch").Key("host").String(),User: cfg.Section("elasticsearch").Key("user").String(),Pass: cfg.Section("elasticsearch").Key("password").String(),IndexName: cfg.Section("elasticsearch").Key("index_name").String(),SearchType: cfg.Section("elasticsearch").Key("search_type").String(),}return s3Cfg, esCfg }func getS3ETag(s3Client *s3.Client, bucketName, fileKey string) string {resp, err := s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{Bucket: aws.String(bucketName),Key: aws.String(fileKey),})if err != nil {log.Printf("獲取 %s 的ETag失敗: %v", fileKey, err)return ""}etag := aws.ToString(resp.ETag)if len(etag) > 0 && etag[0] == '"' && etag[len(etag)-1] == '"' {etag = etag[1 : len(etag)-1]}return etag }func fetchS3Files(s3Client *s3.Client, esClient *elasticsearch.Client, bucketName, indexName string) {paginator := s3.NewListObjectsV2Paginator(s3Client, &s3.ListObjectsV2Input{Bucket: aws.String(bucketName),})for paginator.HasMorePages() {page, err := paginator.NextPage(context.TODO())if err != nil {log.Printf("獲取S3文件列表頁失敗: %v", err)continue}for _, obj := range page.Contents {fileKey := aws.ToString(obj.Key)log.Printf("導入索引:",fileKey)fileSize := aws.ToInt64(obj.Size)lastModified := obj.LastModifiedstorageClass := string(obj.StorageClass) // 修復點etag := getS3ETag(s3Client, bucketName, fileKey)fileData := map[string]interface{}{"file_key": fileKey,"file_size": fileSize,"last_modified": lastModified,"storage_class": storageClass,"etag": etag,}fileDataJSON, err := json.Marshal(fileData)if err != nil {log.Printf("將文件數據轉換為JSON失敗: %v", err)continue}req := esapi.IndexRequest{Index: indexName,Body: bytes.NewReader(fileDataJSON), // 修復點Refresh: "true",}resp, err := req.Do(context.TODO(), esClient)if err != nil {log.Printf("將文件數據索引到Elasticsearch失敗: %v", err)continue}defer resp.Body.Close()}}fmt.Println("S3 文件索引完成") }func main() {s3Cfg, esCfg := readConfig()customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {return aws.Endpoint{URL: s3Cfg.EndpointURL,SigningRegion: "us-east-1", // 替換為你的實際regionHostnameImmutable: true,}, nil})awsCfg, err := config.LoadDefaultConfig(context.TODO(),config.WithRegion("us-east-1"), // 替換為你的實際regionconfig.WithEndpointResolverWithOptions(customResolver),config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(s3Cfg.AccessKey,s3Cfg.SecretKey,"",)),)if err != nil {log.Fatalf("無法加載S3配置: %v", err)}s3Client := s3.NewFromConfig(awsCfg)esCfgOptions := elasticsearch.Config{Addresses: []string{esCfg.Host},Username: esCfg.User,Password: esCfg.Pass,Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true, // ?? 跳過證書校驗(不安全,僅限開發)},ResponseHeaderTimeout: 10 * time.Second,},}esClient, err := elasticsearch.NewClient(esCfgOptions)if err != nil {log.Fatalf("無法創建Elasticsearch客戶端: %v", err)}fetchS3Files(s3Client, esClient, s3Cfg.BucketName, esCfg.IndexName) }
?配置文件config.ini
[elasticsearch]
host = https://localhost:9200
user = elastic
password = U********uq
index_name = jyzx_s3_files
search_type = wildcard[s3]
bucket_name = fzsjyzx
access_key = V4*****6DB
secret_key = lHdmyi5*********UjlS
endpoint_url = http://172.20.1.18:7480/