python3實現類似expect shell的交互式與SFTP的腳本

前面寫過一篇關于python實現類似expect shell的交互式能力的文章,現在補全一下加上sftp的能力腳本。
例子在代碼中__example()方法。

依賴paramiko庫,所以需要執行pip install paramiko來安裝。

import os
import queue
import re
import threading
import time
import traceback
import stat
import datetimeimport paramiko
from paramiko import SSHClient, SSHException, SFTPClient, Channelclass SFTPMultipleClient(object):"""支持多sftp連接拉取文件支持快捷執行遠程命令"""def __init__(self, host, port, username, pwd, work_count=1) -> None:super().__init__()# client = SSHClient()# client.set_missing_host_key_policy(paramiko.AutoAddPolicy())# client.connect(hostname=host, timeout=60, port=port, username=username, password=pwd)# self.ssh_client = client# self.sftp_client = client.open_sftp()self.host = hostself.port = portself.username = usernameself.pwd = pwdself.work_count = work_countself.thread_local_data = threading.local()self.pull_queue = queue.Queue()self.ssh_clientlist = []self.sftp_clientlist = []self.thread_list = {}self.email_send_files = []self.ptySessions = [](self.ssh_client, self.sftp_client) = self._gen_sftp_client(host, port, username,pwd)  # type:(SSHClient, SFTPClient)for i in range(work_count):self.thread_list[self._start_back_task(self._pull_event_handler)] = Falsedef _start_back_task(self, fun, args=()):t = threading.Thread(target=fun, daemon=True, args=args)t.start()return tdef remote_scp_progress(self, a, b):"""遠程scp進度打印:param a: 當前已傳輸大小(單位字節):param b: 文件總大小(單位字節)"""if a > b:a = bsecond = self.thread_local_data.timer.last_press_time_deltaif second > 1 or a == b:self.thread_local_data.timer.press()speed = a - self.thread_local_data.last_progressself.thread_local_data.last_progress = aremaining_time_second = (b - a) / speeds = self.thread_local_data.get_file_desc + "%s  %s %02d%s %dKB/s %02d:%02d " % \(a, b, int(a / b * 100), "%", speed / 1024, remaining_time_second / 60, remaining_time_second % 60)print(s)# print(s, end="", flush=True)# _back = ""# for i in range(len(s)):#     _back += '\b'# print(_back, end="")def _gen_sftp_client(self, host, port, username, pwd) -> (SSHClient, SFTPClient):"""生成sftp客戶端當連接斷開會進行重試,最大重試連接5次"""_try_count = 0_ssh_client = SSHClient()_ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())_sftp_client = Nonetname = threading.current_thread().namestime = 1while _try_count < 5:connect_failed = Falsetry:_ssh_client.connect(hostname=host, timeout=60, port=port, username=username,password=pwd)_sftp_client = _ssh_client.open_sftp()print(tname + " ssh連接成功.")breakexcept Exception as e:connect_failed = Truetraceback.print_exc()if connect_failed:try:_ssh_client.close()if _sftp_client:_sftp_client.close()except Exception as e:traceback.print_exc()_try_count += 1if _try_count == 5:raise Exception("嘗試重新連接達到最大次數.斷開連接.")traceback.print_exc()print(tname + " 第%d次重新連接ssh..." % _try_count)stime *= 2time.sleep(stime)self.ssh_clientlist.append(_ssh_client)self.sftp_clientlist.append(_sftp_client)self.thread_local_data.ssh_client = _ssh_clientself.thread_local_data.sftp_client = _sftp_clientreturn _ssh_client, _sftp_clientdef _pull_event_handler(self):"""拉取任務處理"""self._gen_sftp_client(self.host, self.port, self.username, self.pwd)# self.thread_local_data.ssh_client = _ssh_client# self.thread_local_data.sftp_client = _sftp_clientwhile True:self.thread_list[threading.current_thread()] = Falsedata = self.pull_queue.get(block=True, timeout=None)self.thread_list[threading.current_thread()] = True# 通過隊列插入None來結束線程,如果有n個線程在監聽隊列,那么需要n個None來結束if data is None:self.thread_list[threading.current_thread()] = Falsebreaktry:self._remote_scp(self.thread_local_data.ssh_client, self.thread_local_data.sftp_client,data.remote_path,data.local_path, data.max_size, callback=self.remote_scp_progress)fun = data.call_bak_funif fun:call_bak_fun_args = ()call_bak_fun_kwargs = {}if type(fun) in [list, tuple]:if not callable(fun[0]):raise Exception("call_bak_fun 不是可調用對象")if len(fun) > 1:call_bak_fun_args = fun[1]if len(fun) > 2:call_bak_fun_kwargs = fun[2]else:if not callable(fun):raise Exception("call_bak_fun 不是可調用對象")fun[0](*call_bak_fun_args, **call_bak_fun_kwargs)except Exception as e:traceback.print_exc()# pull_queue.put(data)# print("線程%s 結束." % threading.current_thread().name)def _remote_scp(self, _ssh_client, _sftp_client, _remote_path, _local_path, max_size=None, callback=None):reconnect = Falsetry:if os.path.exists(_local_path):rf_stat = _sftp_client.lstat(_remote_path)lf_stat = os.stat(_local_path)if rf_stat.st_size == lf_stat.st_size:print(_local_path + " already exists.")returnif max_size and rf_stat.st_size > max_size:  # 如果大于1m則認為是空板圖片print(_local_path + " > " + max_size + " skipped.")returnprint(threading.current_thread().name + "copy file:%s << %s\t" % (_local_path, _remote_path))self.thread_local_data.get_file_desc = "copy file:%s << %s\t" % (_local_path, _remote_path)self.thread_local_data.timer = Timer()self.thread_local_data.last_progress = 0_sftp_client.get(_remote_path, _local_path, callback=callback)except FileNotFoundError as e:traceback.print_exc()print("continue...")except SSHException as e:traceback.print_exc()reconnect = Trueexcept OSError as e:print("os error====")traceback.print_exc()reconnect = Trueif reconnect:print("重新連接ssh...")_sftp_client.close()_ssh_client.close()self.sftp_clientlist.remove(_sftp_client)self.ssh_clientlist.remove(_ssh_client)_ssh_client, _sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)self._remote_scp(_ssh_client, _sftp_client, _remote_path, _local_path, max_size, callback)if callback:print()def pull_file(self, remote_path, local_path, max_size=None, print_progress=False):"""拉取文件:param remote_path: 遠程文件路徑:param local_path: 本地文件路徑:param max_size: 遠程文件如果超過了這個大小,則不做拉取:param print_progress: 是否打印文件拉取進度"""if print_progress:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size,self.remote_scp_progress)else:self._remote_scp(self.ssh_client, self.sftp_client, remote_path, local_path, max_size)def submit_pull_work(self, remote_path, local_path, max_size=None, call_bak_fun=None):"""提交拉取文件任務:param remote_path: 遠程文件路徑:param local_path: 本地文件路徑:param max_size: 遠程文件如果超過了這個大小,則不做拉取:param call_bak_fun: 拉取完成回調方法"""e_data = Task(remote_path, local_path, max_size, call_bak_fun)self.pull_queue.put(e_data)def exec_command(self, command, *args, raise_e=True, **kwargs):"""執行遠程命令"""stdin, stdout, stderr = self.ssh_client.exec_command(command, *args, **kwargs)if raise_e and stdout.channel.recv_exit_status() != 0:raise Exception(stderr.readline())return stdin, stdout, stderrdef mkdir(self, path, recursive=True, mode=0o777):"""創建文件夾:param path: 遠程文件夾:param recursive: 是否遞歸創建 默認true:param mode: 權限,默認0o777-全部權限"""if not recursive:self.sftp_client.mkdir(path, mode)returndirs = path.split(os.path.sep)current_dir = "/"for i, d in enumerate(dirs):d = os.path.join(current_dir, d)create = Truetry:if stat.S_ISDIR(self.sftp_client.stat(d).st_mode):create = Falseexcept FileNotFoundError:create = Trueif create:self.sftp_client.mkdir(d, mode)current_dir = ddef destroy(self):"""銷毀當前實例"""while not self.pull_queue.empty():  # 等待隊列任務處理完畢time.sleep(0.2)# 結束線程for thread in self.thread_list:self.pull_queue.put(None)  # 通過None來停止線程while True:if thread.is_alive() and self.thread_list[thread]:time.sleep(1)else:break# self.thread_list.pop(thread)# for i in range(len(self.thread_list)):#     self.pull_queue.put(None)  # 通過None來停止線程#     self.thread_list.pop()  # 移除成員for ptySession in self.ptySessions:ptySession.destroy()for i in range(len(self.sftp_clientlist)):sftp_client = self.sftp_clientlist.pop()sftp_client.close()for i in range(len(self.ssh_clientlist)):ssh_client = self.ssh_clientlist.pop()ssh_client.close()def __enter__(self):return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.destroy()def open_pty_session(self, ending_char, timeout=30):"""從當前channel開啟一個session并激活@return:"""_session = self.ssh_client.get_transport().open_session(timeout=1 * 3600)  # type:Channel_session.get_pty()_session.invoke_shell()_ptysession = PtySession(_session, ending_char=ending_char, timeout=timeout)self.ptySessions.append(_ptysession)return _ptysessiondef listdir_attr(self, path):"""列出遠程目錄:param path: 遠程目錄:return: list of `.SFTPAttributes` objects"""_reconnect = Falsetry:return self.sftp_client.listdir_attr(path)except SSHException as e:traceback.print_exc()_reconnect = Trueexcept OSError as e:traceback.print_exc()_reconnect = Trueif _reconnect:print("重新連接ssh...")self.sftp_client.close()self.ssh_client.close()self.sftp_clientlist.remove(self.sftp_client)self.ssh_clientlist.remove(self.ssh_client)self.ssh_client, self.sftp_client = self._gen_sftp_client(self.host, self.port, self.username, self.pwd)return self.listdir_attr(path)# 一次讀取字節長度
recv_len = 1024 * 5class Task(object):def __init__(self, remote_path, local_path, max_size=None, call_bak_fun=None) -> None:super().__init__()self.remote_path = remote_path  # 遠程路徑self.local_path = local_path  # 本地路徑self.max_size = max_size  # 最大大小,當大于這個大小將跳過不做處理self.call_bak_fun = call_bak_fun  # 回調方法 格式:(fn,arg,kw)class PtySession(object):def __init__(self, session, ending_char, timeout=30) -> None:super().__init__()self.session = session  # type:Channelself.last_line = ""self.ending_char = ending_char  # 每執行一個命令之后,標記輸出的結束字符self.clear_tail()self.timeout = timeout  # 超時時間,秒def clear_tail(self, _ending_char=None):"""清理輸出還處于緩沖區中未讀取的流"""if not _ending_char:_ending_char = self.ending_charwhile True:time.sleep(0.2)# self.session.recv_ready()在讀取過程中不一定總是True,只有當讀取緩沖流中有字節讀取時,才會為True。所以在讀取頭一次后獲取下次流到緩沖區中前為Falseif self.session.recv_ready():self.last_line = self.session.recv(recv_len)self.last_line = self.last_line.decode('utf-8')print(self.last_line)if re.search(_ending_char, self.last_line):breakdef destroy(self):"""銷毀并關閉session:return::rtype:"""self.clear_tail()self.session.close()def exp(self, *exp_cmds):"""期望并執行,與expect的用法類似。session.exp(("\$", "scp test.txt luckydog@127.0.0.1:~/\r",(("yes/no", "yes\r",("Password:", "luckydog\r")),("Password:", "luckydog\r")))):param exp_cmds: 第一個元素為獲取的期望結束字符,第二個元素為需要執行的命令,如果傳入的第三個元素,則第三個元素必須為元祖,并且也同父級一樣,屬遞歸結構。類似GNU的遞歸縮寫。:type exp_cmds: tuple"""interval = 0.2cur_time = 0.0try:while True:if self.session.recv_ready():self.last_line = self.session.recv(recv_len).decode('utf-8')print(self.last_line)elif self.session.send_ready():for exp_cmd in exp_cmds:_cmd = exp_cmd[1]if not _cmd.endswith("\r"):_cmd += "\r"match = re.search(exp_cmd[0], self.last_line)if match and match.group():self.session.send(_cmd)# 清空最后一行數據緩存,便于下個命令的讀取流輸出。此行代碼去除,會導致無法等待命令執行完畢提前執行后續代碼的問題。self.last_line = ""if len(exp_cmd) == 3 and exp_cmd[2]:self.exp(*exp_cmd[2])returncur_time += intervalif cur_time >= self.timeout:raise Exception("timeout...")time.sleep(interval)except Exception as e:traceback.print_exc()# finally:#     self.clear_tail()def send(self, cmd, _ending_char=None):"""單純的發送命令到目標服務器執行。:param cmd: 命令:type cmd: str"""self.last_line = ""if not cmd.endswith("\r"):cmd += "\r"self.session.send(cmd)self.clear_tail(_ending_char)# -----------------------
class Timer(object):def __init__(self) -> None:super().__init__()self._start_time = Noneself._end_time = Noneself._seconds_delta = Noneself._last_press_time = None  # 最后一個計次時間def start(self):if not self._start_time:self._start_time = datetime.datetime.now()self._last_press_time = self._start_timeelse:raise Exception("timer is already started.")return self._start_timedef end(self):if not self._end_time:self._end_time = datetime.datetime.now()self._seconds_delta = (self._end_time - self._start_time).total_seconds()else:raise Exception("timer is already stopped.")def press(self):if not self._start_time:self._last_press_time = self.start()return 0now = datetime.datetime.now()last_time = self._last_press_timeself._last_press_time = nowreturn (now - last_time).total_seconds()@propertydef last_press_time_delta(self):if not self._last_press_time:self.press()return 0return (datetime.datetime.now() - self._last_press_time).total_seconds()@propertydef start_time(self):return self._start_time@propertydef end_time(self):return self._end_timedef timedelta_seconds(self):if self._seconds_delta:return self._seconds_deltaelse:raise Exception("timer not stopped.")# ===================test=========================
def _file_pull_complete(filename, **kw):print(filename + "文件上傳完成")def __example():sftp = SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx")# 創建目錄sftp.mkdir("/home/username/123/")# 下載文件到本地(同步執行)sftp.pull_file("/home/username/temp/rumenz.img",  # 遠程文件"/Users/username/rumenz.img",  # 本地路徑文件,需要帶上文件名print_progress=True  # 打印文件拉取進度)# 提交下載遠程文件異步任務(異步線程執行不阻塞)sftp.submit_pull_work("/home/username/temp/rumenz.img",  # 遠程文件"/Users/username/rumenz.img",  # 本地路徑文件,需要帶上文件名call_bak_fun=(_file_pull_complete, ("rumenz.img",))  # 文件上傳完成回調,第一個元素為方法,第二個是入參)# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)# 開啟交互回話session = sftp.open_pty_session("\$")# 發送單條命令session.send("cd ~/123")# # 發送單條命令并等待獲取_ending_char符號(結束符支持el)session.send("scp test.txt username@130.16.16.133:~/\r", "Password:|yes/no")# # 當最后一行為yes/no 則執行yes,如果是Password則輸入密碼session.exp(("yes/no", "yes\r",("Password:", "xxx\r")),("Password:", "xxx\r"))# scp方式2'''偽代碼if (yes/no):send yesif (Password:)send 密碼elif (Password:)send 密碼'''session.exp(("\$", "scp test.txt username@130.16.16.133:~/\r",  # 期望$,并發送scp命令(("yes/no", "yes\r", (  # 如果返回yes/no,則發送yes"Password:", "xxx\r")),  # 發送yes后,如果返回Password:結尾,則發送密碼("Password:", "xxx\r")  # 如果返回的是Password:結尾,則發送密碼)))# 銷毀session.destroy()sftp.destroy()# 支持通過with的方式使用SFTPMultipleClient,這樣不需要顯示的調用 sftp.destroy()with SFTPMultipleClient(host="130.16.16.18", port=22, username="baiyang", pwd="xxx") as sftp:# 列出文件列表files = sftp.listdir_attr("/home/baiyang")for f in files:print(f)if __name__ == '__main__':__example()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/165887.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/165887.shtml
英文地址,請注明出處:http://en.pswp.cn/news/165887.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

