目錄
Python正則表達式re庫的基本用法
引入re庫
各函數功能
總結
使用方法舉例
正則表達式語法與書寫方式
正則表達式的常用操作符
思科ASA防火墻數據
數據1
數據2
書寫正則表達式
Python中pydantic的使用
導入基礎數據模板
根據數據采集目標定義Pydantic數據類型
數據分析采集函數?
Python正則表達式re庫的基本用法
引入re庫
import re
各函數功能
函數 | 用法 |
re.match() | 從字符串起始位置用正則表達式進行數據匹配,匹配到一個數據立即結束并返回第一個匹配到的結果,返回一個Match object對象 |
re.search() | 從字符串起始位置用正則表達式進行數據匹配,返回一個Match object對象,含有第一個匹配到數據的位置 |
re.findall() | 從字符串起始位置用自定義的正則表達式進行數據匹配,一直嘗試匹配到字符串結束,返回匹配到的所有子串,是一個列表 |
re.split() | 從字符串起始位置用自定義的正則表達式進行數據匹配,一直嘗試匹配到字符串結束,將字符串按正則表達式匹配到的結果進行分割,返回被分割的子串,是一個列表 |
re.sub() | 從字符串起始位置用自定義的正則表達式進行數據匹配,一直嘗試匹配到字符串結束,將字符串中按正則表達式匹配到的子串全部替換為自定義的內容,返回被替換后的字符串 |
re.finditer() | 從字符串起始位置用自定義的正則表達式進行數據匹配,一直嘗試匹配到字符串結束,返回一個callable_iterator object對象,可以被迭代,里面每個數據是Match object對象 |
總結
其實在數據處理中我使用re.search()居多,在使用上也很符合查找內容的慣性思維,把需要采集的數據拆分書寫成一個個短小的正則表達式,寫出的代碼也容易維護,缺點是會對字符串進行多次匹配,會浪費資源,如果設涉及大量數據的分析采集推薦使用re.match()或者是混合使用其他函數,并書寫單個較長的正則表達式可以達到節省資源的目的
使用方法舉例
data = re.match(正則表達式,需要處理的字符串,匹配選項)
在Python中正則表達式的格式為:r'...正則表達式內容...'
例如data = re.match(r'[0-9]', '123')
正則表達式語法與書寫方式
正則表達式的常用操作符
其余可自行搜索用法,本文主要涉及這幾個
符號 | 說明 |
. | 匹配除換行符(\n、\r)之外的任何單個字符 |
\d | 匹配一個數字,等價于[0-9] |
\D | 匹配一個任何非數字字符,等價于[^0-9] |
\s | 匹配一個任何空白字符,如空格 |
\S | 匹配一個任何非空白字符 |
\n | 匹配一個換行符 |
* | 匹配前一個字符0次或無限次 |
+ | 匹配前一個字符1次或無限次 |
() | 用于標明匹配組,方便用一個正則表達式匹配多個內容 |
在學習如何書寫正則表達式之前需要先了解下需要處理的數據
思科ASA防火墻數據
數據1
用Python腳本連接思科ASA防火墻執行如下命令獲取(此文章不介紹如何采集數據,數據均為樣例,和真實環境也會有出入)
show interface ip brief
?回顯內容
Interface IP-Address OK? Method Status Protocol
Ethernet0/0 192.168.1.1 YES manual up up
Ethernet0/1 10.1.1.1 YES manual up up
數據2
執行命令?
show interface detail
回顯內容?
Interface Ethernet0 "outside", is up, line protocol is upHardware is i82558, BW 100 Mbps, DLY 100 usecAuto-Duplex(Full-duplex), Auto-Speed(100 Mbps)Input flow control is unsupported, output flow control is unsupportedMAC address 001e.4f9c.2345, MTU 1500IP address 172.16.0.1, subnet mask 255.255.255.128132463 packets input, 9245212 bytes, 0 no bufferReceived 24 broadcasts, 0 runts, 0 giants0 input errors, 0 CRC, 0 frame, 0 overrun, 0 ignored, 0 abort0 pause input, 0 resume input0 L2 decode drops238982 packets output, 15789347 bytes, 0 underruns0 pause output, 0 resume output0 output errors, 0 collisions, 0 interface resets0 babbles, 0 late collisions, 0 deferred0 lost carrier, 0 no carrier0 input reset drops, 0 output reset dropsinput queue (curr/max packets): hardware (0/1) software (0/8)output queue (curr/max packets): hardware (0/9) software (0/1)Traffic Statistics for "outside":132463 packets input, 9245212 bytes238982 packets output, 15789347 bytes3258 packets dropped1 minute input rate 2 pkts/sec, 110 bytes/sec1 minute output rate 7 pkts/sec, 980 bytes/sec1 minute drop rate, 0 pkts/sec5 minute input rate 3 pkts/sec, 150 bytes/sec5 minute output rate 8 pkts/sec, 1012 bytes/sec5 minute drop rate, 0 pkts/secControl Point Interface States:Interface number is 1Interface config status is activeInterface state is activeInterface Ethernet1 "inside", is up, line protocol is upHardware is i82558, BW 100 Mbps, DLY 100 usecAuto-Duplex(Full-duplex), Auto-Speed(100 Mbps)Input flow control is unsupported, output flow control is unsupportedMAC address 001e.4f9c.6789, MTU 1500IP address 192.168.10.1, subnet mask 255.255.255.0204892 packets input, 14235129 bytes, 0 no bufferReceived 18 broadcasts, 0 runts, 0 giants0 input errors, 0 CRC, 0 frame, 0 overrun, 0 ignored, 0 abort0 pause input, 0 resume input0 L2 decode drops174562 packets output, 20982145 bytes, 0 underruns0 pause output, 0 resume output0 output errors, 0 collisions, 0 interface resets0 babbles, 0 late collisions, 0 deferred0 lost carrier, 0 no carrier0 input reset drops, 0 output reset dropsinput queue (curr/max packets): hardware (0/1) software (0/10)output queue (curr/max packets): hardware (2/10) software (0/1)Traffic Statistics for "inside":204892 packets input, 14235129 bytes174562 packets output, 20982145 bytes5124 packets dropped1 minute input rate 4 pkts/sec, 360 bytes/sec1 minute output rate 6 pkts/sec, 890 bytes/sec1 minute drop rate, 1 pkts/sec5 minute input rate 5 pkts/sec, 480 bytes/sec5 minute output rate 7 pkts/sec, 910 bytes/sec5 minute drop rate, 0 pkts/secControl Point Interface States:Interface number is 2Interface config status is activeInterface state is active
書寫正則表達式
先來一個簡單的例子,在執行了show interface ip brief后會顯示出接口狀態和其ip地址等信息
Interface IP-Address OK? Method Status Protocol
Ethernet0/0 192.168.1.1 YES manual up up
Ethernet0/1 10.1.1.1 YES manual up up
?現在假設我只對接口名稱和其對應的ip地址感興趣,想要用正則表達式將他們提取出來可以使用如下正則表達式,這個表達式還有一些小瑕疵,下面來拆解看一下他的含義:
推薦正則表達式輔助書寫和練習的工具:regex101: build, test, and debug regex
(\S+)\s*(\S+).*?
其中被"()"括起來的內容為我們想要捕獲的數據,獲取第一個"()"中的內容可對返回的對象使用.group(1)方法來獲取以此類推獲取第二個可用.group(2)來獲取
\S匹配一個任何非空白字符,后面添加“+”符號代表連續匹配1個以上的非空白字符:
\s匹配一個任何空白字符,如空格,后面加“*”代表連續匹配0個以上的非空白字符:
將這兩者組合起來:
發現除了名稱和ip之外還匹配到了其他內容,可以在后面加上“.*”直接代替其余內容直到出現換行符\n:
發現還有一個小細節沒考慮到,會匹配到上面的描述字段“Interface” 、“IP-Address”,這時可以觀察感想要匹配數據的特征來做一個排除,“Ethernet0/0”可以拆分成多個字符+一個數字的形式,即拆分成:“Ethernet0/”和“0”,其余數據也適用,都可以堪稱一個字符+結尾一個數字的格式,這時可以做一下修改,把“(\S+)”修改為“(\S+\d)”:
這樣就可以成功獲取到想要的數據了,可以編寫python代碼來試驗一下:
import redata = """
Interface IP-Address OK? Method Status Protocol
Ethernet0/0 192.168.1.1 YES manual up up
Ethernet0/1 10.1.1.1 YES manual up up
"""interface_match = re.search(r'(\S+\d)\s*(\S+).*', data)print(interface_match.group(1))
print(interface_match.group(2))
輸出結果:
?
re.search方法會在查找到第一個數據后停止匹配,返回第一次匹配到的結果,如果想要捕獲所有的Interface數據,可以先將data按行進行分割再進行匹配:
import redata = """
Interface IP-Address OK? Method Status Protocol
Ethernet0/0 192.168.1.1 YES manual up up
Ethernet0/1 10.1.1.1 YES manual up up
"""data_list = data.split('\n')
for line in data_list:interface_match = re.search(r'(\S+\d)\s*(\S+).*', line)if interface_match:print(interface_match.group(1))print(interface_match.group(2))
輸出結果:
?
Python中pydantic的使用
導入基礎數據模板
from pydantic import BaseModel
一個簡單的例子
pydantic在python開發中可以將數據類型明確,也方便對數據的校驗
from pydantic import BaseModelclass User(BaseModel):id: intname: strpermission: list[str]user = User(id=123456, name='admin',permission=['can_add_student','can_edit_student'])print(user)
根據數據采集目標定義Pydantic數據類型
# 定義Pydantic數據類型
class Count(BaseModel):packets: intpackets_rate: strbytes: intbytes_rate: strclass Input(Count):passclass Output(Count):passclass Dropped(Count):passclass InterfaceStatus(BaseModel):input_status: Inputoutput_status: Outputdrop_status: Droppedclass InterfaceBaseInfo(BaseModel):name: strstatus: strprotocol_status: strip_address: strsubnet_mask: strmac_address: strbandwidth: strdelay: strmtu: inthardware_type: strclass Interface(BaseModel):base_info: InterfaceBaseInfodetail_info: InterfaceStatusclass ASAData(BaseModel):outside_interface: Interfaceinside_interface: Interface
數據分析采集函數?
和上面的思路一樣,只是多了很多需要匹配的數據
def get_asa_data():# 聲明interfaces = {}# 處理interface_detail返回數據for paragraph in interface_detail.split("Interface"):if not paragraph.strip():continue# 正則匹配base_infoname_match = re.search(r'(\S+)\s"(\S+)",', paragraph)mac_mtu_match = re.search(r'MAC\saddress\s+(\S+),\sMTU\s(\d+)', paragraph)ip_match = re.search(r'IP\saddress\s(\S+),.*mask\s(\S+)', paragraph)hardware_match = re.search(r'Hardware\sis\s(.*),\sBW\s(.*),\sDLY\s(.*)', paragraph)if not name_match or not mac_mtu_match or not ip_match or not hardware_match:continuebase_info = {"name": name_match.group(1),"status": "None", # 先做初始化,后續再從interface_ip_brief中獲取"protocol_status": "None","ip_address": ip_match.group(1),"subnet_mask": ip_match.group(2),"mac_address": mac_mtu_match.group(1),"bandwidth": hardware_match.group(2),"delay": hardware_match.group(3),"mtu": int(mac_mtu_match.group(2)),"hardware_type": hardware_match.group(1),}# 正則匹配detail_infoinput_match = re.search(r'(\d+)\spackets\sinput,\s(\d+)\sbytes', paragraph)input_rate_match = re.search(r'5\sminute\sinput\srate\s(.*),\s+(.*)', paragraph)output_match = re.search(r'(\d+)\spackets\soutput,\s(\d+)\sbytes', paragraph)output_rate_match = re.search(r'5\sminute\soutput\srate\s(.*),\s+(.*)', paragraph)drop_match = re.search(r'(\d+)\spackets\sdropped', paragraph)drop_rate_match = re.search(r'5\sminute\sdrop\srate,\s(.*)', paragraph)detail_info = {"input_status": Input(packets = int(input_match.group(1)) if input_match else -1,bytes = int(input_match.group(2)) if input_match else -1,packets_rate = input_rate_match.group(1) if input_rate_match else "None",bytes_rate = input_rate_match.group(2) if input_rate_match else "None",),"output_status": Output(packets = int(output_match.group(1)) if output_match else -1,bytes = int(output_match.group(2)) if output_match else -1,packets_rate = output_rate_match.group(1) if output_rate_match else "None",bytes_rate = output_rate_match.group(2) if output_rate_match else "None",),"drop_status": Dropped(packets = int(drop_match.group(1)) if drop_match else -1,bytes = -1,packets_rate = drop_rate_match.group(1) if drop_rate_match else "None",bytes_rate = "None",),}interface_name = name_match.group(2)interfaces[interface_name] = Interface(base_info=InterfaceBaseInfo(**base_info),detail_info=InterfaceStatus(**detail_info),)# 處理interface_ip_brie返回數據for line in interface_ip_brief.strip().split("\n"):match = re.search(r'(\S+)\s+(\S+)\s+\S+\s+\S+\s+(\S+)\s+(\S+)', line)if match:name, ip, status, protocol_status = match.groups()for interface in interfaces.values():if interface.base_info.ip_address == ip:interface.base_info.status = statusinterface.base_info.protocol_status = protocol_statusasa_data = ASAData(outside_interface=interfaces.get("outside"),inside_interface=interfaces.get("inside"),)# 將數據格式化為json返回return(asa_data.json())
?打印輸出結果
# 打印輸出結果
print(get_asa_data())