Python-正則表達式-信息提取-滑動窗口-數據分發-文件加載及分析器-瀏覽器分析-學習筆記

欠4前年的一份筆記 ,獻給今后的自己。

正則表達式

概述

正則表達式,Regular Expression,縮寫為regex、regexp、RE等。

正則表達式是文本處理極為重要的技術,用它可以對字符串按照某種規則進行檢索、替換。

1970年代,Unix之父Ken Thompson將正則表達式引入到Unix中文本編輯器ed和grep命令中,由此正則表達式普及未來。

1980年后,perl語言對Henry Spencef編寫的庫,擴展了很多新的特性。1997年開始,Philip Hazel開發出了PCRE (Perl Compatible Regular Expressions),它被PHP和HTTPD等工具采用。

正則表達式應用極其廣泛,shell中處理文本的命令、各種高級編程語言都支持正則表達式。

參考 https://www.w3cschool.cn/regex_rmjc/

分類

  1. BRE

基本正則表達式,grep、sed、vi等軟件支持。vim有擴展。

  1. ERE

擴展正則表達式,egrep(grep-E)、sed-r等。

  1. PCRE

幾乎所有高級語言都是PCRE的方言或者變種。Python從1.6開始使用SRE正則表達式引擎, 可以認為是PCRE的子集,見模塊re。

基本語法

元字符 metacharacter

在這里插入圖片描述

轉義

凡是在正則表達式中有特殊意義的符號,如果想使用它的本意,請使用\轉義。反斜杠自身,得使用\\
\r,\n還是轉義后代表回車、換行

重復

在這里插入圖片描述

練習:

1、匹配手機號碼

字符串為“手機號碼13851888188。”

2、匹配中國座機

字符串為“號碼025-83105736、0543-5467328。”

  1. \d{11}
  2. \d{3,4)-1d{7,8)

在這里插入圖片描述

注意:
斷言會不會捕獲呢?也就是斷言占不占分組號呢?
斷言不占分組號。斷言如同條件,只是要求匹配必須滿足斷言的條件。
分組和捕獲是同一個意思
使用正則表達式時,能用簡單表達式,就不要復雜的表達式
貪婪與非貪婪
默認是貪婪模式,也就是說盡量多匹配更長的字符串。
非貪婪很簡單,在重復的符號后面加上一個?問號,就盡量的少匹配了。

在這里插入圖片描述

very very happy 使用v.*y和v.*?y

引擎選項

在這里插入圖片描述
單行模式:
. 可以匹配所有字符,包括換行符。
^表示整個字符串的開頭,$整個字符串的結尾
多行模式:
.可以匹配除了換行符之外的字符。
^ 表不行首,$行尾
^表示整個字符串的開始,$表示整個字符串的結尾。開始指的是\n后緊接著下一個字符,結束
指的是/n前的字符
可以認為,單行模式就如同看穿了換行符,所有文本就是一個長長的只有一行的字符串,所有^就是這一行字符串的行首,$就是這一行的行尾。
多行模式,無法穿透換行符,^和$還是行首行尾的意思,只不過限于每一行
注意:注意字符串中看不見的換行符,\r\n會影響e$的測試,e$只能匹配e\n

舉例
very very happy
harry key
上面2行happy之后,有可能是\r\n結尾。
y$ 單行匹配key的y,多行匹配happy和key的y。

練習
1、匹配一個0~999之間的任意數字

1
12
995
9999
102
02
003
4d\d    							1位數
[1-9]?\d    					1-2位數
^([1-9]\d\d?|\d)				1-3位數
^([1-9]\d\d?|\d)$				1-3位數的行
^([1-9]\d\d?|\d)\r?$
^([1-9]\d\d?|\d)(?!\d) 			數字開頭1-3位數且之后不能是數字

2、IP地址
匹配合法的IP地址

192.168.1.150
0.0.0.0
255.255.255.255
17.16.52.100
172.16.0.100
400.400.999.888
001.022.003.000
257.257.255.256

((\d{1,3}).)<3}(\d{1,3})
(?:(\d{1,3)). 3}(\d{1,3}) # 400.400.999.888

對于ip地址驗證的問題

  • 可以把數據提出來后,交給IP地址解析庫 處理,如果解析異常,就說明有問題,正則的驗
    證只是一個初步的篩選,把明顯錯誤過濾掉。
  • 可以使用復雜的正則表達式驗證地址正確性
  • 前導0是可以的
import socketnw = socket.inet_aton('192.168.05.001')
print(nw, socket.inet_ntoa(nw))分析:
每一段上可以寫的數字有10100100023023230100,也就說1位就是0-92位每一位
也是0-9,3位第一位只能0-2,其余2位都可以0-9
(?:([e-2]\d{2|\d{1,2})\.){3}([0-2]\d{2|\d{1,2})# 解決超出200的問題,但是256呢?
200是特殊的,要再單獨分情況處理
250-5|20-4]\d|01]?\d\d?這就是每一段的邏輯
(?: (25[0-5]|2[0-4]\d|[01]?\d\d?)1.){3)(25[0-5]|2[0-4]\d|[01]?\d\d?)

3、選出含有ftp的鏈接,且文件類型是gz或者xz的文件名

ftp://ftp.astron.com/pub/file/file-5.14.tar.gz
ttp://ftp.gmplib.org/pub/gmp-5.1.2/gmp-5.1.2.tar.xz
ttp://ftp.vim.org/pub/vim/unix/vim-7.3.tar.bz2
http://anduin.linuxfromscratch.org/sources/LFS/1fs-packages/conglomeration//iana-etc/iana-etc-2.30.tar.bz2
http://anduin.linuxfromscratch.org/sources/other/udev-1fs-205-1.tar.bz2
http://download.savannah.gnu.org/releases/1ibpipeline/libpipeline-1.2.4.tar.gz
http://download.savannah.gnu.org/releases/man-db/man-db-2.6.5.tar.xz
http://download.savannah.gnu.org/releases/sysvinit/sysvinit-2.88dsf.tar.bz2
http://ftp.altlinux.org/pub/people/legion/kbd/kbd-1.15.5.tar.gz
http://mirror.hust.edu.cn/gnu/autoconf/autoconf-2.69.tar.xz
http://mirror.hust.edu.cn/gnu/automake/automake-1.14.tar.xz

.*ftp.*\.(?:gz|xz) #
ftp.*/(.*(?:gz|xz))
\.*ftp.*/([^/]*\.(?:gz|xz))# 捕獲文件名分組
(?<=.*ftp.*/)[^/]*\.(?:gz|xz)# 斷言文件名前一定還有ftp

Python的正則表達式

