最近做新老版本zabbix監控主機遷移發現zabbix6.0后api安全有了效大升級,批量主機維護腳本出現認證兼容性問題,以下為腳本更新token支持:`在這里插入代碼片:
# /usr/bin/env python3
# -*- coding:utf-8 -*-
import requests
import json
import os
import re
from typing import List, Dict
import ipaddress## 用戶配置區域
ZABBIX_URL = "http://[IP]/api_jsonrpc.php" # Zabbix API 地址(根據不同版本api接口地址調整)
USE_API_TOKEN = True # 是否使用 API Token 認證(True/False,token與傳統認證方式二選一)
API_TOKEN = "[token]" # zbx6創建固定Token時填寫,zbx5.0以下用動態auth-token忽略
USERNAME = "[user]" # 傳統認證用戶名(zbx6以下兼容,使用api-token認證時忽略)
PASSWORD = "[password]" # 傳統認證密碼(zbx6以下兼容,使用api-token認證時忽略)
HOSTS_FILE = "serverlist.txt" # 主機列表文件路徑class ZabbixClient:def __init__(self, url: str, use_api_token: bool, token: str = None, username: str = None, password: str = None):self.url = urlself.use_api_token = use_api_tokenself.headers = {"Content-Type": "application/json-rpc"}# 配置認證if use_api_token:self.headers["Authorization"] = f"Bearer {token}"else:self.auth = self._login(username, password)def _login(self, username: str, password: str) -> str:"""傳統認證獲取Token"""payload = {"jsonrpc": "2.0","method": "user.login","params": {"user": username, "password": password},"id": 1}return self._send_request(payload, require_auth=False)["result"]def _send_request(self, payload: Dict, require_auth: bool = True) -> Dict:"""統一請求處理"""if require_auth and not self.use_api_token:payload["auth"] = self.authtry:response = requests.post(self.url,headers=self.headers,data=json.dumps(payload),timeout=10)response.raise_for_status()result = response.json()if "error" in result:error_info = result["error"]raise Exception(f"API錯誤 ({error_info['code']}): {error_info['message']}\n數據: {error_info.get('data', '')}")return resultexcept requests.exceptions.RequestException as e:raise Exception(f"請求失敗: {str(e)}")def check_host_exists(self, hostname: str) -> bool:"""檢查主機是否存在"""payload = {"jsonrpc": "2.0","method": "host.get","params": {"filter": {"host": hostname}, "output": ["hostid"]},"id": 2}result = self._send_request(payload, require_auth=True)return len(result["result"]) > 0def get_hostgroup(self, group_name: str) -> List[Dict]:"""獲取主機組ID"""payload = {"jsonrpc": "2.0","method": "hostgroup.get","params": {"filter": {"name": [group_name]},"output": ["groupid"]},"id": 3}return self._send_request(payload, require_auth=True).get("result", [])def create_hostgroup(self, group_name: str) -> str:"""創建主機組并返回ID"""payload = {"jsonrpc": "2.0","method": "hostgroup.create","params": {"name": group_name},"id": 4}result = self._send_request(payload, require_auth=True)return result["result"]["groupids"][0]def get_template(self, template_name: str) -> List[Dict]:"""獲取模板ID"""payload = {"jsonrpc": "2.0","method": "template.get","params": {"filter": {"host": [template_name]},"output": ["templateid"]},"id": 5}return self._send_request(payload, require_auth=True).get("result", [])def create_host(self, hostname: str, ip: str, groups: List[str], templates: List[str]) -> None:"""創建主機(帶完整參數校驗)"""self._validate_base_params(hostname, ip, groups, templates)if self.check_host_exists(hostname):print(f"主機 {hostname} 已存在,跳過")returngroup_ids = self._process_hostgroups(groups)template_ids = self._process_templates(templates)interface = self._build_interface(ip)payload = self._build_payload(hostname, interface, group_ids, template_ids)self._send_request(payload, require_auth=True)print(f"主機 {hostname} ({ip}) 創建成功")def _validate_base_params(self, hostname: str, ip: str, groups: List[str], templates: List[str]):"""基礎參數校驗"""if not hostname.strip():raise ValueError("主機名不可為空或僅包含空格")if not re.match(r'^[\w\-_.]+$', hostname):raise ValueError(f"主機名包含非法字符: {hostname}")if not self._is_valid_ip(ip):raise ValueError(f"無效IP地址: {ip}")if not groups or not all(groups):raise ValueError("主機組不可為空或包含空值")if not templates or not all(templates):raise ValueError("模板不可為空或包含空值")def _process_hostgroups(self, groups: List[str]) -> List[Dict]:"""處理主機組(創建不存在的組)"""group_ids = []for group in groups:group = group.strip()group_info = self.get_hostgroup(group)if not group_info:print(f"創建主機組 '{group}'...")group_id = self.create_hostgroup(group)group_ids.append({"groupid": group_id})else:group_ids.append({"groupid": group_info[0]["groupid"]})return group_idsdef _process_templates(self, templates: List[str]) -> List[Dict]:"""處理模板(校驗存在性)"""template_ids = []for template in templates:template = template.strip()template_info = self.get_template(template)if not template_info:raise Exception(f"模板 '{template}' 不存在,請檢查名稱(區分大小寫)")template_ids.append({"templateid": template_info[0]["templateid"]})return template_idsdef _build_interface(self, ip: str) -> Dict:"""構建接口參數(完整字段)"""return {"type": 1, # Zabbix Agent接口"main": 1, # 主接口"useip": 1, # 使用IP地址"ip": ip,"port": "10050", # 默認端口"dns": "", # 可選字段,顯式為空"ipmi_dns": "","username": "","password": ""}def _build_payload(self, hostname: str, interface: Dict, group_ids: List[Dict], template_ids: List[Dict]) -> Dict:"""構建完整請求參數(移除無效庫存字段)"""return {"jsonrpc": "2.0","method": "host.create","params": {"host": hostname,"interfaces": [interface],"groups": group_ids,"templates": template_ids,"inventory_mode": 0 # 禁用自動發現,無需inventory字段},"id": 6}def _is_valid_ip(self, ip: str) -> bool:"""驗證IP地址格式(支持IPv4/IPv6)"""try:ipaddress.ip_address(ip)return Trueexcept ValueError:print(f"無效IP格式: {ip}")return Falsedef main():"""主函數:批量導入主機"""try:# 初始化客戶端if USE_API_TOKEN:zabbix = ZabbixClient(url=ZABBIX_URL,use_api_token=True,token=API_TOKEN)else:zabbix = ZabbixClient(url=ZABBIX_URL,use_api_token=False,username=USERNAME,password=PASSWORD)# 驗證主機列表文件if not os.path.exists(HOSTS_FILE):raise FileNotFoundError(f"文件 {HOSTS_FILE} 不存在")# 處理主機列表with open(HOSTS_FILE, "r", encoding="utf-8") as f:for line_num, line in enumerate(f, 1):line = line.strip()if not line or line.startswith("#"):continuetry:# 解析行數據parts = line.split("#", 3)if len(parts) != 4:raise ValueError("字段數量錯誤,必須為4個(主機名#IP#主機組#模板)")hostname, ip, groups_str, templates_str = partsgroups = groups_str.split(",") if groups_str else []templates = templates_str.split(",") if templates_str else []# 創建主機zabbix.create_host(hostname, ip, groups, templates)except ValueError as ve:print(f"行 {line_num} 格式錯誤: {str(ve)},跳過")except Exception as e:print(f"行 {line_num} 處理失敗: {str(e)}")except Exception as e:print(f"\n程序終止: {str(e)}")exit(1)if __name__ == "__main__":main()print("\n批量導入完成!")
注:輸入文件 serverlist.txt 格式:
每行 主機名#IP地址#主機組1,主機組2#模板1,模板,示例:
web-server#192.168.1.1#Web Servers,Linux#Template OS Linux
db-server#192.168.1.2#DB Servers#Template OS Linux,Template MySQL