綜合實力盤點高性價比還優質的云服務器:亞馬遜云科技仍然領跑市場

如果說云計算是一條流向數字化未來的河流&#xff0c;那亞馬遜云科技毫無疑問是航行在最前面的帆船&#xff1b;如果說云計算是一條通往數字化未來的鐵軌&#xff0c;那亞馬遜云科技就是行駛在最前面的高鐵。接下來回首往昔&#xff0c;以史為鏡&#xff0c;得出云服務器哪家便…

毛里塔尼亞市場開發攻略,收藏一篇就夠了

毛里塔尼亞是非洲西北部的一個國家&#xff0c;也是中國長期援建的一個國家&#xff0c;也是一帶一路上的國家。毛里塔尼亞生產生活資料依賴進口&#xff0c;長期依賴跟我們國家的貿易關系也是比較緊密的&#xff0c;今天就來給大家介紹一下毛里塔尼亞的市場開發公路。文章略長…

Python監控服務進程及自啟動服務方法與實踐

1. 需求概述 當我們在Windows Server環境中部署XX系統的實際應用中&#xff0c;往往會遇到一些運維管理的挑戰。為了確保系統的持續穩定運行&#xff0c;特別是在服務程序因各種原因突然關閉的情況下&#xff0c;我們可以借助Python的強大生態系統來構建一個監控與自動重啟的管…