Python使用re模塊提供了正則表達式處理的能力。

在這里插入圖片描述

方法

編譯

re.compile(pattern, flags=0)
設定flags,編譯模式,返回正則表達式對象regex。
pattern就是正則表達式字符串,flags是選項。正則表達式需要被編譯,為了提高效率,這些編譯后的結果被保存,下次使用同樣的pattern的時候,就不需要再次編譯。
re的其它方法為了提高效率都調用了編譯方法,就是為了提速。

單次匹配

re.match(pattern, string, flags=0)

regex.match(string[, pos[, endpos]])

match匹配從字符串的開頭匹配,regex對象match方法可以重設定開始位置和結束位置。返回match對象
re.search(pattern, string, flags=0)
regex. search(string[, pos[, endpos]])

從頭搜索直到第一個匹配,regex對象search方法可以重設定開始位置和結束位置,返回match對象

re.fullmatch(pattern, string, flags=0)

regex.fullmatch(string[, pos[, endpos]])

整個字符串和正則表達式匹配

import res = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')print()# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'p') (17, 'p') (18, 'l') (19, 'e')# match方法
print('--match--')
result = re.match('b', s)  # 找到一個就不找了
print(1, result)  # <re.Match object; span=(0, 1), match='b'> 
result = re.match('a', s)  # 沒找到,返回None
print(2, result)     # None
result = re.match('^a', s, re.M)  # 依然從頭開始找,多行模式沒有用
print(3, result)        # None
result = re.match('^a', s, re.S)  # 依然從頭開始找print(4, result)        # None
# 先編譯,然后使用正則表達式對象
regex = re.compile('a')
result = regex.match(s)  # 依然從頭開始找
print(5, result)        # None
result = regex.match(s, 15)  # 把索引15作為開始找
print(6, result)  # <re.Match object; span=(15, 16), match='a'>
print()
# search方法
print('--search--')
result = re.search('a', s)  # 掃描找到匹配的第一個位置
print(7, result)  # apple       # <re.Match object; span=(8, 9), match='a'>
regex = re.compile('b')
result = regex.search(s, 1)     print(8, result)  # bag     # <re.Match object; span=(7, 8), match='b'>
regex = re.compile('^b', re.M)result = regex.search(s)  # 不管是不是多行,找到就返回
print(8.5, result)  # bottle    # <re.Match object; span=(0, 1), match='b'>
result = regex.search(s, 8)
print(9, result)  # big     # <re.Match object; span=(11, 12), match='b'>
# fullmatch/j?Z
result = re.fullmatch('bag', s)
print(10, result)       # None
regex = re.compile('bag')
result = regex.fullmatch(s)     
print(11, result)       # None
result = regex.fullmatch(s, 7)
print(12, result)       # None
result = regex.fullmatch(s, 7, 10)
print(13, result)  # 要完全匹配,多了少了都不行,[7,10)   <re.Match object; span=(7, 10), match='bag'>

全文搜索

re.findall (pattern, string, flags=0)
regex.findall(string[, pos[, endpos]])
對整個字符串,從左至右匹配,返回所有匹配項的列表
re.finditer(pattern, string, flags=0)
regex.finditer(string[, pos[, endpos]])
對整個字符串,從左至右匹配,返回所有匹配項,返回迭代器。
注意每次迭代返回的是match對象。

