將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(3.純python的實惠版)

前情:

將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(1.標準版)-CSDN博客

將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(2.換掉付費的Event Hubs)-CSDN博客

python腳本實現

厲害的小伙伴最終使用python腳本免費實現了這個功能

文件結構

以下是cursor對代碼的評審意見:

這是一個用于下載?Azure Storage?訪問日志并將其導入到?Elasticsearch?的項目。讓我逐個文件分析:

1.?整體架構

項目采用了良好的模塊化設計,主要包含以下組件:

  • main.py:?主程序入口,協調整體流程
  • azure_log_downloader.py: Azure?blob 存儲日志下載器
  • elasticsearch_sender.py: Elasticsearch 數據發送器
  • models.py:?數據模型定義
  • log_config.py:?日志配置
  • .env: 環境配置文件

2. 具體評審

main.py

優點:

  • 支持日期范圍處理
  • 良好的錯誤處理和日志記錄
  • 模塊化的設計,職責劃分清晰

建議改進:

  • 可以考慮添加命令行參數支持,使日期范圍可配置
  • 可以添加并發處理支持,提高處理速度
azure_log_downloader.py

優點:

  • 清晰的類設計
  • 良好的錯誤處理

建議改進:

  • 可以添加重試機制
  • 可以考慮支持異步操作
elasticsearch_sender.py

優點:

  • 實現了批量處理
  • 包含重試機制
  • 良好的連接測試功能

建議改進:

  • 可以考慮添加索引模板管理
  • 可以添加更多的?ES?配置選項
models.py

優點:

  • 使用 dataclass 簡化了類定義
  • 詳細的字段解析邏輯
  • 良好的類型注解

建議改進:

  • 可以添加字段驗證
  • 可以考慮使用 Pydantic 進行數據驗證
log_config.py

優點:

  • 完善的日志配置
  • 支持文件和控制臺輸出

建議改進:

  • 可以考慮添加日志輪轉
  • 可以添加日志級別配置選項

3.?安全性建議

  • 建議使用更安全的方式存儲敏感信息(如?API keys)
  • 建議添加 SSL?證書驗證
  • 建議添加請求速率限制

4. 性能建議

  • 考慮添加并發處理
  • 可以實現增量更新機制
  • 考慮添加緩存機制

5.?可維護性建議

  • 添加單元測試
  • 添加更詳細的文檔
  • 考慮添加監控指標
  • 添加?CI/CD?配置

總體來說,這是一個設計良好的項目,代碼質量較高,有良好的錯誤處理和日志記錄。主要可以在性能優化、安全性和可維護性方面進行進一步改進。

具體腳本

1. azure_log_downloader.py:

from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []

2. elasticsearch_sender.py:

from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count

3. log_config.py:

import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger

4. models.py:

from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")

5. main.py:

from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()

6. .env?:

ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app


前情后續:

將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(1.標準版)-CSDN博客

將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(2.換掉付費的Event Hubs)-CSDN博客

將 AzureBlob 的日志通過 Azure Event Hubs 發給 Elasticsearch(3.純python的實惠版)-CSDN博客




本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66622.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66622.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66622.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python學opencv|讀取圖像(四十)掩模:三通道圖像的局部覆蓋

【1】引言 前序學習了使用numpy創建單通道的灰色圖像&#xff0c;并對灰色圖像的局部進行了顏色更改&#xff0c;相關鏈接為&#xff1a; python學opencv|讀取圖像&#xff08;九&#xff09;用numpy創建黑白相間灰度圖_numpy生成全黑圖片-CSDN博客 之后又學習了使用numpy創…

【全面解析】深入解析 TCP/IP 協議:網絡通信的基石

深入解析 TCP/IP 協議&#xff1a;網絡通信的基石 導語 你是否曾好奇&#xff0c;現代互聯網是如何實現全球設備之間的高速、穩定和可靠通信的&#xff1f;無論是瀏覽網頁、發送電子郵件&#xff0c;還是進行視頻通話&#xff0c;背后都離不開 TCP/IP 協議 的支撐。作為互聯網…