分布式鏈路追蹤入門篇-基礎原理與快速應用

為什么需要鏈路追蹤&#xff1f; 我們程序員在日常工作中&#xff0c;最常做事情之一就是修bug了。如果程序只是運行在單機上&#xff0c;我們最常用的方式就是在程序上打日志&#xff0c;然后程序運行的過程中將日志輸出到文件上&#xff0c;然后我們根據日志去推斷程序是哪一…

Comsol Multiphysics 6.2 for Mac建模仿真軟件

COMSOL Multiphysics是一款多物理場仿真軟件&#xff0c;旨在幫助工程師、科學家和研究人員解決各種復雜的工程和科學問題。該軟件使用有限元分析方法&#xff0c;可以模擬和分析多個物理場的相互作用&#xff0c;包括結構力學、熱傳導、電磁場、流體力學和化學反應等。 COMSOL…

一些好用的前端小插件(轉自知乎)

一些好用的前端小插件&#xff08;2&#xff09; 1. cropper.js Cropper.js 2.0 是一系列用于圖像裁剪的 Web 組件。 官網地址&#xff1a;https://fengyuanchen.github.io/cropperjs/v2/zh/ 2. Vditor Vditor是一款瀏覽器端的 Markdown 編輯器&#xff0c;支持所見即所得、…

