LinkedIn 自動消息發送工具說明文檔
一、項目概述
本項目是一個基于 Python 的自動化工具,用于批量向指定 LinkedIn 用戶發送消息。
核心功能包括:
讀取消息模板和 URL 列表; 使用瀏覽器模擬操作,自動發送 LinkedIn 消息; 使用 Redis 緩存已發送的 URL,避免重復發送; 支持命令行參數配置,靈活控制運行行為。
二、項目結構
├── main.py # 主程序入口
├── linkedin_cat/
│ └── message.py # LinkedinMessage 類,負責發送消息
├── cookies.json # LinkedIn 登錄 Cookie(用戶自備)
├── message.txt # 消息模板文件(用戶自備)
├── urls.txt # 目標 LinkedIn 用戶 URL 列表(用戶自備)
├── log.txt # 運行日志文件(自動生成)
三、依賴環境
Python 3.6+ 第三方庫:
redis
selenium
(LinkedinMessage 類內部使用)
Redis 服務(用于緩存已發送的 URL)
可使用以下命令安裝依賴:
pip install redis selenium
四、使用方法
4.1 準備文件
cookies.json :登錄 LinkedIn 后,從瀏覽器中導出的 Cookie 信息(JSON 格式)。message.txt :要發送的消息內容(純文本)。urls.txt :每行一個目標 LinkedIn 用戶主頁 URL。
4.2 運行命令
python main.py cookies.json message.txt urls.txt "button_class" [ --可選參數]
參數說明:參數 說明 cookies LinkedIn 登錄 Cookie 文件路徑 message 消息模板文件路徑 urls 目標 URL 列表文件路徑 button_class LinkedIn 頁面“發送消息”按鈕的 class 屬性值(需自行查找)
可選參數:參數 默認值 說明 --headless
False 無頭模式運行(不打開瀏覽器窗口) --redis-host
localhost Redis 地址 --redis-port
6379 Redis 端口 --redis-db
0 Redis 數據庫編號 --redis-password
None Redis 密碼 --redis-max-connections
10 Redis 最大連接數 --max-urls
100 最多處理的 URL 數量
4.3 示例
python main.py cookies.json message.txt urls.txt "ieSHXhFfVTxQfadOJdXYOIDuVKsBXgPtjNxI" --headless --max-urls 50
五、核心邏輯說明
5.1 Redis 緩存機制
每個 URL 作為 Redis 的 key; Value 為該 URL 最近一次成功發送消息時的時間戳; 若某 URL 在 30 天內已發送過,則跳過; 若某 URL 的 Value 為小于 100 的整數,則視為“黑名單”,永久跳過。
5.2 消息發送流程
讀取消息模板和 URL 列表; 初始化 Redis 連接; 遍歷 URL 列表:
若 URL 不存在于 Redis 中,則直接發送消息; 若 URL 存在于 Redis 中,檢查時間戳:
每次成功發送后,更新 Redis 中的時間戳。
5.3 日志記錄
所有運行日志寫入 log.txt
; 日志格式:時間 - 級別 - 消息
。
六、注意事項
LinkedIn 反爬機制 :頻繁操作可能導致賬號被限制,請合理設置發送間隔和數量;Cookie 有效性 :Cookie 可能會過期,需定期更新;按鈕 class 值 :LinkedIn 頁面結構可能會變化,需定期更新按鈕 class 值;Redis 持久化 :建議開啟 Redis 持久化,避免重啟后數據丟失。
七、常見問題問題 解決方案 程序運行后立即退出 檢查 Cookie 是否有效,或按鈕 class 值是否正確 提示“Skipping URL” 該 URL 已發送過,且未超過 30 天 日志中出現異常信息 檢查 Redis 是否正常運行,網絡是否暢通
八、后續擴展建議
支持多賬號輪詢發送; 支持發送間隔隨機化,降低風控風險; 支持消息模板變量替換(如用戶名); 支持 Web 管理界面,可視化配置任務。
import redis
import argparse
from linkedin_cat. message import LinkedinMessage
import json
import time
import datetimeimport logging
logger = logging. getLogger( __name__)
logger. setLevel( logging. INFO)
file_handler = logging. FileHandler( 'log.txt' , encoding= 'utf-8' )
file_handler. setLevel( logging. INFO)
formatter = logging. Formatter( '%(asctime)s - %(levelname)s - %(message)s' )
file_handler. setFormatter( formatter)
logger. addHandler( file_handler) def is_timestamp_difference_greater_than_months ( float_value, months_in_seconds) : """判斷時間戳與當前時間的差異是否大于指定的秒數(表示的月數):param float_value: 浮點數時間戳:param months_in_seconds: 以秒為單位表示的月數:return: 如果時間差大于指定的秒數,返回True;否則返回False""" timestamp = int ( float_value) current_timestamp = int ( time. time( ) ) time_difference = current_timestamp - timestampif time_difference > months_in_seconds: print ( '可以發送.....' ) return True else : return False class RedisHelper : def __init__ ( self, host= 'localhost' , port= 6379 , db= 0 , password= None , max_connections= 10 ) : self. host = hostself. port = portself. db = dbself. password = passwordself. max_connections = max_connectionsself. pool = redis. ConnectionPool( host= self. host, port= self. port, db= self. db, password= self. password, max_connections= self. max_connections) self. conn = redis. Redis( connection_pool= self. pool) def set ( self, key, value, ex= None , px= None , nx= False , xx= False ) : self. conn. set ( key, value, ex= ex, px= px, nx= nx, xx= xx) def get ( self, key) : return self. conn. get( key) @classmethod def get_key_value_timestamp ( cls, key, host= 'localhost' , port= 6379 , db= 0 , password= None , max_connections= 10 ) : redis_helper = cls( host= host, port= port, db= db, password= password, max_connections= max_connections) value = redis_helper. get( key) if value is not None : try : decoded_string = value. decode( 'utf-8' ) float_value = float ( decoded_string) timestamp = int ( float_value) timestamp_datetime = datetime. datetime. fromtimestamp( timestamp) current_datetime = datetime. datetime. now( ) time_difference = current_datetime - timestamp_datetimeif time_difference > datetime. timedelta( weeks= 4 ) : return { "key" : key, "value" : timestamp, "resend" : True , "time_difference" : time_difference} else : return { "key" : key, "value" : timestamp, "resend" : False , "time_difference" : time_difference} except ValueError: return { "key" : key, "value" : value, "resend" : False , "error" : "value is not a valid timestamp" } else : return { "key" : key, "resend" : False , "error" : "key not found or value is None" } def get_message ( message_file_path) : with open ( message_file_path, "r" , encoding= "utf8" ) as f: message = f. read( ) return messagedef read_urls_list ( urls_file_path) : with open ( urls_file_path, "r" , encoding= "utf8" ) as f: urls_list = f. readlines( ) urls_list = [ url. strip( ) for url in urls_list] return urls_listdef send_messages ( urls_list, message, storage_helper, bot, max_urls) : urls_list = urls_list[ : max_urls] for raw_url in urls_list: url = raw_urlif storage_helper. get( url) : value = storage_helper. get( url) decoded_string = value. decode( 'utf-8' ) float_value = float ( decoded_string) timestamp = int ( float_value) if timestamp < 100 : print ( f"Skipping URL { url} as it has already been marked." ) continue result = is_timestamp_difference_greater_than_months( float_value, 30 * 86400 ) if result: print ( ">>>>>" , timestamp, url) result = bot. send_single_request( raw_url, message) if result != 'fail' : storage_helper. set ( url, time. time( ) ) print ( f"Message sent to { url} and URL marked as processed." ) continue else : print ( f"Skipping URL { url} as it has already been processed." ) continue else : result = bot. send_single_request( raw_url, message) if result != 'fail' : storage_helper. set ( url, time. time( ) ) print ( f"Message sent to { url} and URL marked as processed." ) def main ( ) : parser = argparse. ArgumentParser( description= 'Send LinkedIn messages to a list of URLs.' ) parser. add_argument( 'cookies' , type = str , help = 'Path to the LinkedIn cookies JSON file (e.g., "cookies.json").' ) parser. add_argument( 'message' , type = str , help = 'Path to the message file (e.g., "message.txt").' ) parser. add_argument( 'urls' , type = str , help = 'Path to the URLs file (e.g., "urls.txt").' ) parser. add_argument( 'button_class' , type = str , help = """Message Button Class: ieSHXhFfVTxQfadOJdXYOIDuVKsBXgPtjNxI (eg:<button aria-label="Invite XXXX to connect" id="ember840"class="artdeco-button artdeco-button--2artdeco-button--primary ember-view ieSHXhFfVTxQfadOJdXYOIDuVKsBXgPtjNxI"type="button">)""" ) parser. add_argument( '--headless' , action= 'store_true' , help = 'Run the bot in headless mode (without opening a browser window).' ) parser. add_argument( '--redis-host' , type = str , default= 'localhost' , help = 'Redis host (default: localhost)' ) parser. add_argument( '--redis-port' , type = int , default= 6379 , help = 'Redis port (default: 6379)' ) parser. add_argument( '--redis-db' , type = int , default= 0 , help = 'Redis database (default: 0)' ) parser. add_argument( '--redis-password' , type = str , default= None , help = 'Redis password (default: None)' ) parser. add_argument( '--redis-max-connections' , type = int , default= 10 , help = 'Redis max connections (default: 10)' ) parser. add_argument( '--max-urls' , type = int , default= 100 , help = 'Maximum number of URLs to process (default: 100)' ) args = parser. parse_args( ) message = get_message( args. message) urls_list = read_urls_list( args. urls) bot = LinkedinMessage( args. cookies, args. headless, button_class= args. button_class) storage_helper = RedisHelper( host= args. redis_host, port= args. redis_port, db= args. redis_db, password= args. redis_password, max_connections= args. redis_max_connections) send_messages( urls_list, message, storage_helper, bot, args. max_urls) if __name__ == "__main__" : main( )