全面解析 Java 流程控制語句

Java學習資料 Java學習資料 Java學習資料 在 Java 編程中&#xff0c;流程控制語句是構建程序邏輯的關鍵部分&#xff0c;它決定了程序的執行順序和走向。通過合理運用這些語句&#xff0c;開發者能夠實現各種復雜的業務邏輯&#xff0c;讓程序更加靈活和智能。 順序結構 順…

Linux系統常用指令

查找文件 find / -name "<文件名>" 2>/dev/null //遍歷系統查找指定文件名文件ls -l | grep "<文件名>" //列出當前目錄下有關文件名的文件find -name sw_sfp_alarm_cfg.xml //查找文件名對應路徑 切換目錄 編輯文件 vi <文件…

【Unity】ScrollViewContent適配問題(Contentsizefilter不刷新、ContentSizeFilter失效問題)

最近做了一個項目&#xff0c;菜單欄讀取數據后自動生成&#xff0c;結果用到了雙重布局 父物體 嘗試了很多方式&#xff0c;也看過很多大佬的文章&#xff0c;后來自己琢磨了一下&#xff0c;當子物體組件自動生成之后&#xff0c;使用以下以下代碼效果會好一些&#xff1a; …

AI輔助醫學統計分析APP

AI輔助醫學統計分析APP 醫學統計分析的困難點在于開始階段分析的規劃和得出分析結果之后分析結果的解釋&#xff0c;前者之所以困難是因為分析方法繁多又有不同的使用條件&#xff0c;后者則是因為結果中術語較多&#xff0c;且各種分析方法術語又有不同&#xff0c;非統計專業…

[STM32 HAL庫]串口中斷編程思路

一、前言 最近在準備藍橋杯比賽&#xff08;嵌入式賽道&#xff09;&#xff0c;研究了以下串口空閑中斷DMA接收不定長的數據&#xff0c;感覺這個方法的接收效率很高&#xff0c;十分好用。方法配置都成功了&#xff0c;但是有一個點需要進行考慮&#xff0c;就是一般我們需要…

淺談Java之AJAX

一、基本介紹 在Java開發中&#xff0c;AJAX&#xff08;Asynchronous JavaScript and XML&#xff09;是一種用于創建動態網頁的技術&#xff0c;它允許網頁在不重新加載整個頁面的情況下與服務器進行交互。 二、關鍵點和示例 1. AJAX的基本原理 AJAX通過JavaScript的XMLHtt…

AutoSar架構學習筆記

1.AUTOSAR&#xff08;Automotive Open System Architecture&#xff0c;汽車開放系統架構&#xff09;是一個針對汽車行業的軟件架構標準&#xff0c;旨在提升汽車電子系統的模塊化、可擴展性、可重用性和互操作性。AUTOSAR的目標是為汽車電子控制單元&#xff08;ECU&#xf…

算法競賽之差分進階——等差數列差分 python

目錄 前置知識進入正題實戰演練 前置知識 給定區間 [ l, r ]&#xff0c;讓我們把數組中的[ l, r ] 區間中的每一個數加上c,即 a[ l ] c , a[ l 1 ] c , a[ l 2] c , a[ r ] c; 怎么做&#xff1f;很簡單&#xff0c;差分一下即可 還不會的小伙伴點此進入學習 進入正題 …

TDengine 做 Apache SuperSet 數據源

?Apache Superset? 是一個現代的企業級商業智能&#xff08;BI&#xff09;Web 應用程序&#xff0c;主要用于數據探索和可視化。它由 Apache 軟件基金會支持&#xff0c;是一個開源項目&#xff0c;它擁有活躍的社區和豐富的生態系統。Apache Superset 提供了直觀的用戶界面…

金融場景 PB 級大規模日志平臺:中信銀行信用卡中心從 Elasticsearch 到 Apache Doris 的先進實踐