2024年度投資策略:AI大模型和半導體國產化加速

今天分享的是AI系列深度研究報告&#xff1a;《2024年度投資策略&#xff1a;AI大模型和半導體國產化加速》。 &#xff08;報告出品方&#xff1a;東方證券&#xff09; 報告共計&#xff1a;48頁 前言: 行情回顧與未來展望 電子板塊漲幅轉正&#xff0c;信心逐漸回歸。截至…

人人都會Blazor —— 3.3 參數

參數最常見的使用,目的是使組件可以接收動態數據。 聲明參數 參數使用 [Parameter] 特性的公共 C# 屬性進行定義。 在下面的示例中,內置引用類型 (System.String) 和用戶定義的引用類型 (PanelBody) 作為組件參數進行傳遞。 PanelBody.cs: public class PanelBody {publ…

SQL注入漏洞發現和利用,以及SQL注入的防護

一、背景 SQL注入漏洞是一種常見的軟件安全問題&#xff0c;它發生在應用程序的數據庫層中。其核心原理是將用戶輸入的數據當做代碼來執行&#xff0c;違反了“數據與代碼分離”的原則。具體來說&#xff0c;攻擊者通過構造惡意的SQL查詢語句&#xff0c;使得應用程序在執行SQ…

Android NFC手機上實現卡模擬

