要求:
輸出兩個程序,一個命令行程序(命令行參數用flag)和一個服務端程序。
命令行程序支持通過命令行參數配置下發IP或IP段、端口、掃描帶寬,然后將消息推送到kafka里面。
服務端程序:
- 從kafka消費者接收掃描任務信息
- 通過調用masscan啟動探測任務,獲取進度和結果信息,進度寫入Redis,結果信息寫入Kafka。
- 要求對啟動任務、kafka、整理流程進行封裝。
- 要求啟動2個server端,通過命令行程序下發2個不同網段,可以均勻的分配到2個server上面執行完成。
測試要求:
- 啟動兩個server端程序。
- 通過命令行程序下發兩個任務,IP不一樣。
- 看server端程序日志,是否均勻的掃描了兩個任務。
前置準備:
安裝docker
思路:
1. 系統架構設計
采用生產者-消費者模式:
- 命令行客戶端作為生產者,將掃描任務發布到Kafka
- 兩個服務端實例作為消費者,從Kafka獲取任務并執行
2. 關鍵組件設計
-
任務表示:
- 使用JSON格式表示掃描任務,包含:
- IP范圍(單個IP或CIDR格式)
- 端口范圍
- 掃描帶寬限制
- 任務狀態
- 進度信息
- 使用JSON格式表示掃描任務,包含:
-
Kafka設計:
- 創建一個主題(如
scan-tasks
) - 使用單個分區確保任務順序性(或根據需求設計分區策略)
- 考慮使用消費者組實現兩個服務端的負載均衡
- 創建一個主題(如
-
Redis設計:
- 存儲任務進度信息
- 使用Hash結構存儲每個任務的進度百分比
- 設置適當的TTL防止數據無限增長
-
服務端負載均衡:
- 兩個服務端加入同一個Kafka消費者組
- Kafka會自動將任務均勻分配給兩個消費者
3. 執行流程
-
客戶端流程:
- 解析命令行參數(IP范圍、端口、帶寬)
- 驗證輸入格式
- 創建Kafka生產者
- 將任務發布到Kafka主題
-
服務端流程:
- 初始化Kafka消費者(加入消費者組)
- 初始化Redis連接
- 循環消費任務:
a. 從Kafka獲取任務
b. 更新Redis中任務狀態為"running"
c. 調用masscan執行掃描:- 構造masscan命令行參數
- 啟動masscan進程
- 監控進程輸出和退出狀態
d. 實時解析masscan輸出,更新Redis中的進度
e. 掃描完成后: - 更新Redis中任務狀態為"completed"
- 將完整結果發布到另一個Kafka主題(如
scan-result
)
4. 關鍵技術點
-
Masscan集成:
- 使用
exec.Command
啟動masscan進程 - 實時解析masscan的標準輸出和錯誤輸出
- 根據輸出計算掃描進度
- 使用
-
錯誤處理:
- 處理無效IP格式
- 處理masscan執行失敗
- 處理Kafka/Redis連接問題
-
日志記錄:
- 記錄服務端操作日志
- 記錄任務執行狀態變化
- 記錄錯誤信息
5. 測試驗證思路
- 啟動兩個服務端實例
- 使用客戶端提交兩個不同網段的任務
- 觀察:
- 兩個服務端的日志輸出
- 任務是否被均勻分配(一個服務端處理一個任務)
- 掃描進度是否正確更新
- 最終結果是否正確輸出
6. 擴展考慮
-
任務優先級:
- 可以在任務中添加優先級字段
- 服務端根據優先級處理任務
-
任務超時:
- 添加任務超時機制
- 超時后重新分配任務
-
結果存儲:
- 可以考慮將結果存入數據庫而不僅是Kafka
-
水平擴展:
- 設計支持更多服務端實例的擴展方案
這個設計實現了基本的分布式掃描任務調度系統,核心是利用Kafka的消息隊列特性實現任務分發,通過消費者組機制實現負載均衡,使用Redis作為共享狀態存儲。
實現:
項目結構:

kafka:
consumer
package kafkaimport ("context""errors""fmt""github.com/IBM/sarama""log""sync"
)type MessageHandler func([]byte) errortype SaramaConsumer struct {client sarama.ConsumerGrouphandlers map[string]MessageHandlerready chan boolctx context.Contextcancel context.CancelFuncconsuming sync.WaitGroupmemberId stringgroupId string
}func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {config := sarama.NewConfig()config.Version = sarama.V2_5_0_0 // 使用適當的 Kafka 版本config.Consumer.Offse