import res = '''bottle\nbag\nbig\nable'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
# (0, 'b') (1, 'o') (2, 't') (3, 't') (4, 'l') (5, 'e') (6, '\n') (7, 'b')
# (8, 'a') (9, 'g') (10, '\n') (11, 'b') (12, 'i') (13, 'g') (14, '\n') (15, 'a')
# (16, 'b') (17, 'l') (18, 'e') 
print()
# findal1方法result = re.findall('b', s)
print(1, result)        # ['b', 'b', 'b', 'b']
regex = re.compile('^b')
result = regex.findall(s)
print(2, result)        # ['b']
regex = re.compile('^b', re.M)
result = regex.findall(s, 7)
print(3, result)  # bag big     ['b', 'b']regex = re.compile('^b', re.S)result = regex.findall(s)
print(4, result)  # bottle      ['b']
regex = re.compile('^b', re.M)                  
result = regex.findall(s, 7, 10)
print(5, result)  # bag   ['b']
# finditer E
result = regex.finditer(s)
print(type(result))             # <class 'callable_iterator'>
print(next(result))             # <re.Match object; span=(0, 1), match='b'>
print(next(result))             # <re.Match object; span=(7, 8), match='b'>

匹配替換

re.sub(pattern, replacement, string, count=0, flags=0)

regex.sub(replacement, string, count=0)

使用pattern對字符串string進行匹配,對匹配項使用repl替換。

replacement可以是 string, bytes, function.

re.subn(pattern, replacement, string, count=0, flags=0)

regex.subn(replacement, string, count=0)

同sub返回一個元組(new_string,number_of_subs_made)

import re
s = '''bottle\nbag\nbig\napple'''
for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')
print()# 替換方法
regex = re.compile('b\wg')
result = regex.sub('magedu', s)
print(1, result)  # 被替換后的字符串
result = regex.sub('magedu', s, 1)  # 替換1次
print(2, result)  # 被替換后的字符串
regex = re.compile('\s+')
result = regex.subn('\t', s)
print(3, result)  # 被替換后的字符串及替換次數的元組

分割字符串

字符串的分割函數,太難用,不能指定多個字符進行分割。

re.split(pattern, string, maxsplit=0, flags=0)

re.split分割字符串

import res = '''01 bottle
02 bag
03 big1
100 able'''for i,c in enumerate(s, 1):print((i-1, c), end='\n' if i%8==0 else ' ')print()# 把每行單詞提取出來print(s.split()) # 做不到 ['01', 'bottle', '02', 'bag', '03', 'big1', '100', 'able']result = re.split('[\s\d]+',s)print(1, result) # ['', 'bottle', 'bag', 'big', 'able']regex = re.compile('^[\s\d]+') # 字符串首result = regex.split(s)print(2, result) # '', 'bottle\n02 bag\n03 big1\n100 able']regex = re.compile('^[\s\d]+', re.M) # 行首result = regex.split(s)print(3, result) # ['', 'bottle\n', 'bag\n', 'big1\n', 'able']regex = re.compile('\s+\d+\s+')result = regex.split(' ' + s)print(4, result)

分組

使用小括號的pattern捕獲的數據被放到了組group中。

match、search函數可以返回match對象;findall這回字符串列表;finditer返回一個個match對象

如果pattern中使用了分組,如果有匹配的結果,會在match對象中

1、使用group(N)方式返回對應分組,1-N是對應的分組,0返回整個匹配的字符串
2、如果使用了命名分組,可以使用group(‘name’) 的方式取分組
3、也可以使用groups() 返回所有組
4、使用groupdict() 返回所有命名的分組

import res = '''bottle\nbag\nbig\napple'''for i, c in enumerate(s, 1):print((i - 1, c), end='\n' if i % 8 == 0 else ' ')print()# 分組
regex = re.compile('(b\w+)')
result = regex.match(s)
print(type(result))
print(1, 'match', result.groups())
result = regex.search(s, 1)
print(2, 'search', result.groups())  ## 命名分組regex = re.compile('(b\w+)\n(?P<name2>b\w+)\n(?P<name3>b\w+)')result = regex.match(s)print(3, 'match', result)print(4, result.group(3), result.group(2), result.group(1))print(5, result.group(0).encode())  # 0 返回整個匹配字符串print(6, result.group('name2'), result.group('name3'))print(6, result.groups())print(7, result.groupdict())result = regex.findall(s)for x in result:  # 字符串列表print(type(x), x)regex = re.compile('(?P<head>b\w+)')result = regex.finditers()for x in result:print(type(x), x, x.group(), x.group('head'))

練習

匹配郵箱地址

test@hot-mail.com
v-ipamagedu.com
web.manager@magedu. com.cn
super.user@google.com
a@w-a-com

匹 html標記內的內容

<a href= http://www.magedu.com/index.html' target='_blank'>馬哥教育</a>

匹配URL

http://www.magedu.com/index.html
https://login.magedu.com
file:///ect/sysconfig/network

匹配二代中國身份證ID

321105700101003
321105197001010030
11210020170101054X
17位數字+1位校驗碼組成

前6位地址碼,8位出生年月,3位數字,1位校驗位(0-9或X)

判斷密碼強弱

要求密碼必須由 10-15位 指定字符組成:

十進制數字
大寫字母
小寫字母
下劃線
要求四種類型的字符都要出現才算合法的強密碼

例如:Aatb32_67mq,其中包含大寫字母、小寫字母、數字和下劃線,是合格的強密碼

單詞統計 word count

對sample文件進行單詞統計,要求使用正則表達式

參考

郵箱


\w+[-.\w]*@[\w-]+(\.[\w-]+)+html提取<[^<>]+>(.*)<[^<>]+>如果要匹配標記a<(\w+)\s+[^<>]+>(.*)(</\1>)URL提取
(wt)://([^\s]+)身份證驗證
身份證驗證需要使用公式計算,最嚴格的應該實名驗證。\d{17)[0-9xX]|\d{15}強密碼Aatb32_67mnq
Aatb32_67m.nq
中國是一個偉大的國家aA_810-15位,其中包含大寫字母、小寫字母、數字和下劃線
^\w{10,15}$如果測試有不可見字符干擾使用^w{10,15}\r?$看似正確,但是,如果密碼有中文呢?^[a-zA-Z0-9_]{10,15}$但是還是沒有解決類似于 111111111112 這種密碼的問題,如何解決?
需要用到一些非正則表達式的手段。利用判斷來解決,思路如下:
1、可以判斷當前密碼字符串中是否有\W,如果出現就說明一定不是合法的,如果不出現說明合法
2、對合法繼續判斷,如果出現過_下劃線,說明有可能是強密碼,但是沒有下劃線說明一定不是強密碼。
3、對包含下劃線的合法密碼字符串繼續判斷,如果出現過\\d的,說明有可能是強密碼,沒有出現\\d的一定不是強密碼
4、對上一次的包含下劃線、數字的合法的密碼字符串繼續判斷,如果出現了[A-Z]說明有可能是 強密碼,沒有出現[A-Z]說明一定不是強密碼
5、對上一次包含下劃線、數字、大寫字母的合法密碼字符串繼續判斷,如果出現了[a-z]說明就是強密碼,找到了,沒有出現小寫字母就一定不是強密碼。請注意上面的判斷的順序,應該是概率上最可能不出在密碼字符申的先判斷。

單詞統計

from collections import defaultdictimport redef makekey2(line:str, chars=set("""!'"#./\()[],*- \r\n""")):start = 0for i, c in enumerate(line):if c in chars:if start == i:  # 如果緊挨著還是特字符,start一定等丁istart += 1  # 加1并continuecontinueyield line[start:i]start = i + 1  # 加1是跳過這個不需要的特珠字符celse:if start < len(line):  # 小于,說明還有有效的字符,而且一直到末尾yield line[start:]# ''"'I\host\mount splitext('.cshrc splitdrive("//host/computer/dir . abc path s\r\n
regex = re.compile('[^\w-]+')def makekey3(line: str):for word in regex.split(line):if len(word):yield worddef wordcount(filename, encoding='utf8', ignore=set()):d = defaultdict(lambda: 0)with open(filename, encoding=encoding) as f:for line in f:for word in map(str.lower, makekey2(line)):if word not in ignore:d[word] += 1return ddef top(d: dict, n=10):for i, (k, v) in enumerate(sorted(d.items(), key=lambda item: item[1], reverse=True)):if i > n:breakprint(k, v)# 單詞統計前兒名
top(wordcount('sample', ignore={'the', 'a'}))

滑動窗口

數據載入

對于本項目來說,數據就是日志的一行行記錄,載入數據就是文件10的讀取。將獲取數據的方法封裝成函數。

def load(path):""""裝載日志文件"""with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗就拋棄,或者打印日志

時間窗口分析

概念

很多數據,例如日志,都是和時間相關的,都是按照時間順序產生的。產生的數據分析的時候,要按照時間求值
interval 表示每一次求值的時間間隔
width 時間窗口寬度,指的一次求值的時間窗口寬度

當 width > interval
在這里插入圖片描述

數據求值時會有重疊

width = interval

在這里插入圖片描述

數據求值沒有重疊
當 width < interval
一般不采納這種方案,會有數據缺失

時序數據

運維環境中,日志、監控等產生的數據都是與時間相關的數據,按照時間先后產生并記錄下來的
數據,所以一般按照時間對數據進行分析。

數據分析基本程序結構

無限的生成隨機數函數,產生時間相關的數據,返回時間和隨機數的字典
每次取3個數據,求平均值。

import random
import datetime
import timedef source():while True:yield {'value': random.randint(1, 100), 'datetime': datetime.datetime.now()}time.sleep(1)# 獲取數據
s = source()
items = [next(s) for _ in range(3)]# 處理函數
def handler(iterable):return sum(map(lambda item: item['value'], iterable)) / len(iterable)print(items)
print("{:.2f}".format(handler(items)))# 輸出:
# [{'value': 52, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 57, 876429)}, {'value': 5, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 58, 880602)}, {'value': 91, 'datetime': datetime.datetime(2025, 7, 10, 23, 3, 59, 885730)}]
# 49.33

上面代碼模擬了,一段時間內產生了數據,等了一段固定的時間取數據來計算平均值。

窗口函數實現

將上面的獲取數據的程序擴展為window函數。使用重疊的方案。

import random
import datetime
import timedef source(second=1):"""生成數據"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)def window(iterator, handler, width: int, interval: int):""":param iterator:數據源,生成器,用來拿數據:param handler:數據處理函數:param width:時間窗口寬度,秒:param interval:處理時間間隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待計算數據delta = datetime.timedelta(seconds=width - interval)while True:# 從數據源獲取數據data = next(iterator)if data:buffer.append(data)  # 存入臨時緩沖等待計算current = data['datetime']# 每隔interval計算buffer中的數據一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{:.2f}'.format(ret))start = current# 清除超出width的數據buffer = [x for x in buffer if x['datetime'] > current - delta]def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)window(source(), handler, 10, 5)

時間的計算

在這里插入圖片描述

數據分發

分發
生產者消費者模型
對于一個監控系統,需要處理很多數據,包括日志。對其中已有數據的采集、分析。
被監控對象就是數據的生產者producer,數據的處理程序就是數據的消費者consumer。
生產者消費者傳統模型

在這里插入圖片描述

傳統的生產者消費者模型,生產者生產,消費者消費。但這種模型有些問題
開發的代碼耦合太高,如果生成規模擴大,不易擴展,生產和消費的速度很難匹配等。
思考一下,生產者和消費者的問題是什么?
舉例:
賣包子的,如果包子賣不完,還要繼續蒸包子,會怎么樣?門可羅雀,包子成山。
如果把包子先蒸一些,賣著,快賣完了,趕緊包,再蒸一些。不會有等包子的隊伍。
如果包子供不應求,還沒有和面呢,包子都被預定了,出現排隊等包子的情況。
上面這些情況,最核心的問題,就是生產者和消費者速度要匹配的問題。
但是,往往速度不能夠很好的匹配。
解決的辦法–隊列queue。
作用–解耦、緩沖。

在這里插入圖片描述

日志生產者往往會部署好幾個程序,日志產生的也很多,而消費者也會有多個程序,去提取日志

分析處理。
數據的生產是不穩定的!會造成短時間數據的“潮涌”,需要緩沖。
消費者消費能力不一樣,有快有慢,消費者可以自己決定消費緩沖區中的數據。
單機可以使用queue內建的模塊構建進程內的隊列,滿足多個線程間的生產消費需要。
大型系統可以使用第三方消息中間件:RabbitMQ、RocketMQ、Kafka

queue模塊–隊列

queue模塊提供了一個先進先出的隊列Queue。
queue. Queue(maxsize=0)
創建FIFO隊列,返回Queue對象。
maxsize 小于等于0,隊列長度沒有限制。
Queue.get(block=True, timeout=None)
從隊列中移除元素并返回這個元素。
block 為阻塞,timeout為超時。
如果block為True,是阻塞,timeout為None就是一直阻塞。
如果block為True但是timeout有值,就阻塞到一定秒數拋出Empty異常。
block為False,是非阻塞,timeout將被忽略,要么成功返回一個元素,要么拋出empty異常。
Queue.get_nowait()
等價于 get(False),也就是說要么成功返回一個元素,要么拋出empty異常。
但是queue的這種阻塞效果,需要多線程的時候演示。
Queue.put(item, block=True, timeout=None)
把一個元素加入到隊列中去。
block=True,timeout=None,一直阻塞直至有空位放元素。
block=True, timeout=5,阻塞5秒就拋出Full異常。
block=False, timeout失效,立即返回,能塞進去就塞,不能則返回拋出Full異常。
Queue.put_nowait(item)
等價于 put(item, False),也就是能塞進去就塞,不能則返回拋出Full異常。

# Queue測試
from queue import Queue
import randomq = Queue()q.put(random.randint(1, 100))
q.put(random.randint(1, 100))print(q.get())
print(q.get())# print(q.get())# 阻塞
# Traceback (most recent call last):
#   File "/Users/quyixiao/pp/python_lesson/function1/function15.py", line 14, in <module>
#     print(q.get(timeout=3))  # 阻塞,但超時拋異常
#           ~~~~~^^^^^^^^^^^
#   File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/queue.py", line 212, in get
#     raise Empty
# _queue.Empty
print(q.get(timeout=3))  # 阻塞,但超時拋異常

分發器的實現

生產者(數據源)生產數據,緩沖到消息隊列中
數據處理流程:
數據加載 -》提取 -》分析(滑動窗口函數)
處理大量數據的時候,對于一個數據源來說,需要多個消費者處理。但是如何分配數據就是個問題了。
需要一個分發器(調度器),把數據分發給不同的消費者處理。
每一個消費者拿到數據后,有自己的處理函數。所以要有一種注冊機制

數據加載–》提取–》分發–》分析函數1
??????????????????????????????????????????|----》分析函數2

分析1和分析2是不同的handler,不同的窗口寬度、間隔時間
如何分發?
這里就簡單一點,輪詢策略。
一對多的副本發送,一個數據通過分發器,發送到n個消費者。
消息隊列
在生產者和消費者之間使用消息隊列,那么所有消費者共用一個消息隊列,還是各自擁有一個消息隊列呢?
共用一個消息隊列也可以,但是需要解決爭搶的問題。相對來說每一個消費者自己擁有一個隊列,較為容易。

如何注冊?
在調度器內部記錄有哪些消費者,每一個消費者擁有自己的隊列。
線程

由于一條數據會被多個不同的注冊過的handler處理,所以最好的方式是多線程。

import threading# 定義線程
# target線程中運行的函數;args這個函數運行時需要的實參的元組
t = threading.Thread(target=window, args=(src, handler, width, interval))# 啟動線程
t.start()

分發器代碼實現

import random
import datetime
import time
from queue import Queue
import threadingdef source(second=1):"""生成數據"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)def window(src: Queue, handler, width: int, interval: int):"""窗口函數:param src:數據源,緩存隊列,用來拿數據:param handler:數據處理函數:param width:時間窗口寬度,秒:param interval:處理時間間隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待計算數據delta = datetime.timedelta(seconds=width - interval)while True:# 從數據源獲取數據data = src.get()if data:buffer.append(data)  # 存入臨時緩沖等待計算current = data['datetime']# 每隔interval計算buffer中的數據一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{:.2f}'.format(ret))start = current# 清除超出width的數據buffer = [x for x in buffer if x['datetime'] > current - delta]def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)def dispatcher(src):# 分發器中記錄handler,同時保存各自的隊列handlers = []queues = []def reg(handler, width: int, interval: int):"""注冊 窗口處理函數:param handler:注冊的數據處理函數:param width:時間窗口寬度:param interval:時間間隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 啟動線程處理數據for item in src:  # 將數據源取到的數據分發到所有隊列中for q in queues:q.put(item)return reg, runreg, run = dispatcher(source())
reg(handler, 10, 5)  # 注冊
run()  # 運行

注意,以上代碼也只是現階段所學知識的一種實現,項目中建議使用消息隊列服務的“訂閱”模式,消費者各自消費自己的隊列的數據。

日志分析

概述
生成中會生成大量的系統日志、應用程序日志、安全日志等等日志,通過對日志的分析可以了解服務器的負載、健康狀況,可以分析客戶的分布情況、客戶的行為,甚至基于這些分析可以做出預測。
一般采集流程

日志產出 采集(Logstash、Flume、Scribe) >存儲>分析 >存儲(數據庫、NoSQL) ->可視化
開源實時日志分析ELK平臺
Logstash收集日志,并存放到ElasticSearch集群中,Kibana則從ES集群中查詢數據生成圖表,返回瀏覽器端

分析的前提

半結構化數據日志是半結構化數據,是有組織的,有格式的數據。可以分割成行和列,就可以當做表理解和處理了,當然也可以分析里面的數據。

文本分析

日志是文本文件,需要依賴文件10、字符串操作、正則表達式等技術。
通過這些技術就能夠把日志中需要的數據提取出來。

183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"

這是最常見的日志,nginx、tomcat等WEB Server都會產生這樣的日志。如何提取出數據?這里面每一段有效的數據對后期的分析都是必須的。

提取數據
一、空格分割
with open('xxx.log') as f:for line in f:for field in line.split():print(field)

缺點:
數據并沒有按照業務分割好,比如時間就被分開了,URL相關的也被分開了,User Agent的空格多,被分割了。

所以,定義的時候不選用這種在filed中出現的字符就可以省很多事,例如使用’\x01’這個不可見的

ASCII,print(‘\x01’)試一試
能否依舊是空格分割,但是遇到雙引號、中括號特殊處理一下?
思路:

先按照空格切分,然后一個個字符迭代,但如果發現是[或者”,則就不判斷是否空格,直到]或

者”結尾,這個區間獲取的就是時間等數據。

line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''CHARS = set("\t")def makekey(line: str):start = 0skip = Falsefor i, c in enumerate(line):if not skip and c in '"[':  # [或第一個引號start = i + 1skip = Trueelif skip and c in '"]':  # 第二個引號 或]skip = Falseyield line[start: i]start = i + 1continueif skip:  # 如果遇到[或 第一個引號就跳過continueif c in CHARS:if start == i:start = i + 1continueyield line[start:i]start = i + 1else:if start < len(line):yield line[start:]print(list(makekey(line)))輸出:
['19/Feb/2013:10:23:29 +0800', 'GET /o2o/media.html?menu=3 НТТР/1.1', '-', 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)']

類型轉換
fields中的數據是有類型的,例如時間、狀態碼等。對不同的field要做不同的類型轉換,甚至是自
定義的轉換
時間轉換
19/Feb/2013:10:23:29 +0800 對應格式是
%d/%b/%Y:%H:%M:%S %z

import datetimedef convert_time(timestr):return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')

可以得到
lambda timestr: datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’)
狀態碼和字節數
都是整型,使用int函數轉換
請求信息的解析

GET /020/media.html?menu=3 HTTP/1.1
method url protocol 三部分都非常重要

def get_request(request:str):return dict(zip(['method', 'url', 'protocol'], request.split()))

lambda request: dict(zip(‘method’, ‘url’, protocol’),request.split0))

映射
對每一個字段命名,然后與值和類型轉換的方法對應。解析每一行是有順序的。

import datetimeline = '''66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"'''CHARS = set(" \t")def makekey(line: str):start = 0skip = Falsefor i, c in enumerate(line):if not skip and c in '"[':start = i + 1skip = Trueelif skip and c in '"]':skip = Falseyield line[start:i]start = i + 1continueif skip:continueif c in CHARS:if start == i:start = i + 1continueyield line[start:i]start = i + 1else:if start < len(line):yield line[start:]names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),int, int, None, None)def extract(line: str):return dict(map(lambda item:(item[0], item[2](item[1]) if item[2] is not None else item[1]), zip(names, makekey(line), ops)))print(extract(line))輸出:
{'remote': '66.249.69.131', '': '-', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'request': {'method': 'GET', 'url': '/robots.txt', 'protocol': 'HTTP/1.1'}, 'status': 404, 'length': 162, 'useragent': 'Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)'}

二、正則表達式提取
構造一個正則表達式提取需要的字段,改造extract函數、names和ops

import  datetimenames = ('remote', 'datetime', 'method', 'url', 'protocol', 'status', 'length', 'useragent')ops = (None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
None, None, None, int, int, None)pattern = r'''([\d.]{7,}) - - \[([/\w +:]+)\] "(\w+) (\S+) ([\w/\d.]+)" (\d+) (\d+).+ "(.+)"'''

能夠使用命名分組呢?進一步改造pattern為命名分組,ops也就可以和名詞對應了,names就沒有必要存在了


PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}

