目錄 | | 目錄 | |
---|---|---|---|
迭代器與生成器(一) | 1.手動遍歷迭代器 2.代理迭代 3.使用生成器創建新的迭代模式 4.實現迭代器協議 | 迭代器與生成器(三) | 9.排列組合的迭代 10.序列上索引值迭代 11.同時迭代多個序列 12.不同集合上元素的迭代 |
迭代器與生成器(二) | 5.反向迭代 6.帶有外部狀態的生成器函數 7.迭代器切片 8.跳過可迭代對象的開始部分 | 迭代器與生成器(四) | 13.創建數據處理管道 14.展開嵌套的序列 15.順序迭代合并后的排序迭代對象 16.迭代器代替 while 無限循環 |
迭代器與生成器(四)
- 13.創建數據處理管道
- 14.展開嵌套的序列
- 15.順序迭代合并后的排序迭代對象
- 16.迭代器代替 while 無限循環
13.創建數據處理管道
你想以數據管道(類似 Unix 管道)的方式迭代處理數據。比如,你有個大量的數據需要處理,但是不能將它們一次性放入內存中。
生成器函數是一個實現管道機制的好辦法。為了演示,假定你要處理一個非常大的日志文件目錄:
foo/access-log-012007.gzaccess-log-022007.gzaccess-log-032007.gz...access-log-012008
bar/access-log-092007.bz2...access-log-022008
假設每個日志文件包含這樣的數據:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
為了處理這些文件,你可以定義一個由多個執行特定任務獨立任務的簡單生成器函數組成的容器。就像這樣:
import os
import fnmatch
import gzip
import bz2
import redef gen_find(filepat, top):'''在目錄樹中查找所有匹配指定通配符模式的文件名。'''for path, dirlist, filelist in os.walk(top): # 使用 os.walk(top) 遞歸遍歷 top 目錄及其子目錄。for name in fnmatch.filter(filelist, filepat): # 對于每個目錄,篩選出匹配 filepat 的文件名。yield os.path.join(path, name) # 使用 yield 生成每個匹配文件的完整路徑(拼接目錄路徑和文件名)。def gen_opener(filenames):'''按順序打開一系列文件,每次生成一個文件對象。文件會在下一次迭代前關閉。'''for filename in filenames:if filename.endswith('.gz'):f = gzip.open(filename, 'rt')elif filename.endswith('.bz2'):f = bz2.open(filename, 'rt')else:f = open(filename, 'rt')yield ff.close() # 由于生成器會在 yield 后暫停,文件對象的實際關閉是在調用代碼繼續下一次迭代時發生的。def gen_concatenate(iterators):'''將多個迭代器連接成一個單一的序列。'''for it in iterators:yield from itdef gen_grep(pattern, lines):'''在行序列中搜索匹配正則表達式的行。'''pat = re.compile(pattern)for line in lines:if pat.search(line):yield line
現在你可以很容易的將這些函數連起來創建一個處理管道。比如,為了查找包含單詞 python
的所有日志行,你可以這樣做:
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines) # (?i)是正則表達式的忽略大小寫標志,因此會匹配 python、Python、PYTHON 等。
for line in pylines:print(line)
如果將來的時候你想擴展管道,你甚至可以在生成器表達式中包裝數據。比如,下面這個版本計算出傳輸的字節數并計算其總和。
lognames = gen_find('access-log*', 'www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
bytes = (int(x) for x in bytecolumn if x != '-')
print('Total', sum(bytes))
line.rsplit(None, 1)
:None
表示按任意空白字符(空格、制表符等)分割。1
表示最多分割一次,從右側開始分割。- 例如:
"1234 python 512".rsplit(None, 1)
→['1234 python', '512']
。
[1]
取分割后的最后一列(如'512'
)。bytecolumn
是一個生成器表達式,生成所有行的最后一列值。
示例輸出:
# 輸入pylines:
# ["1234 python 512", "42 PYTHON 1024", "python - 2048"]
bytecolumn = (line.rsplit(None,1)[1] for line in pylines)
# 生成的bytecolumn內容:
# ['512', '1024', '2048']
# 輸入bytecolumn:
# ['512', '1024', '2048']
bytes = (int(x) for x in bytecolumn if x != '-')
# 生成的bytes內容:
# 512, 1024, 2048
以管道方式處理數據可以用來解決各類其他問題,包括解析,讀取實時數據,定時輪詢等。
為了理解上述代碼,重點是要明白 yield
語句作為數據的 生產者,而 for
循環語句作為數據的 消費者。當這些生成器被連在一起后,每個 yield
會將一個單獨的數據元素傳遞給迭代處理管道的下一階段。在例子最后部分,sum()
函數是最終的程序驅動者,每次從生成器管道中提取出一個元素。
這種方式一個非常好的特點是每個生成器函數很小并且都是獨立的。這樣的話就很容易編寫和維護它們了。很多時候,這些函數如果比較通用的話可以在其他場景重復使用。并且最終將這些組件組合起來的代碼看上去非常簡單,也很容易理解。
使用這種方式的內存效率也不得不提。上述代碼即便是在一個超大型文件目錄中也能工作的很好。事實上,由于使用了迭代方式處理,代碼運行過程中只需要很小很小的內存。
在調用 gen_concatenate()
函數的時候你可能會有些不太明白。這個函數的目的是將輸入序列拼接成一個很長的行序列。itertools.chain()
函數同樣有類似的功能,但是它需要將所有可迭代對象作為參數傳入。
在上面這個例子中,你可能會寫類似這樣的語句 lines = itertools.chain(*files)
,這將導致 gen_opener()
生成器被提前全部消費掉。但由于 gen_opener()
生成器每次生成一個打開過的文件,等到下一個迭代步驟時文件就關閉了,因此 chain()
在這里不能這樣使用。上面的方案可以避免這種情況。
files = gen_opener(['a.txt', 'b.gz']) # 生成器,每次 yield 一個文件對象
lines = itertools.chain(*files) # 錯誤!files 會被全部消費,文件可能已關閉
for line in lines: # 實際迭代時文件已關閉,可能報錯print(line)
files = gen_opener(['a.txt', 'b.gz']) # 生成器,每次 yield 一個文件對象
lines = gen_concatenate(files) # 惰性拼接,按需打開文件
for line in lines: # 安全迭代print(line)
gen_concatenate()
函數中出現過 yield from
語句,它將 yield
操作代理到父生成器上去。語句 yield from it
簡單的返回生成器 it
所產生的所有值。
🚀 對
yield from
仍有疑問的,可以參考博主的這篇博客《yield from 功能解析》。
最后還有一點需要注意的是,管道方式并不是萬能的。有時候你想立即處理所有數據。然而,即便是這種情況,使用生成器管道也可以將這類問題從邏輯上變為工作流的處理方式。
14.展開嵌套的序列
你想將一個多層嵌套的序列展開成一個單層列表。
可以寫一個包含 yield from
語句的遞歸生成器來輕松解決這個問題。比如:
from collections import Iterabledef flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):yield from flatten(x)else:yield xitems = [1, 2, [3, 4, [5, 6], 7], 8]
# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):print(x)
在上面代碼中, isinstance(x, Iterable)
檢查某個元素是否是可迭代的。如果是的話, yield from
就會返回所有子例程的值。最終返回結果就是一個沒有嵌套的簡單序列了。
額外的參數 ignore_types
和檢測語句 isinstance(x, ignore_types)
用來將字符串和字節排除在可迭代對象外,防止將它們再展開成單個的字符。這樣的話字符串數組就能最終返回我們所期望的結果了。比如:
>>> items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
>>> for x in flatten(items):
... print(x)
...
Dave
Paula
Thomas
Lewis
>>>
語句 yield from
在你想在生成器中調用其他生成器作為子例程的時候非常有用。如果你不使用它的話,那么就必須寫額外的 for
循環了。比如:
def flatten(items, ignore_types=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):for i in flatten(x):yield ielse:yield x
盡管只改了一點點,但是 yield from
語句看上去感覺更好,并且也使得代碼更簡潔清爽。
之前提到的對于字符串和字節的額外檢查是為了防止將它們再展開成單個字符。如果還有其他你不想展開的類型,修改參數 ignore_types
即可。
最后要注意的一點是, yield from
在涉及到基于協程和生成器的并發編程中扮演著更加重要的角色。
15.順序迭代合并后的排序迭代對象
你有一系列排序序列,想將它們合并后得到一個排序序列并在上面迭代遍歷。
heapq.merge()
函數可以幫你解決這個問題。比如:
>>> import heapq
>>> a = [1, 4, 7, 10]
>>> b = [2, 5, 6, 11]
>>> for c in heapq.merge(a, b):
... print(c)
...
1
2
4
5
6
7
10
11
heapq.merge
可迭代特性意味著它不會立馬讀取所有序列。這就意味著你可以在非常長的序列中使用它,而不會有太大的開銷。比如,下面是一個例子來演示如何合并兩個排序文件:
with open('sorted_file_1', 'rt') as file1, \open('sorted_file_2', 'rt') as file2, \open('merged_file', 'wt') as outf:for line in heapq.merge(file1, file2):outf.write(line)
有一點要強調的是 heapq.merge()
需要 所有輸入序列必須是排過序的。特別的,它并不會預先讀取所有數據到堆棧中或者預先排序,也不會對輸入做任何的排序檢測。它僅僅是檢查所有序列的開始部分并返回最小的那個,這個過程一直會持續直到所有輸入序列中的元素都被遍歷完。
16.迭代器代替 while 無限循環
你在代碼中使用 while
循環來迭代處理數據,因為它需要調用某個函數或者和一般迭代模式不同的測試條件。能不能用迭代器來重寫這個循環呢?
一個常見的 IO 操作程序可能會像下面這樣:
CHUNKSIZE = 8192def reader(s):while True:data = s.recv(CHUNKSIZE)if data == b'':breakprocess_data(data)
這種代碼通常可以使用 iter()
來代替,如下所示:
def reader2(s):for chunk in iter(lambda: s.recv(CHUNKSIZE), b''):pass# process_data(data)
如果你懷疑它到底能不能正常工作,可以試驗下一個簡單的例子。比如:
>>> import sys
>>> f = open('/etc/passwd')
>>> for chunk in iter(lambda: f.read(10), ''):
... n = sys.stdout.write(chunk)
...
nobody:*:-2:-2:Unprivileged User:/var/empty:/usr/bin/false
root:*:0:0:System Administrator:/var/root:/bin/sh
daemon:*:1:1:System Services:/var/root:/usr/bin/false
_uucp:*:4:4:Unix to Unix Copy Protocol:/var/spool/uucp:/usr/sbin/uucico
...
>>>
iter
函數一個鮮為人知的特性是它接受一個可選的 callable
對象和一個標記(結尾)值作為輸入參數。當以這種方式使用的時候,它會創建一個迭代器,這個迭代器會不斷調用 callable
對象直到返回值和標記值相等為止。
iter(callable, sentinel)
的用法
- 功能:
創建一個迭代器,重復調用callable
(可調用對象)直到它返回sentinel
(哨兵值)。- 在此代碼中:
callable
是lambda: f.read(10)
:每次調用時從文件f
讀取 10 字節。sentinel
是''
:當f.read(10)
返回空字符串時(表示文件結束),迭代停止。- 效果:
將文件分塊讀取,每次 10 字節,避免一次性加載大文件到內存。
這種特殊的方法對于一些特定的會被重復調用的函數很有效果,比如涉及到 I/O 調用的函數。舉例來講,如果你想從套接字或文件中以數據塊的方式讀取數據,通常你得要不斷重復的執行 read()
或 recv()
,并在后面緊跟一個文件結尾測試來決定是否終止。這節中的方案使用一個簡單的 iter()
調用就可以將兩者結合起來了。其中 lambda
函數參數是為了創建一個無參的 callable
對象,并為 recv
或 read()
方法提供了 size
參數。