https://www.python.org/static/community_logos/python-logo-master-v3-TM.png
基礎設施即代碼(IaC)
使用Fabric執行遠程命令
python
復制
下載
from fabric import Connectiondef deploy_app():# 連接到遠程服務器with Connection('web-server.example.com', user='deploy', connect_kwargs={"key_filename": "/path/to/key.pem"}) as c:# 更新代碼c.run('cd /var/www/myapp && git pull origin master')# 安裝依賴c.run('cd /var/www/myapp && pip install -r requirements.txt')# 重啟服務c.sudo('systemctl restart myapp', pty=True)# 檢查狀態result = c.run('systemctl status myapp', hide=True)print(f"服務狀態:\n{result.stdout}")if __name__ == '__main__':deploy_app()
Ansible Python API
python
復制
下載
from ansible.module_utils.basic import AnsibleModule import subprocessdef run_ansible_playbook(playbook_path):try:result = subprocess.run(['ansible-playbook', playbook_path],check=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)print("Playbook執行成功:")print(result.stdout)except subprocess.CalledProcessError as e:print("Playbook執行失敗:")print(e.stderr)# 自定義Ansible模塊示例 def configure_nginx(module):# 獲取參數server_name = module.params['server_name']root_path = module.params['root_path']# 生成Nginx配置config = f"""server {{listen 80;server_name {server_name};root {root_path};location / {{try_files $uri $uri/ =404;}}}}"""# 寫入配置文件try:with open(f'/etc/nginx/sites-available/{server_name}', 'w') as f:f.write(config)# 創建符號鏈接subprocess.run(['ln', '-sf', f'/etc/nginx/sites-available/{server_name}', f'/etc/nginx/sites-enabled/{server_name}'], check=True)# 測試并重載Nginxsubprocess.run(['nginx', '-t'], check=True)subprocess.run(['systemctl', 'reload', 'nginx'], check=True)module.exit_json(changed=True, msg="Nginx配置更新成功")except Exception as e:module.fail_json(msg=f"配置失敗: {str(e)}")if __name__ == '__main__':run_ansible_playbook('deploy.yml')
監控與告警系統
Prometheus自定義指標
python
復制
下載
from prometheus_client import start_http_server, Gauge import psutil import time# 創建指標 CPU_USAGE = Gauge('system_cpu_percent', 'CPU使用百分比') MEMORY_USAGE = Gauge('system_memory_percent', '內存使用百分比') DISK_USAGE = Gauge('system_disk_percent', '磁盤使用百分比')def collect_metrics():while True:# 收集CPU使用率CPU_USAGE.set(psutil.cpu_percent())# 收集內存使用率MEMORY_USAGE.set(psutil.virtual_memory().percent)# 收集磁盤使用率DISK_USAGE.set(psutil.disk_usage('/').percent)time.sleep(5)if __name__ == '__main__':# 啟動指標服務器start_http_server(8000)print("Prometheus指標服務器已啟動,端口8000")collect_metrics()
告警規則與通知
python
復制
下載
import smtplib from email.mime.text import MIMEText from datetime import datetimeclass AlertManager:def __init__(self, thresholds):self.thresholds = thresholdsself.alert_history = {}def check_metrics(self, metrics):alerts = []current_time = datetime.now()# CPU檢查if metrics['cpu'] > self.thresholds['cpu']:alert_key = 'high_cpu'if self._should_alert(alert_key, current_time):alerts.append(f"CPU使用率過高: {metrics['cpu']}%")# 內存檢查if metrics['memory'] > self.thresholds['memory']:alert_key = 'high_memory'if self._should_alert(alert_key, current_time):alerts.append(f"內存使用率過高: {metrics['memory']}%")# 磁盤檢查if metrics['disk'] > self.thresholds['disk']:alert_key = 'high_disk'if self._should_alert(alert_key, current_time):alerts.append(f"磁盤使用率過高: {metrics['disk']}%")return alertsdef _should_alert(self, alert_key, current_time):# 防止告警風暴,同一問題5分鐘內不重復告警last_alert = self.alert_history.get(alert_key)if last_alert and (current_time - last_alert).seconds < 300:return Falseself.alert_history[alert_key] = current_timereturn Truedef send_email_alert(self, to_addr, subject, body):msg = MIMEText(body)msg['Subject'] = subjectmsg['From'] = 'alert@example.com'msg['To'] = to_addrtry:with smtplib.SMTP('smtp.example.com', 587) as server:server.starttls()server.login('user', 'password')server.send_message(msg)print("告警郵件發送成功")except Exception as e:print(f"發送告警郵件失敗: {str(e)}")# 使用示例 thresholds = {'cpu': 90, 'memory': 85, 'disk': 90} alert_manager = AlertManager(thresholds)metrics = {'cpu': 95, 'memory': 80, 'disk': 92} alerts = alert_manager.check_metrics(metrics)if alerts:alert_manager.send_email_alert('admin@example.com','系統告警通知','\n'.join(alerts))
日志管理與分析
ELK日志處理管道
python
復制
下載
import logging from pythonjsonlogger import jsonlogger from logging.handlers import RotatingFileHandler import logstashdef setup_logging():# 創建loggerlogger = logging.getLogger('app')logger.setLevel(logging.INFO)# JSON格式formatter = jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(name)s %(message)s')# 文件處理器file_handler = RotatingFileHandler('/var/log/app/app.log',maxBytes=10*1024*1024, # 10MBbackupCount=5)file_handler.setFormatter(formatter)logger.addHandler(file_handler)# Logstash處理器logstash_handler = logstash.LogstashHandler('logstash.example.com',5044,version=1)logger.addHandler(logstash_handler)# 控制臺處理器console_handler = logging.StreamHandler()console_handler.setFormatter(formatter)logger.addHandler(console_handler)return logger# 使用示例 logger = setup_logging() logger.info("應用啟動", extra={'user': 'admin', 'module': 'startup'}) try:1 / 0 except Exception as e:logger.error("發生錯誤", exc_info=True, extra={'context': 'division'})
日志分析腳本
python
復制
下載
import pandas as pd import re from collections import Counterdef analyze_logs(log_file):# 讀取日志文件logs = []with open(log_file, 'r') as f:for line in f:try:log = json.loads(line)logs.append(log)except json.JSONDecodeError:continue# 轉換為DataFramedf = pd.DataFrame(logs)# 分析錯誤級別print("\n錯誤級別分布:")print(df['levelname'].value_counts())# 提取并統計錯誤消息error_messages = df[df['levelname'] == 'ERROR']['message']print("\n最常見錯誤消息:")print(error_messages.value_counts().head(5))# 提取HTTP狀態碼df['status_code'] = df['message'].str.extract(r'status code (\d{3})')if 'status_code' in df.columns:print("\nHTTP狀態碼分布:")print(df['status_code'].value_counts())# 分析時間模式df['hour'] = pd.to_datetime(df['asctime']).dt.hourprint("\n按小時分布的日志量:")print(df['hour'].value_counts().sort_index())if __name__ == '__main__':analyze_logs('/var/log/app/app.log')
配置管理
使用Python管理配置文件
python
復制
下載
import configparser import yaml import json from typing import Dict, Anyclass ConfigManager:def __init__(self):self.configs = {}def load_ini(self, filepath: str) -> Dict[str, Any]:"""加載INI配置文件"""config = configparser.ConfigParser()config.read(filepath)self.configs['ini'] = {s: dict(config.items(s)) for s in config.sections()}return self.configs['ini']def load_yaml(self, filepath: str) -> Dict[str, Any]:"""加載YAML配置文件"""with open(filepath, 'r') as f:self.configs['yaml'] = yaml.safe_load(f)return self.configs['yaml']def load_json(self, filepath: str) -> Dict[str, Any]:"""加載JSON配置文件"""with open(filepath, 'r') as f:self.configs['json'] = json.load(f)return self.configs['json']def get_value(self, config_type: str, *keys) -> Any:"""獲取嵌套配置值"""current = self.configs.get(config_type, {})for key in keys:if isinstance(current, dict) and key in current:current = current[key]else:return Nonereturn currentdef update_config(self, config_type: str, updates: Dict[str, Any]):"""更新配置"""if config_type in self.configs:self.configs[config_type].update(updates)def save_config(self, config_type: str, filepath: str):"""保存配置到文件"""if config_type not in self.configs:raise ValueError(f"未加載的配置類型: {config_type}")if config_type == 'ini':config = configparser.ConfigParser()for section, options in self.configs['ini'].items():config[section] = optionswith open(filepath, 'w') as f:config.write(f)elif config_type == 'yaml':with open(filepath, 'w') as f:yaml.dump(self.configs['yaml'], f)elif config_type == 'json':with open(filepath, 'w') as f:json.dump(self.configs['json'], f, indent=2)# 使用示例 config_manager = ConfigManager() config_manager.load_yaml('config.yaml') db_host = config_manager.get_value('yaml', 'database', 'host') print(f"數據庫主機: {db_host}")# 更新配置 config_manager.update_config('yaml', {'database': {'host': 'new.db.example.com'}}) config_manager.save_config('yaml', 'config_updated.yaml')
容器化與編排
Docker SDK for Python
python
復制
下載
import docker from docker.errors import DockerExceptionclass DockerManager:def __init__(self):try:self.client = docker.from_env()print("Docker連接成功")except DockerException as e:print(f"連接Docker失敗: {str(e)}")raisedef list_containers(self, all=False):"""列出容器"""return self.client.containers.list(all=all)def run_container(self, image, command=None, detach=True, **kwargs):"""運行容器"""return self.client.containers.run(image,command=command,detach=detach,**kwargs)def build_image(self, path, tag, dockerfile='Dockerfile'):"""構建鏡像"""return self.client.images.build(path=path,tag=tag,dockerfile=dockerfile)def cleanup_containers(self):"""清理停止的容器"""stopped_containers = self.list_containers(all=True, filters={'status': 'exited'})for container in stopped_containers:print(f"刪除容器: {container.id}")container.remove()def cleanup_images(self, dangling=True):"""清理未使用的鏡像"""filters = {'dangling': True} if dangling else Noneunused_images = self.client.images.list(filters=filters)for image in unused_images:print(f"刪除鏡像: {image.tags}")self.client.images.remove(image.id)# 使用示例 docker_manager = DockerManager() print("運行中的容器:") for container in docker_manager.list_containers():print(f" - {container.name}: {container.status}")# 運行新容器 nginx = docker_manager.run_container('nginx:latest',ports={'80/tcp': 8080},name='web-server' ) print(f"啟動容器: {nginx.name}")
Kubernetes Python客戶端
python
復制
下載
from kubernetes import client, configclass KubernetesManager:def __init__(self, in_cluster=False):if in_cluster:config.load_incluster_config()else:config.load_kube_config()self.core_v1 = client.CoreV1Api()self.apps_v1 = client.AppsV1Api()def list_pods(self, namespace='default'):"""列出Pod"""return self.core_v1.list_namespaced_pod(namespace)def create_deployment(self, name, image, replicas=1, namespace='default'):"""創建Deployment"""deployment = client.V1Deployment(metadata=client.V1ObjectMeta(name=name),spec=client.V1DeploymentSpec(replicas=replicas,selector={'matchLabels': {'app': name}},template=client.V1PodTemplateSpec(metadata=client.V1ObjectMeta(labels={'app': name}),spec=client.V1PodSpec(containers=[client.V1Container(name=name,image=image)]))))return self.apps_v1.create_namespaced_deployment(namespace=namespace,body=deployment)def create_service(self, name, port, target_port, namespace='default', service_type='LoadBalancer'):"""創建Service"""service = client.V1Service(metadata=client.V1ObjectMeta(name=name),spec=client.V1ServiceSpec(selector={'app': name},ports=[client.V1ServicePort(port=port,target_port=target_port)],type=service_type))return self.core_v1.create_namespaced_service(namespace=namespace,body=service)# 使用示例 k8s_manager = KubernetesManager()print("集群中的Pod:") pods = k8s_manager.list_pods() for pod in pods.items:print(f" - {pod.metadata.name}: {pod.status.phase}")# 部署應用 deployment = k8s_manager.create_deployment('myapp','myapp:1.0',replicas=3 ) service = k8s_manager.create_service('myapp-service',port=80,target_port=8080 ) print(f"已部署: {deployment.metadata.name}") print(f"已創建服務: {service.metadata.name}")
CI/CD流水線
GitLab CI Python腳本
python
復制
下載
import gitlab import requests import timeclass GitLabCICD:def __init__(self, gitlab_url, private_token, project_id):self.gl = gitlab.Gitlab(gitlab_url, private_token=private_token)self.project = self.gl.projects.get(project_id)def trigger_pipeline(self, branch='master', variables=None):"""觸發CI/CD流水線"""pipeline = self.project.pipelines.create({'ref': branch,'variables': variables or {}})print(f"已觸發流水線: {pipeline.id}")return pipelinedef wait_for_pipeline(self, pipeline, timeout=1800, interval=30):"""等待流水線完成"""start_time = time.time()while time.time() - start_time < timeout:pipeline.refresh()if pipeline.status in ['success', 'failed', 'canceled', 'skipped']:print(f"流水線狀態: {pipeline.status}")return pipelineprint(f"等待中... 當前狀態: {pipeline.status}")time.sleep(interval)raise TimeoutError("等待流水線超時")def get_job_logs(self, pipeline, job_name):"""獲取作業日志"""for job in pipeline.jobs.list():if job.name == job_name:return job.trace().decode('utf-8')return Nonedef deploy_to_environment(self, environment, version):"""部署到指定環境"""# 觸發部署流水線pipeline = self.trigger_pipeline(branch='master',variables={'DEPLOY_ENV': environment,'APP_VERSION': version})# 等待部署完成pipeline = self.wait_for_pipeline(pipeline)if pipeline.status == 'success':print(f"成功部署 {version} 到 {environment}")return Trueelse:logs = self.get_job_logs(pipeline, 'deploy')print(f"部署失敗. 作業日志:\n{logs}")return False# 使用示例 ci_cd = GitLabCICD(gitlab_url='https://gitlab.example.com',private_token='your-private-token',project_id=12345 )# 部署新版本 ci_cd.deploy_to_environment('production', '1.2.0')
GitHub Actions Python SDK
python
復制
下載
import os from github import Github from base64 import b64encodeclass GitHubActionsManager:def __init__(self, token, repo_name):self.gh = Github(token)self.repo = self.gh.get_repo(repo_name)def create_workflow(self, workflow_name, workflow_content):"""創建或更新工作流"""workflow_path = f".github/workflows/{workflow_name}.yml"try:# 檢查工作流是否存在contents = self.repo.get_contents(workflow_path)# 更新現有工作流self.repo.update_file(workflow_path,f"Update {workflow_name}",workflow_content,contents.sha)print(f"已更新工作流: {workflow_name}")except Exception as e:# 創建新工作流self.repo.create_file(workflow_path,f"Create {workflow_name}",workflow_content)print(f"已創建工作流: {workflow_name}")def trigger_workflow_dispatch(self, workflow_name, ref='master', inputs=None):"""手動觸發工作流"""workflow = self.repo.get_workflow(f"{workflow_name}.yml")workflow.create_dispatch(ref, inputs=inputs or {})print(f"已觸發工作流: {workflow_name}")def get_workflow_runs(self, workflow_name, branch=None):"""獲取工作流運行記錄"""workflow = self.repo.get_workflow(f"{workflow_name}.yml")runs = workflow.get_runs(branch=branch)return list(runs)def download_workflow_logs(self, run_id, output_dir='logs'):"""下載工作流日志"""run = self.repo.get_workflow_run(run_id)jobs = run.jobs()if not os.path.exists(output_dir):os.makedirs(output_dir)for job in jobs:log = job.logs()log_file = os.path.join(output_dir, f"{run_id}_{job.name}.log")with open(log_file, 'w') as f:f.write(log)print(f"已保存日志: {log_file}")# 使用示例 actions_mgr = GitHubActionsManager(token=os.getenv('GITHUB_TOKEN'),repo_name='your-username/your-repo' )# 定義CI工作流 ci_workflow = """ name: Python CIon: [push, pull_request]jobs:build:runs-on: ubuntu-lateststeps:- uses: actions/checkout@v2- name: Set up Pythonuses: actions/setup-python@v2with:python-version: '3.9'- name: Install dependenciesrun: |python -m pip install --upgrade pippip install -r requirements.txt- name: Run testsrun: |pytest """# 創建/更新工作流 actions_mgr.create_workflow("python-ci", ci_workflow)# 手動觸發工作流 actions_mgr.trigger_workflow_dispatch("python-ci",inputs={"environment": "staging"} )
安全自動化
漏洞掃描集成
python
復制
下載
import subprocess import json from datetime import datetimeclass SecurityScanner:def __init__(self, output_dir='reports'):self.output_dir = output_dirif not os.path.exists(self.output_dir):os.makedirs(self.output_dir)def run_dependency_scan(self, project_path):"""運行依賴項漏洞掃描"""report_file = os.path.join(self.output_dir,f"dependency_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")try:result = subprocess.run(['safety', 'check', '--json', '--output', report_file],cwd=project_path,capture_output=True,text=True)if result.returncode == 0:print("未發現漏洞依賴")return Trueelse:with open(report_file, 'r') as f:vulnerabilities = json.load(f)print(f"發現 {len(vulnerabilities)} 個漏洞:")for vuln in vulnerabilities:print(f" - {vuln['package_name']} {vuln['analyzed_version']}: {vuln['advisory']}")return Falseexcept Exception as e:print(f"依賴掃描失敗: {str(e)}")return Falsedef run_code_scan(self, project_path):"""運行靜態代碼分析"""report_file = os.path.join(self.output_dir,f"code_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")try:result = subprocess.run(['bandit', '-r', '.', '-f', 'json', '-o', report_file],cwd=project_path,capture_output=True,text=True)with open(report_file, 'r') as f:report = json.load(f)if report['metrics']['_totals']['issues'] == 0:print("未發現安全問題")return Trueelse:print(f"發現 {report['metrics']['_totals']['issues']} 個安全問題:")for issue in report['results']:print(f" - {issue['issue_text']} (嚴重性: {issue['issue_severity']})")return Falseexcept Exception as e:print(f"代碼掃描失敗: {str(e)}")return Falsedef generate_report(self):"""生成安全報告"""reports = []for filename in os.listdir(self.output_dir):if filename.endswith('.json'):with open(os.path.join(self.output_dir, filename), 'r') as f:reports.append({'filename': filename,'content': json.load(f)})html_report = """<html><head><title>安全掃描報告</title></head><body><h1>安全掃描報告</h1><p>生成時間: {}</p>""".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))for report in reports:html_report += f"<h2>{report['filename']}</h2>"html_report += "<pre>" + json.dumps(report['content'], indent=2) + "</pre>"html_report += "</body></html>"report_path = os.path.join(self.output_dir, 'security_report.html')with open(report_path, 'w') as f:f.write(html_report)print(f"報告已生成: {report_path}")return report_path# 使用示例 scanner = SecurityScanner() project_path = '/path/to/your/project'# 運行掃描 dep_scan_ok = scanner.run_dependency_scan(project_path) code_scan_ok = scanner.run_code_scan(project_path)# 生成報告 if not dep_scan_ok or not code_scan_ok:report_path = scanner.generate_report()print("發現安全問題,請查看報告")
自動化安全加固
python
復制
下載
import os import platform import subprocessclass SystemHardener:def __init__(self):self.system = platform.system().lower()self.hardening_actions = []def check_permissions(self, path, recommended_mode):"""檢查文件權限"""current_mode = oct(os.stat(path).st_mode & 0o777)if current_mode != recommended_mode:self.hardening_actions.append(f"chmod {recommended_mode} {path}")return Falsereturn Truedef check_ssh_config(self):"""檢查SSH配置"""sshd_config = '/etc/ssh/sshd_config'if not os.path.exists(sshd_config):return Truewith open(sshd_config, 'r') as f:content = f.read()checks = {'PermitRootLogin': 'no','PasswordAuthentication': 'no','X11Forwarding': 'no'}for param, expected_value in checks.items():if f"{param} {expected_value}" not in content:self.hardening_actions.append(f"echo '{param} {expected_value}' >> {sshd_config}")def apply_hardening(self, dry_run=False):"""應用安全加固"""if not self.hardening_actions:print("系統已符合安全標準")returnprint("需要執行以下加固操作:")for action in self.hardening_actions:print(f" - {action}")if dry_run:returnconfirm = input("確認執行這些操作嗎? (y/n): ")if confirm.lower() == 'y':for action in self.hardening_actions:try:if action.startswith('echo'):# 處理追加配置的情況cmd, redirection = action.split('>>')param = cmd.strip().split('echo ')[1]with open(redirection.strip(), 'a') as f:f.write(f"\n{param}\n")else:subprocess.run(action, shell=True, check=True)print(f"執行成功: {action}")except subprocess.CalledProcessError as e:print(f"執行失敗: {action} - {str(e)}")print("安全加固完成")else:print("操作已取消")# 使用示例 hardener = SystemHardener()# 檢查關鍵文件權限 hardener.check_permissions('/etc/passwd', '0o644') hardener.check_permissions('/etc/shadow', '0o640')# 檢查SSH配置 hardener.check_ssh_config()# 應用加固(測試運行) hardener.apply_hardening(dry_run=True)# 實際應用加固 # hardener.apply_hardening()
結語與學習路徑
https://www.python.org/static/community_logos/python-powered-h-140x182.png
通過這十篇系列教程,你已經掌握了:
-
基礎設施即代碼(IaC)實踐
-
監控告警系統構建
-
日志管理與分析技術
-
配置管理最佳實踐
-
容器化與編排技術
-
CI/CD流水線實現
-
安全自動化與加固
進階學習方向:
-
云原生技術棧:
-
深入Kubernetes Operator開發
-
服務網格(Service Mesh)實現
-
無服務器架構(Serverless)
-
-
性能優化:
-
分布式系統性能調優
-
大規模集群管理
-
高可用架構設計
-
-
安全專業領域:
-
滲透測試與紅隊技術
-
零信任架構實現
-
合規性自動化檢查
-
-
認證體系:
-
AWS/Azure/GCP云認證
-
Certified Kubernetes Administrator
-
HashiCorp認證工程師
-
Python在自動化運維和DevOps領域的應用日益廣泛,持續學習和實踐將助你成為高效能的技術專家!