改造后的代碼

import datetimeimport reline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}regex = re.compile(pattern)def extract(line: str) -> dict:matcher = regex.match(line)return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}print(extract(line))輸出:{'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}

異常處理
日志中不免會出現一些不匹配的行,需要處理。
這里使用re.match方法,有可能匹配不上。所以要增加一個判斷
采用拋出異常的方式,讓調用者獲得異常并自行處理。

import re
import datetimeline = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''pattern = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}regex = re.compile(pattern)def extract(logline: str) -> dict:""""返回字段的字典,拋出異常說明匹配失敗"""matcher = regex.match(line)if matcher:return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}else:raise Exception('No match')

但是,也可以采用返回一個特殊值的方式,告知調用者沒有匹配。

def extract(logline: str) -> dict:"""返回字段的字典,如果返回None說明匹配失敗"""matcher = regex.match(line)if matcher:return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}else:return None

通過返回值,在函數外部獲取了None,同樣也可以采取一些措施。本次采用返回None的實現。

文件加載及分析器

整合代碼

load函數就是從日志中提取合格的數據的生成器函數。
它可以作為dispatcher函數的數據源。
原來寫的handler函數處理一個字典的’datetime’字段,不能處理日志抽取函數extract返回的字典,
提供一個新的函數。

import random
import datetime
import time
from queue import Queue
import threading
import re# 數據源PATTERN = r'(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) .* .* \[(?P<datetime>.*)\] "(?P<method>\w+) (?P<url>[^\s]*) (?P<version>[\w|/\.\d]*)" (?P<status>\d{3}) (?P<length>\d+) "(?P<refer>[^\s]*)" "(?P<userAgent>.*)"'regex = re.compile(PATTERN)  # 40i7
ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}def load(path):""""裝載日志文件"""with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗則拋棄或者記錄日志# 數據處理
def source(second=1):"""生成數據"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)# 滑動窗口函數
def window(src: Queue, handler, width: int, interval: int):"""窗口函數:param src:數據源,緩存隊列,用來拿數據:param handler:數據處理函數:param width:時間窗口寬度,秒:param interval:處理時間間隔,秒"""start = datetime.datetime.strptime('20170101 080000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待計算數據delta = datetime.timedelta(seconds=width - interval)while True:# 從數據源獲取數據data = src.get()if data:buffer.append(data)  # 存入臨時緩沖等待計算current = data['datetime']# 每隔interval計算buffer中的數據一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('(}'.format(ret))start = current# 清除超出width的數據buffer = [x for x in buffer if x['datetime'] > current - delta]# 隨機數平均數測試函數def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 測試函數
def donothing_handler(iterable):return iterable# 分發器
def dispatcher(src):# 分發器中記錄handler,同時保存各自的隊列handlers = []queues = []def reg(handler, width: int, interval: int):"""注冊 窗口處理函數:param handler:注冊的數據處理函數:param width:時間窗口寬度:param interval:時間間隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 啟動線程處理數據for item in src:  # 將數據源取到的數據分發到所有隊列中for q in queues:print('item: {}'.format(item))q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test.log'reg, run = dispatcher(load(path))reg(donothing_handler, 10, 5)  # 注冊處理函數run()  # 運行test.log 文件內容為66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"
完成分析功能

