- 使用場景
- 使用步驟
- 引入模塊
- 組裝&發送數據
- 消費數據
- 故障轉移
使用場景
-
異步日志處理:將 OpenSIPS 中的 SIP 信令日志、通話記錄(CDR)等數據發送到 Kafka 隊列中。
-
事件通知與監控:利用 OpenSIPS 的 event_interface 模塊將 SIP 事件(如呼叫建立、斷開、注冊等)推送到 Kafka
OpenSIPS中事件接口有以下類型:
- EVENT_DATAGRAM - Publish JSON-RPC notifications using UDP, stable
- EVENT_FLATSTORE - Text/File backend for events, stable
- EVENT_KAFKA - Publish JSON-RPC notifications/generic messages to Apache Kafka , stable
- EVENT_STREAM - Publish JSON-RPC notifications using TCP, stable
- EVENT_ROUTE - Route triggering based on events, stable
- EVENT_ROUTING - Event-based routing, stable
- EVENT_RABBITMQ - Publish JSON-RPC notifications using AMQP over TCP , stable
- EVENT_VIRTUAL - Aggregator of event backends (failover & balancing), stable
- EVENT_XMLRPC - Event XMLRPC client module , stable
-
分布式消息隊列集成:在復雜的 VoIP 架構中,OpenSIPS 可以通過 Kafka 與其他服務(如計費系統、CRM 系統)解耦
-
計費與數據分析:OpenSIPS 生成的 CDR(Call Detail Records)可以通過 Kafka 推送至后端計費系統
-
故障隔離與重試機制:在 OpenSIPS 調用外部服務時(如鑒權、計費接口),如果目標服務不可用,可以將請求暫存到 Kafka
-
微服務架構下的通信橋梁:在基于微服務的 VoIP 架構中,OpenSIPS 作為 SIP 邊界網關,可通過 Kafka 與其他微服務(如認證服務、媒體控制服務)進行異步通信
-
消息廣播與事件驅動架構:OpenSIPS 可將特定的 SIP 事件廣播到 Kafka 的多個主題,供不同的下游服務消費
-
性能優化與流量削峰:在高并發場景下,Kafka 可以作為緩沖層,緩解 OpenSIPS 與后端系統之間的流量壓力
-
自定義業務邏輯擴展:通過 Kafka 與外部業務邏輯模塊解耦,可以在不影響 OpenSIPS 核心邏輯的前提下,靈活擴展新的業務功能
使用步驟
引入模塊
loadmodule "event_kafka.so"
modparam("event_kafka", "broker_id", "[k1]127.0.0.1:9092/opensips?g.linger.ms=100&t.acks=all")
鏈接語法:'kafka:' brokers '/' topic ['?' properties]
properties語法:'g.'|'t.' property '=' value ['&' 'g.'|'t.' property '=' value] ...
可以設置的proroperty參考官方說明
組裝&發送數據
$json(sql_obj) := "{}";$json(sql_obj/table) = 'acc';$json(sql_obj/method) = $param(method);$json(sql_obj/fromTag) = $param(from_tag);$json(sql_obj/toTag) = $param(to_tag);$json(sql_obj/callid) = $param(callid);$json(sql_obj/sipCode) = $param(sip_code);$json(sql_obj/sipReason) = $param(sip_reason);$json(sql_obj/time) = $param(time);$json(sql_obj/duration) = $param(duration);$json(sql_obj/msDuration) = $param(ms_duration);$json(sql_obj/setuptime) = $param(setuptime);$json(sql_obj/created) = $param(created);kafka_publish("k1", $json(sql_obj), $ci, "kafka_report");
可異步監聽回執
route[kafka_report] {xlog("[$avp(kafka_id)] status=$avp(kafka_status) key=$avp(kafka_key) msg=$avp(kafka_msg)\n");...
}
消費數據
$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic opensips --from-beginning{ "status": 1, "dlg_id": "17091151056627", "callid": "1-14809@127.0.0.1", "from_uri": "sip:20250610@127.0.0.1:5060", "from_tag": "1", "to_uri": "sip:tt061013776167200@127.0.0.1:5900", "caller_sock": "127.0.0.1:5900", "caller_contact": "sip:20250610@127.0.0.1:5060", "start_time": 0, "timeout": 0, "caller_in": "20250610", "callee_in": "tt061013776167200", "caller_gateway": "46", "callee_gateway": "9", "src_ip": "127.0.0.1", "dst_ip": "127.0.0.1" }
{ "status": 5, "dlg_id": "17091151056627", "to_uri": "sip:tt061013776167200@127.0.0.1:5900", "to_tag": "1", "callee_contact": "sip:127.0.0.1:5080;transport=UDP", "start_time": "1749459233", "timeout": "1749462833", "caller_in": "20250610", "callee_in": "tt061013776167200", "caller_gateway": "46", "callee_gateway": "9", "src_ip": "127.0.0.1", "dst_ip": "127.0.0.1" }
{ "table": "acc", "method": "INVITE", "fromTag": "1", "toTag": "1", "callid": "1-14809@127.0.0.1", "sipCode": "200", "sipReason": "OK", "time": 1749459233, "duration": 4, "msDuration": 3411, "setuptime": 8, "created": 1749459225, "srcIp": "127.0.0.1", "dstIp": "127.0.0.1", "caller": "20250610", "callee": "331213776167200", "callStartTime": "1749459233.433263", "callEndTime": "1749459236.844957", "callerIn": "20250610", "calleeIn": "tt061013776167200", "callerOut": "20241213ob16701", "calleeOut": "calleeout330613776167200", "callergateway": "46", "calleegateway": "9", "calllevel": "0", "routinglevel": "0", "calleraccount": "1", "calleeaccount": "2", "callerCallid": "1-14809@127.0.0.1", "calleeCallid": "", "area": "", "endSide": "1", "endCode": "9201", "endReason": "caller hang up", "realDuration": "3411", "through_jt": "1", "callerproductid": "", "calleeproductid": "", "routing_path": "9-200-0;", "node_addr": "127.0.0.1:5900", "multi_gw": "", "s_timeout": "", "event_time": "1749459236.847516" }
{ "status": 6, "dlg_id": "17091151056627", "callid": "1-14809@127.0.0.1" }
故障轉移
引入event_virtual/event_flatstore,將事件消息通過隊列傳遞,并且支持故障轉移
異常信息 ->> EVENT ->> KAFKA(故障鏈路 ->> EVENT_VIRTUAL ->> EVENT_FLATSTORE)
loadmodule "event_flatstore.so"
loadmodule "event_kafka.so"
loadmodule "event_virtual.so"startup_route {subscribe_event("E_MY_EVENT", "virtual:FAILOVER kafka:127.0.0.1:9092/opensipsfailover flatstore:/var/log/myevents");
}