前面寫過一篇關于python實現類似expect shell的交互式能力的文章,現在補全一下加上sftp的能力腳本。
例子在代碼中__example()
方法。
依賴paramiko
庫,所以需要執行pip install paramiko
來安裝。
import os
import queue
import re
import threading
import time
import traceback
import stat
import datetimeimport paramiko
from paramiko import SSHClient, SSHException, SFTPClient, Channelclass SFTPMultipleClient(object):"""支持多sftp連接拉取文件支持快捷執行遠程命令"""def __init__(self, host, port, username, pwd, work_count=1) -> None:super().__init__()# client = SSHClient()# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())# client.connect(hostname=host, timeout=60, port=port, username=username, password=pwd)# self.ssh_client = client# self.sftp_client = client.open_sftp()self.host = hostself.port = portself.username = usernameself.pwd = pwdself.work_count = work_countself.thread_local_data = threading.local()self.pull_queue = queue.Queue()self.ssh_clientlist = []self.sftp_clientlist = []self.thread_list = {}self.email_send_files = []self.ptySessions = [](self.ssh_client, self.sftp_client) = self._gen_sftp_client(host, port, username,pwd) # type:(SSHClient, SFTPClient)for i in range(work_count):self.thread_list[self._start_back_task(self._pull_event_handler)] = Falsedef _start_back_task(self, fun, args=()):t = threading.Thread(target=fun, daemon=True, args=args)t.start()return tdef remote_scp_progress(self, a, b):"""遠程scp進度打印:param a: 當前已傳輸大小(單位字節):param b: 文件總大小(單位字節)"""if a > b:a = bsecond = self.thread_local_data.timer.last_press_time_deltaif second > 1 or a == b:self.thread_local_data.timer.press()speed = a - self.thread_local_data.last_progressself.thread_local_data.last_progress = aremaining_time_second = (b - a) / speeds = self.thread_local_data.get_file_desc + "%s %s %02d%s %dKB/s %02d:%02d " % \(a, b, int(a / b * 100), "%", speed / 1024, remaining_time_second / 60, remaining_time_second % 60)print(s)# print(s, end="", flush=True)# _back = ""# for i in range(len(s)):# _back += '\b'# print(_back, end="")def _gen_sftp_client(self, host, port, username, pwd) -> (SSHClient, SFTPClient):"""生成sftp客戶端當連接斷開會進行重試,最大重試連接5次"""_try_count = 0_ssh_client = SSHClient()_ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())_sftp_client = Nonetname = threading.current_thread().namestime = 1while _try_count < 5:connect_failed = Falsetry:_ssh_client.connect(hostname=host, timeout=60, port=port, username=username,password=pwd)_sftp_client = _ssh_client.open_sftp()print(tname + " ssh連接成功.")breakexcept Exception as e:connect_failed = Truetraceback.print_exc()if connect_failed:try:_ssh_client.close()if _sftp_client:_sftp_client.close()except Exception as e:traceback.print_exc()_try_count += 1if _try_count == 5:raise Exception("嘗試重新連接達到最大次數.斷開連接.")traceback.print_exc()print(tname + " 第%d次重新連接ssh..." % _try_count)stime *= 2time.sleep(stime)self.ssh_clientlist.append(_ssh_client)self.sftp_clientlist.append(_sftp_client)self.thread_local_data.ssh_client = _ssh_clientself.thread_local_data.sftp_client = _sftp_clientreturn _ssh_client, _sftp_clientdef _pull_event_handler(self):"""拉取任務處理"""self._gen_sftp_client(self.host, self.port, self.username, self.pwd)# self.thread_local_data.ssh_client = _ssh_client# self.thread_local_data.sftp_client = _sftp_clientwhile True:self.thread_list[threading.current_thread()] = Falsedata = self.pull_queue.get(block=True, timeout=None)self.thread_list[threading.current_thread()] = True# 通過隊列插入None來結束線程,如果有n個線程在監聽隊列,那么需要n個None來結束if data is None:self.thread_list[threading.current_thread()] = Falsebreaktry:self._remote_scp(self.thread_local_data.ssh_client, self.thread_local_data.sftp_client,data.remote_path,data.local_path, data.max_size, callback=self.remote_scp_progress)fun = data.call_bak_funif fun:call_bak_fun_args = ()call_bak_fun_kwargs = {}if type(fun) in [list, tuple]:if not callable(fun[0]):raise Exception("call_bak_fun 不是可調用對象")if len(fun) > 1:call_bak_fun_args = fun[1]if len(fun) > 2:call_bak_fun_kwargs = fun[2]else:if not callable(fun):raise Exception("call_bak_fun 不是可調用對象")fun[0](*call_bak_fun_args, **call_bak_fun_kwargs)except Exception as e:traceback.print_exc()# pull_queue.put(data)# print("線程%s 結束." % threading.current_thread().name)def _remote_scp(self, _ssh_client, _sftp_client, _remote_path, _local_path, max_size=None, callback=None):reconnect = Falsetry:if os.path.exists(_local_path):rf_stat = _sftp_client.lstat(_remote_path)lf_stat = os.stat(_local_path)if rf_stat.st_size == lf_stat.st_size:print(_local_path + " already exists.")returnif max_size and rf_stat.st_size > max_size: # 如果大于1m則認為是空板圖片print(_local_path + " > " + max_size + " skipped.")returnprint(threading.current_thread().name + "copy file:%s << %s\t" % (_local_path, _remote_path))self.thread_local_data.get_file_desc = "copy file:%s << %s\t" % (_local_path, _remote_path)self.thread_local_data.timer = Timer()self.thread_local_data.last_progress = 0_sftp_client.get(_remote_path, _local_path, callback=callback)except FileNotFoundError as e:traceback.print_exc()print("continue...")except SSHException as e:traceback.print_exc()reconnect = Trueexcept OSError as e:print("os error====")traceback.print_exc()reconnect = Trueif reconnect:print("重新連接ssh...")_sftp_client.close()_ssh_client.close()self.sftp_clientlist.remove(_sftp_client)self.ssh_clientlist.remove(_ssh_client)_ssh_client, _sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)self._remote_scp(_ssh_client, _sftp_client, _remote_path, _local_path, max_size, callback)if callback:print()def pull_file(self, remote_path, local_path, max_size=None, print_progress=False):"""拉取文件:param remote_path: 遠程文件路徑:param local_path: 本地文件路徑:param max_size: 遠程文件如果超過了這個大小,則不做拉取:param print_progress: 是否打印文件拉取進度"""if print_progress:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size,self.remote_scp_progress)else:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size)def submit_pull_work(self, remote_path, local_path, max_size=None, call_bak_fun=None):"""提交拉取文件任務:param remote_path: 遠程文件路徑:param local_path: 本地文件路徑:param max_size: 遠程文件如果超過了這個大小,則不做拉取:param call_bak_fun: 拉取完成回調方法"""e_data = Task(remote_path, local_path, max_size, call_bak_fun)self.pull_queue.put(e_data)def exec_command(self, command, *args, raise_e=True, **kwargs):"""執行遠程命令"""stdin, stdout, stderr = self.ssh_client.exec_command(command, *args, **kwargs)if raise_e and stdout.channel.recv_exit_status() != 0:raise Exception(stderr.readline())return stdin, stdout, stderrdef mkdir(self, path, recursive=True, mode=0o777):"""創建文件夾:param path: 遠程文件夾:param recursive: 是否遞歸創建 默認true:param mode: 權限,默認0o777-全部權限"""if not recursive:self.sftp_client.mkdir(path, mode)returndirs = path.split(os.path.sep)current_dir = "/"for i, d in enumerate(dirs):d = os.path.join(current_dir, d)create = Truetry:if stat.S_ISDIR(self.sftp_client.stat(d).st_mode):create = Falseexcept FileNotFoundError:create = Trueif create:self.sftp_client.mkdir(d, mode)current_dir = ddef destroy(self):"""銷毀當前實例"""while not self.pull_queue.empty(): # 等待隊列任務處理完畢time.sleep(0.2)# 結束線程for thread in self.thread_list:self.pull_queue.put(None) # 通過None來停止線程while True:if thread.is_alive() and self.thread_list[thread]:time.sleep(1)else:break# self.thread_list.pop(thread)# for i in range(len(self.thread_list)):# self.pull_queue.put(None) # 通過None來停止線程# self.thread_list.pop() # 移除成員for ptySession in self.ptySessions:ptySession.destroy()for i in range(len(self.sftp_clientlist)):sftp_client = self.sftp_clientlist.pop()sftp_client.close()for i in range(len(self.ssh_clientlist)):ssh_client = self.ssh_clientlist.pop()ssh_client.close()def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.destroy()def open_pty_session(self, ending_char, timeout=30):"""從當前channel開啟一個session并激活@return:"""_session = self.ssh_client.get_transport().open_session(timeout=1 * 3600) # type:Channel_session.get_pty()_session.invoke_shell()_ptysession = PtySession(_session, ending_char=ending_char, timeout=timeout)self.ptySessions.append(_ptysession)return _ptysessiondef listdir_attr(self, path):"""列出遠程目錄:param path: 遠程目錄:return: list of `.SFTPAttributes` objects"""_reconnect = Falsetry:return self.sftp_client.listdir_attr(path)except SSHException as e:traceback.print_exc()_reconnect = Trueexcept OSError as e:traceback.print_exc()_reconnect = Trueif _reconnect:print("重新連接ssh...")self.sftp_client.close()self.ssh_client.close()self.sftp_clientlist.remove(self.sftp_client)self.ssh_clientlist.remove(self.ssh_client)self.ssh_client, self.sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)return self.listdir_attr(path)# 一次讀取字節長度
recv_len = 1024 * 5class Task(object):def __init__(self, remote_path, local_path, max_size=None, call_bak_fun=None) -> None:super().__init__()self.remote_path = remote_path # 遠程路徑self.local_path = local_path # 本地路徑self.max_size = max_size # 最大大小,當大于這個大小將跳過不做處理self.call_bak_fun = call_bak_fun # 回調方法 格式:(fn,arg,kw)class PtySession(object):def __init__(self, session, ending_char, timeout=30) -> None:super().__init__()self.session = session # type:Channelself.last_line = ""self.ending_char = ending_char # 每執行一個命令之后,標記輸出的結束字符self.clear_tail()self.timeout = timeout # 超時時間,秒def clear_tail(self, _ending_char=None):"""清理輸出還處于緩沖區中未讀取的流"""if not _ending_char:_ending_char = self.ending_charwhile True:time.sleep(0.2)# self.session.recv_ready()在讀取過程中不一定總是True,只有當讀取緩沖流中有字節讀取時,才會為True。所以在讀取頭一次后獲取下次流到緩沖區中前為Falseif self.session.recv_ready():self.last_line = self.session.recv(recv_len)self.last_line = self.last_line.decode('utf-8')print(self.last_line)if re.search(_ending_char, self.last_line):breakdef destroy(self):"""銷毀并關閉session:return::rtype:"""self.clear_tail()self.session.close()def exp(self, *exp_cmds):"""期望并執行,與expect的用法類似。session.exp(("\$", "scp test.txt luckydog@127.0.0.1:~/\r",(("yes/no", "yes\r",("Password:", "luckydog\r")),("Password:", "luckydog\r")))):param exp_cmds: 第一個元素為獲取的期望結束字符,第二個元素為需要執行的命令,如果傳入的第三個元素,則第三個元素必須為元祖,并且也同父級一樣,屬遞歸結構。類似GNU的遞歸縮寫。:type exp_cmds: tuple"""interval = 0.2cur_time = 0.0try:while True:if self.session.recv_ready():self.last_line = self.session.recv(recv_len).decode('utf-8')print(self.last_line)elif self.session.send_ready():for exp_cmd in exp_cmds:_cmd = exp_cmd[1]if not _cmd.endswith("\r"):_cmd += "\r"match = re.search(exp_cmd[0], self.last_line)if match and match.group():self.session.send(_cmd)# 清空最后一行數據緩存,便于下個命令的讀取流輸出。此行代碼去除,會導致無法等待命令執行完畢提前執行后續代碼的問題。self.last_line = ""if len(exp_cmd) == 3 and exp_cmd[2]:self.exp(*exp_cmd[2])returncur_time += intervalif cur_time >= self.timeout:raise Exception("timeout...")time.sleep(interval)except Exception as e:traceback.print_exc()# finally:# self.clear_tail()def send(self, cmd, _ending_char=None):"""單純的發送命令到目標服務器執行。:param cmd: 命令:type cmd: str"""self.last_line = ""if not cmd.endswith("\r"):cmd += "\r"self.session.send(cmd)self.clear_tail(_ending_char)# -----------------------
class Timer(object):def __init__(self) -> None:super().__init__()self._start_time = Noneself._end_time = Noneself._seconds_delta = Noneself._last_press_time = None # 最后一個計次時間def start(self):if not self._start_time:self._start_time = datetime.datetime.now()self._last_press_time = self._start_timeelse:raise Exception("timer is already started.")return self._start_timedef end(self):if not self._end_time:self._end_time = datetime.datetime.now()self._seconds_delta = (self._end_time - self._start_time).total_seconds()else:raise Exception("timer is already stopped.")def press(self):if not self._start_time:self._last_press_time = self.start()return 0now = datetime.datetime.now()last_time = self._last_press_timeself._last_press_time = nowreturn (now - last_time).total_seconds()@propertydef last_press_time_delta(self):if not self._last_press_time:self.press()return 0return (datetime.datetime.now() - self._last_press_time).total_seconds()@propertydef start_time(self):return self._start_time@propertydef end_time(self):return self._end_timedef timedelta_seconds(self):if self._seconds_delta:return self._seconds_deltaelse:raise Exception("timer not stopped.")# ===================test=========================
def _file_pull_complete(filename, **kw):print(filename + "文件上傳完成")def __example():sftp = SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx")# 創建目錄sftp.mkdir("/home/username/123/")# 下載文件到本地(同步執行)sftp.pull_file("/home/username/temp/rumenz.img", # 遠程文件"/Users/username/rumenz.img", # 本地路徑文件,需要帶上文件名print_progress=True # 打印文件拉取進度)# 提交下載遠程文件異步任務(異步線程執行不阻塞)sftp.submit_pull_work("/home/username/temp/rumenz.img", # 遠程文件"/Users/username/rumenz.img", # 本地路徑文件,需要帶上文件名call_bak_fun=(_file_pull_complete, ("rumenz.img",)) # 文件上傳完成回調,第一個元素為方法,第二個是入參)# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)# 開啟交互回話session = sftp.open_pty_session("\$")# 發送單條命令session.send("cd ~/123")# # 發送單條命令并等待獲取_ending_char符號(結束符支持el)session.send("scp test.txt username@130.16.16.133:~/\r", "Password:|yes/no")# # 當最后一行為yes/no 則執行yes,如果是Password則輸入密碼session.exp(("yes/no", "yes\r",("Password:", "xxx\r")),("Password:", "xxx\r"))# scp方式2'''偽代碼if (yes/no):send yesif (Password:)send 密碼elif (Password:)send 密碼'''session.exp(("\$", "scp test.txt username@130.16.16.133:~/\r", # 期望$,并發送scp命令(("yes/no", "yes\r", ( # 如果返回yes/no,則發送yes"Password:", "xxx\r")), # 發送yes后,如果返回Password:結尾,則發送密碼("Password:", "xxx\r") # 如果返回的是Password:結尾,則發送密碼)))# 銷毀session.destroy()sftp.destroy()# 支持通過with的方式使用SFTPMultipleClient,這樣不需要顯示的調用 sftp.destroy()with SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx") as sftp:# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)if __name__ == '__main__':__example()