1&#xff0c; 問&#xff1a;能否在AndroidNFC手機上實現卡模擬&#xff1f; 答&#xff1a;在技術上可行&#xff0c;但是&#xff0c;對一般開發人員來講&#xff0c;目前看來僅僅是技術上可行。 2&#xff0c; 問&#xff1a;具體如何實現呢&#xff1f; 答&#xff1…

git的使用記錄

GitHub是公有的遠程倉庫&#xff0c;Gitlab是私有的遠程倉庫。 git add file git commit -m "add file" git mv filea fileb git log 顯示提交記錄 git log --oneline 一行的簡略信息顯示 git log --oneline --decorate 顯示當前指針 git reset --ha…

矩陣知識補充

正交矩陣 定義&#xff1a; 正交矩陣是一種滿足 A T A E A^{T}AE ATAE的方陣 正交矩陣具有以下幾個重要性質&#xff1a; A的逆等于A的轉置&#xff0c;即 A ? 1 A T A^{-1}A^{T} A?1AT**A的行列式的絕對值等于1&#xff0c;即 ∣ d e t ( A ) ∣ 1 |det(A)|1 ∣det(A)∣…

通用功能——git 攻略

摘要 本文主要介紹git常用命令的使用方法&#xff0c;同時介紹一些常見問題的處理方法&#xff0c;持續更新中… git命令通用選項 大多數git命令都適用的選項列表如下&#xff1a; -v, --verbose show hash and subject, give twice for upstream branch -q, --quie…