分析日志很重要,通過海量數據分析就能夠知道是否遭受了攻擊,是否被爬取及爬取高峰期,是否有盜鏈等。
百度(Baidu) 爬蟲名稱(Baiduspider)
谷歌(Google)爬蟲名稱(Googlebot)

狀態碼分析

狀態碼中包含了很多信息。例如
304,服務器收到客戶端提交的請求參數,發現資源未變化,要求瀏覽器使用靜態資源的緩存
404,服務器找不大請求的資源
304占比大,說明靜態緩存效果明顯。404占比大,說明網站出現了錯誤鏈接,或者嘗試嗅探網站
資源。
如果400、500占比突然開始增大,網站一定出問題了。

# 狀態碼占比
def status_handler(iterable):# 時間窗口內的一批數據status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}

如果還需要什么分析,增加分析函數handler注冊就行了

日志文件的加載

目前實現的代碼中,只能接受一個路徑,修改為接受一批路徑。

可以約定一下路徑下文件的存放方式:

如果送來的是一批路徑,就迭代其中路徑。

如果路徑是一個普通文件,就按照行讀取內容。

如果路徑是一個目錄,就遍歷路徑下所有普通文件,每一個文件按照行處理。不遞歸處理子目錄。

from pathlib import Pathdef load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():pass  # 和下面處理一樣elif p.is_file():with open(str(p)) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗則拋棄或者記錄日志