導讀&#xff1a;中信銀行信用卡中心每日新增日志數據 140 億條&#xff08;80TB&#xff09;&#xff0c;全量歸檔日志量超 40PB&#xff0c;早期基于 Elasticsearch 構建的日志云平臺&#xff0c;面臨存儲成本高、實時寫入性能差、文本檢索慢以及日志分析能力不足等問題。因此…

虛幻商城 Fab 免費資產自動化入庫

文章目錄 一、背景二、實現效果展示三、實現自動化入庫一、背景 上一次寫了個這篇文章 虛幻商城 Quixel 免費資產一鍵入庫,根據這個構想,便決定將范圍擴大,使 Fab 商城的所有的免費資產自動化入庫,是所有!所有! 上一篇文章是根據下圖這部分資產一鍵入庫: 而這篇文章則…

游戲為什么失敗?回顧某平庸游戲

1、上周玩了一個老鼠為主角的游戲&#xff0c;某平臺喜1送的&#xff0c; 下載了很久而一直沒空玩&#xff0c;大約1G&#xff0c;為了清硬盤空間而玩。 也是為了拔掉心中的一根刺&#xff0c;下載了而老是不玩總感覺不舒服。 2、老鼠造型比較寫實&#xff0c;看上去就有些討…

親測有效!如何快速實現 PostgreSQL 數據遷移到 時序數據庫TDengine

小T導讀&#xff1a;本篇文章是“2024&#xff0c;我想和 TDengine 談談”征文活動的優秀投稿之一&#xff0c;作者從數據庫運維的角度出發&#xff0c;分享了利用 TDengine Cloud 提供的遷移工具&#xff0c;從 PostgreSQL 數據庫到 TDengine 進行數據遷移的完整實踐過程。文章…

C#,入門教程(01)—— Visual Studio 2022 免費安裝的詳細圖文與動畫教程

通過本課程的學習&#xff0c;你可以掌握C#編程的重點&#xff0c;享受編程的樂趣。 在本課程之前&#xff0c;你無需具備任何C#的基礎知識&#xff0c;只要能操作電腦即可。 不過&#xff0c;希望你的數學不是體育老師教的。好的程序是數理化的實現與模擬。沒有較好的數學基礎…

Linux探秘坊-------3.開發工具詳解(2)

1.動靜態庫和動靜態鏈接&#xff08;操作&#xff09; 靜態庫是指編譯鏈接時,把庫?件的代碼全部加?到可執??件中,因此?成的?件 ?較?,但在運?時也就不再需要庫?件了。其后綴名?般為“.a” 動態庫與之相反,在編譯鏈接時并 沒有把庫?件的代碼加?到可執??件中 ,?…

電腦開機出現Bitlock怎么辦

目錄 1.前言 2.產生原因&#xff1a; 1.系統異常關機 2.系統更新錯誤 3.硬件更換 4.CMOS電池問題 5.出廠設置 6.意外情況 3.解鎖步驟&#xff1a; 3.1&#xff1a;記住密鑰ID&#xff08;前6位&#xff09; 3.2&#xff1a;打開aka.ms/myrecoverykey網址 3.3&#…

C# 的 NLog 庫高級進階

一、引言 在 C# 開發的廣袤天地中&#xff0c;日志記錄宛如開發者的 “千里眼” 與 “順風耳”&#xff0c;助力我們洞察應用程序的運行狀態&#xff0c;快速定位并解決問題。而 NLog 庫&#xff0c;無疑是日志記錄領域中的璀璨明星&#xff0c;以其強大的功能、靈活的配置和出…

Avalonia系列文章之小試牛刀

最近有朋友反饋&#xff0c;能否分享一下Avalonia相關的文章&#xff0c;于是就抽空學習了一下&#xff0c;發現Avalonia真的是一款非常不錯的UI框架&#xff0c;值得花時間認真學習一下&#xff0c;于是邊學習邊記錄&#xff0c;整理成文&#xff0c;分享給大家&#xff0c;希…