Vim 一下日志文件,Java 進程沒了?

一次端口告警&#xff0c;發現 java 進程被異常殺掉&#xff0c;而根因竟然是因為在問題機器上 vim 查看了 nginx 日志。下面我將從時間維度詳細回顧這次排查&#xff0c;希望讀者在遇到相似問題時有些許啟發。 時間線 15:19 收到端口異常 odin 告警。 狀態:P1故障 名稱:應用端…

黑馬點評筆記 redis實現優惠卷秒殺

文章目錄 難題全局唯一IDRedis實現全局唯一Id 超賣問題問題解決方案樂觀鎖問題 一人一單 難題 要解決優惠卷秒殺的問題我們要考慮到三個個問題&#xff0c;全局唯一ID&#xff0c;超賣問題&#xff0c;一人一單。 全局唯一ID 用戶搶購時&#xff0c;就會生成訂單并保存到同一…

【git】pip install git+https://github.com/xxx/xxx替換成本地下載編譯安裝解決網絡超時問題

目錄 &#x1f311;&#x1f311; 背景 &#x1f312; &#x1f312;作用 &#x1f314;&#x1f314; 問題 &#x1f314;&#x1f314;解決方案 &#x1f319;方法一 &#x1f319;方法二 &#x1f31d;&#x1f31d;我的解決方案 整理不易&#xff0c;歡迎一鍵三連…

7-12 統計投票情況(集合)

7-12 統計投票情況&#xff08;集合&#xff09; 分數 10 作者 python課程組 單位 福州大學至誠學院 利用集合分析活動投票情況。 第一小隊有五名隊員&#xff0c;序號是1,2,3,4,5&#xff1b;第二小隊也有五名隊員&#xff0c;序號6,7,8,9,10。 輸入一個由得票隊員編號組成的…

分布式篇---第三篇

系列文章目錄 文章目錄 系列文章目錄前言一、什么是補償事務?二、消息隊列是怎么實現的?三、那你說說Sagas事務模型前言 前些天發現了一個巨牛的人工智能學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到網站,這篇文章男女通用,看懂了就去分享給你的碼吧。…

qgis添加postgis數據

左側瀏覽器-PostGIS-右鍵-新建連接 展開-雙擊即可呈現 可以點擊編輯按鈕對矢量數據編輯后是直接入庫的&#xff0c;因此謹慎使用。

【DQN】基于pytorch的強化學習算法Demo

目錄 簡介代碼 簡介 DQN&#xff08;Deep Q-Network&#xff09;是一種基于深度神經網絡的強化學習算法&#xff0c;于2013年由DeepMind提出。它的目標是解決具有離散動作空間的強化學習問題&#xff0c;并在多個任務中取得了令人矚目的表現。 DQN的核心思想是使用深度神經網…