寫的過程中發現重復的地方,把文件處理部分提出來寫成函數。

from pathlib import Pathdef openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗則拋棄或者記錄日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))
完整代碼
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path# 數據源PATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)ops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}# 裝載文件
def openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗則拋棄或者記錄日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))# 數據處理
def source(second=1):"""生成數據"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}print('sleep second:',second)time.sleep(second)# 滑動窗口函數
def window(src: Queue, handler, width: int, interval: int):"""窗口函數:param src:數據源,緩存隊列,用來拿數據:param handler:數據處理函數:param width:時間窗口寬度,秒:param interval:處理時間間隔,秒"""start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待計算數據delta = datetime.timedelta(seconds=width - interval)while True:# 從數據源獲取數據data = src.get()if data:buffer.append(data)  # 存入臨時緩沖等待計算current = data['datetime']# 每隔interval計算buffer中的數據一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{}'.format(ret))start = current# 清除超出width的數據buffer = [x for x in buffer if x['datetime'] > current - delta]# 隨機數平均數測試函數def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 測試凼數
def donothing_handler(iterable):return iterable# 狀態碼占比
def status_handler(iterable):# 時間窗口內的一批數據status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}# 分發器
def dispatcher(src):# 分發器中記錄handler,同時保存各自的隊列handlers = []queues = []def reg(handler, width: int, interval: int):"""注冊 窗口處理函數:param handler:注冊的數據處理函數:param width:時間窗口寬度:param interval:時間間隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 啟動線程處理數據for item in src:  # 將數據源取到的數據分發到所有隊列中for q in queues:print('queue put ', item)q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test1.log'reg, run = dispatcher(load(path))reg(status_handler, 10, 5)  # 注冊run()  # 1Z1J 妁高薪職業學院# test1.log 
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 НТТР/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"輸出:queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}
queue put  {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': 'Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)'}

到這里,一個離線日志分析項目基本完成。
1、可以指定文件或目錄,對日志進行數據分析
2、分析函數可以動態注冊
3、數據可以分發給不同的分析處理程序處理

瀏覽器分析

useragent
這里指的是,軟件按照一定的格式向遠端的服務器提供一個標識自己的字符串。在HTTP協議中,使用user-agent字段傳送這個字符串。
注意:這個值可以被修改
格式

現在瀏覽器的user-agent值格式一般如下:Mozilla/[version] ([system and browser information]) [platform] ([platform details]
) [extensions]例如  ChromeMozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chr
ome/57.0.2987.133 Safari/537.36FirefoxMozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0
Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0IE
Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2; .NET
CLR 2.0.50727; .NET CLR 3.5.30729; ?NET CLR 3.0.30729; Media Center PC 6.0; .NET4.
eC; .NET4.0E)

信息提取
pyyaml, ua-parser. user-agents模塊。
安裝
pip3 install pyyaml ua-parser user-agents
使用

from user_agents import parseuseragents = ["Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36","Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0""Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0","Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0; SLCC2;.NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; -NET4.0E)"
]for uastring in useragents:ua = parse(uastring)print(ua.browser, ua.browser.family, ua.browser.version, ua.browser.version_string)輸出:
Browser(family='Chrome', version=(57, 0, 2987), version_string='57.0.2987') Chrome (57, 0, 2987) 57.0.2987
Browser(family='Firefox', version=(52, 0), version_string='52.0') Firefox (52, 0) 52.0
Browser(family='IE', version=(10, 0), version_string='10.0') IE (10, 0) 10.0

ua.browser. family和ua.browser.version_string分別返回瀏覽器名稱、版本號。

數據分析

from user_agents import parse
import datetimeops = {'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'length': int,'request': lambda request: dict(zip(('method', 'url', 'protocol'), request.split())),'useragent': lambda useragent: parse(useragent)
}from user_agents import parseops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int,'useragent': lambda ua: parse(ua)
}

增加瀏覽器分析函數

# # 瀏覽器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1return browsers

注冊handler,注意時間窗口寬度

reg(browser_handler, 5, 5)

問題
如果想知道所有瀏覽器的統計,怎么辦?

allbrowsers = 1# 瀏覽器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1allbrowsers[key] = allbrowsers.get(key, 0) + 1print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])return browsers

完整代碼

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import PathPATTERN = r'''(?P<remote>[\d\.]{7,})\s-\s-\s\[(?P<datetime>.*)\]\s(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''regex = re.compile(PATTERN)from user_agents import parseops = {'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),'status': int,'size': int,'useragent': lambda ua: parse(ua)
}def extract(line: str) -> dict:matcher = regex.match(line)if matcher:return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}# 裝載文件
def openfile(path: str):with open(path) as f:for line in f:fields = extract(line)if fields:yield fieldselse:continue  # TODO 解析失敗則拋棄或者記錄日志def load(*paths):for item in paths:p = Path(item)if not p.exists():continueif p.is_dir():for file in p.iterdir():if file.is_file():yield from openfile(str(file))elif p.is_file():yield from openfile(str(p))# 數據處理
def source(second=1):"""生成數據"""while True:yield {'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value': random.randint(1, 100)}time.sleep(second)# 滑動窗口函數
def window(src: Queue, handler, width: int, interval: int):"""窗口函數:paramsrc:數據源,緩存隊列,用來拿數據:paramhandler:數據處理函數:paramwidth:時間窗口寬度,秒:paraminterval:處理時間間隔,秒"""start = datetime.datetime.strptime('20170101 000800 +0800', '%Y%m%d %H%M%S %z')current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')buffer = []  # 窗口中的待計算數據delta = datetime.timedelta(seconds=width - interval)while True:# 從數據源獲取數據data = src.get()if data:buffer.append(data)  # 存入臨時緩沖等待計算學院。current = data['datetime']# 每隔interval計算buffer中的數據一次if (current - start).total_seconds() >= interval:ret = handler(buffer)print('{}'.format(ret))start = current# 清除超出width的數據buffer = [x for x in buffer if x['datetime'] > current - delta]# 隨機數平均數測試函數def handler(iterable):return sum(map(lambda x: x['value'], iterable)) / len(iterable)# 測試函數def donothing_handler(iterable):return iterable# 狀態碼占比def status_handler(iterable):# 時間窗口內的一批數據status = {}for item in iterable:key = item['status']status[key] = status.get(key, 0) + 1# total = sum(status.values())total = len(iterable)return {k: status[k] / total for k, v in status.items()}allbrowsers = {}# 瀏覽器分析
def browser_handler(iterable):browsers = {}for item in iterable:ua = item['useragent']key = (ua.browser.family, ua.browser.version_string)browsers[key] = browsers.get(key, 0) + 1allbrowsers[key] = allbrowsers.get(key, 0) + 1print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])return browsers# 分發器
def dispatcher(src):# 分發器中記錄handler,同時保存各自的隊列handlers = []queues = []def reg(handler, width: int, interval: int):"""注冊 窗口處理函數:param handler:注冊的數據處理函數:param width:時間窗口寬度:param interval:時間間隔"""q = Queue()queues.append(q)h = threading.Thread(target=window, args=(q, handler, width, interval))handlers.append(h)def run():for t in handlers:t.start()  # 啟動線程處理數據for item in src:  # 將數據源取到的數據分發到所有隊列中for q in queues:print('q put item :',item)q.put(item)return reg, runif __name__ == '__main__':import sys# path = sys.argv[1]path = 'test1.log'reg, run = dispatcher(load(path))reg(status_handler, 10, 5)  # VEreg(browser_handler, 5, 5)run()  # 運行test.log文件內容如下66.249.69.131 - - [19/Feb/2013:10:23:29 +0800] "GET /robots.txt HTTP/1.1" 404 162 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; + http:///bot.html)"輸出:
q put item : {'remote': '183.60.212.153', 'datetime': datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'method': '"GET', 'url': '/o2o/media.html?menu=3', 'protocol': 'НТТР/1.1', 'status': 200, 'size': 16691, 'useragent': <user_agents.parsers.UserAgent object at 0x10308f0e0>}
q put item : {'remote': '183.60.212.153', 'datetime': 

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/88487.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/88487.shtml
英文地址,請注明出處:http://en.pswp.cn/web/88487.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一文入門神經網絡:神經網絡概念初識

神經網絡的世界遠比你想象得更豐富多元。從基礎架構到前沿融合模型&#xff0c;我為你梳理了當前最值得關注的神經網絡類型&#xff0c;不僅包括那些“教科書級”的經典模型&#xff0c;也覆蓋了正在改變行業格局的新興架構。以下是系統分類與核心特點總結&#xff1a;一、基礎…

線上事故處理記錄

線上事故處理記錄 一、MySQL 導致的服務器 CPU 飆升 有一天&#xff0c;突然收到了服務器 CPU 飆升的告警信息&#xff0c;打開普羅米修斯查看 CPU 的使用情況&#xff0c;發現 CPU 確實飆升了&#xff0c;下面開始去進行問題定位了。 1. 首先連接到對應的服務器&#xff0c;然…

ParaCAD 筆記 png 圖紙標注數據集

ParaCAD-Dataset git lfs install git clone https://www.modelscope.cn/datasets/yuwenbonnie/ParaCAD-Dataset.git https://github.com/ParaCAD/ 不止100g 下個最小的 沒有三視圖

C#使用Semantic Kernel實現Embedding功能

1、背景 C#開發中&#xff0c;可以通過Semantic Kernel實現本地模型的調用和實現。 本地的Ollama的版本如下&#xff1a;安裝的Package如下&#xff1a;2、代碼實現 // See https://aka.ms/new-console-template for more information using Microsoft.Extensions.AI; using Mi…

轉轉APP逆向

APP版本 11.15.0 接口分析 # URL https://app.zhuanzhuan.com/zz/transfer/search# header cookie xxx x-zz-monitoring-metrics feMetricAntiCheatLevelV1 zztk user-agent Zhuan/11.15.0 (11015000) Dalvik/2.1.0 (Linux; U; Android 10; Pixel 3 Build/QQ3A.200805.001) z…

注解與反射的完美配合:Java中的聲明式編程實踐

注解與反射的完美配合&#xff1a;Java中的聲明式編程實踐 目錄 引言 核心概念 工作機制 實戰示例 傳統方式的痛點 注解反射的優勢 實際應用場景 最佳實踐 總結 引言 在現代Java開發中&#xff0c;我們經常看到這樣的代碼&#xff1a; Range(min 1, max 50)priva…

開源入侵防御系統——CrowdSec

1、簡介 CrowdSec 是一款現代化、開源、基于行為的入侵防御系統&#xff08;IDS/IPS&#xff09;&#xff0c;專為保護服務器、服務、容器、云原生應用而設計。它通過分析日志檢測可疑行為&#xff0c;并可基于社區協作共享惡意 IP 黑名單&#xff0c;從而實現分布式防御。 其…

imx6ull-裸機學習實驗13——串口格式化函數移植實驗

目錄 前言 格式化函數 實驗程序編寫 stdio文件夾 main.c Makefile修改 編譯下載 前言 在學習實驗12&#xff1a;imx6ull串口通信實驗&#xff0c;我們實現了 UART1 基本的數據收發功能&#xff0c;雖然可以用來調試程序&#xff0c;但是功能太單一了&#xff0c;只能輸出…

CCF-GESP 等級考試 2025年6月認證C++三級真題解析

1 單選題&#xff08;每題 2 分&#xff0c;共 30 分&#xff09;第1題 8位二進制原碼能表示的最小整數是&#xff1a;&#xff08; &#xff09;A. -127 B. -128 C. -255 …

【網絡安全】服務間身份認證與授權模式

未經許可,不得轉載。 文章目錄 問題背景用戶到服務的身份認證與授權系統對系統的通信服務與服務之間的通信需求分析Basic Auth(基本身份認證)優點缺點mTLS 證書認證優點缺點OAuth 2.0優點缺點JWS(JSON Web Signature)優點缺點結合 Open Policy Agent 的 JWS 方案優點缺點結…

【EGSR2025】材質+擴散模型+神經網絡相關論文整理隨筆(四)

An evaluation of SVBRDF Prediction from Generative Image Models for Appearance Modeling of 3D Scenes輸入3D場景的幾何和一張參考圖像&#xff0c;通過擴散模型和SVBRDF預測器獲取多視角的材質maps&#xff0c;這些maps最終合并成場景的紋理地圖集&#xff0c;并支持在任…

Grid網格布局完整功能介紹和示例演示

CSS Grid布局是一種強大的二維布局系統&#xff0c;可以將頁面劃分為行和列&#xff0c;精確控制元素的位置和大小。以下是其完整功能介紹和示例演示&#xff1a; 基本概念 網格容器&#xff08;Grid Container&#xff09;&#xff1a;應用display: grid的元素。網格項&#x…

學習C++、QT---21(QT中QFile庫的QFile讀取文件、寫入文件的講解)

每日一言把大目標拆成小步&#xff0c;每天前進一點點&#xff0c;終會抵達終點。QFile讀取文件我們記事本要進行讀取文件、寫入文件、等等的操作&#xff0c;那么這個時候我們的QT有一個QT類叫做QFile這個類的話是專門對于文件操作的&#xff0c;所以我們來學習我們在QT的幫助…

AD736ARZ-R7精密真有效值轉換器 高精度測量的首選方案

AD736ARZ-R7精密轉換器產品概述AD736ARZ-R7是ADI&#xff08;Analog Devices Inc.&#xff09;推出的一款低功耗、高精度的真有效值&#xff08;RMS&#xff09;轉直流&#xff08;DC&#xff09;轉換器&#xff0c;采用SOIC-8封裝&#xff0c;適用于需要精確測量交流或復雜波形…

【web應用】若依框架前端報表制作與導出全攻略(ECharts + html2canvas + jsPDF)

文章目錄前言一、ECharts準備工作1. 檢查ECharts安裝2. 導入ECharts3. 創建餅圖組件4. 模板部分二、報表導出功能實現1. 安裝依賴2. 導入依賴3. 完整導出函數實現4. 樣式優化三、完整組件實現四、常見問題與解決方案1. 圖表截圖不完整或模糊2. 圖表背景透明3. 導出PDF中文亂碼4…

vue3+express聯調接口時報“\“username\“ is required“問題

我用node .js的express框架寫的登錄接口&#xff0c;發現postman可以調通&#xff0c;但是vue3前端報錯vue3我發現是我后端node.js的app.js入口文件中配置的解析前端參數的解析中間件和前端請求頭中的Content-Type配置不一致的原因 解決方案 因為我后端配置解析表單數據的中間件…

《月亮與六便士》:天才的背叛與凡人救贖的殘酷辯證法

當滿地六便士成了庸人的火葬場??毛姆筆下的斯特里克蘭德&#xff0c;是一把捅穿中產幻夢的利刃。這個拋妻棄子、背叛友人的證券經紀人&#xff0c;在倫敦客廳的茶香與銀勺碰撞聲中&#xff0c;突然聽見了遠方的驚雷——“我必須畫畫”。如書中所言&#xff1a;??“在滿地都…

vue2往vue3升級需要注意的點(個人建議非必要別直接升級)

將 Vue 2 項目升級到 Vue 3 的過程中&#xff0c;需要重點關注以下幾個難點和關鍵點&#xff1a; 建議小項目直接用vue3重寫更快&#xff0c;bug更少 文章目錄1. **Composition API 的學習與應用**2. **全局 API 的變更**3. **模板語法的兼容性變化**4. **組件選項和生命周期的…

聚焦數據資源建設與應用,浙江省質科院赴景聯文科技調研交流

7月10日上午&#xff0c;浙江省質科院標準化中心副主任蔣建平、應珊婷等一行領導帶隊蒞臨景聯文科技調研指導工作。雙方圍繞工業數據展開深度交流。座談會上&#xff0c;景聯文科技詳細匯報了數據資源建設與應用方面的成果與規劃&#xff0c;介紹了公共數據授權運營與對外合作的…

【Linux】系統引導修復

目錄 開機引導過程 一.通電 二.BIOS環境檢測 三.磁盤引導階段 四.文件引導階段 自動引導配置文件丟失修復 內核參數文件丟失修復 內核鏡像文件丟失修復 內核初始化文件丟失修復 boot目錄誤刪丟失修復 開機引導過程 磁盤引導階段 /boot/grub2/grub.cfg #讀取自動引…