要確認 ACC_JSON 模塊是否已經成功將計費信息推送到消息隊列(MQueue),以及如何從隊列中取值,可以按照以下步驟進行操作:
1. 確認 ACC_JSON 已推送到隊列
1.1 配置 ACC_JSON
確保 ACC_JSON 模塊已正確配置并啟用。以下是一個示例配置:
loadmodule "acc_json.so"
modparam("acc_json", "log_flag", 1) # 啟用 JSON 記錄
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)") # 記錄額外信息route {if (method == "INVITE") {setflag(1); # 設置計費標志t_relay(); # 轉發請求}
}
1.2 檢查 Kamailio 日志
- 啟動 Kamailio 并觀察日志輸出。
- 如果 ACC_JSON 模塊成功將數據推送到隊列,日志中會顯示類似以下內容:
INFO: acc_json: JSON accounting data pushed to MQueue
1.3 檢查消息隊列
- ACC_JSON 模塊使用 Kamailio 的消息隊列(MQueue)來存儲 JSON 數據。
- 默認情況下,消息隊列的數據會存儲在 Kamailio 的共享內存中。
- 你可以使用 Kamailio 的 MI(Management Interface) 或 RPC(Remote Procedure Call) 命令來檢查隊列狀態。
2. 從隊列中取值
2.1 使用 MI 命令
Kamailio 提供了 MI 命令來管理消息隊列。以下是一些常用的 MI 命令:
2.1.1 檢查隊列狀態
kamcmd mq.stats
- 輸出示例:
{"queues": {"acc_json_queue": {"size": 10, # 隊列中當前的消息數量"max_size": 1000, # 隊列的最大容量"dropped": 0 # 丟棄的消息數量}} }
2.1.2 從隊列中讀取消息
kamcmd mq.read acc_json_queue
- 輸出示例:
{"messages": [{"method": "INVITE","from_tag": "abc123","to_tag": "xyz456","callid": "12345","sip_code": "200","sip_reason": "OK","time": "2025-02-01 12:34:56","ua": "SomeUserAgent/1.0","uuid": "12345"},...] }
2.2 使用 RPC 命令
Kamailio 也支持通過 RPC 命令管理消息隊列。以下是一些常用的 RPC 命令:
2.2.1 檢查隊列狀態
kamctl rpc mq.stats
- 輸出示例:
{"queues": {"acc_json_queue": {"size": 10,"max_size": 1000,"dropped": 0}} }
2.2.2 從隊列中讀取消息
kamctl rpc mq.read acc_json_queue
- 輸出示例:
{"messages": [{"method": "INVITE","from_tag": "abc123","to_tag": "xyz456","callid": "12345","sip_code": "200","sip_reason": "OK","time": "2025-02-01 12:34:56","ua": "SomeUserAgent/1.0","uuid": "12345"},...] }
3. 自定義隊列處理
如果默認的消息隊列功能無法滿足需求,可以通過以下方式自定義隊列處理:
3.1 使用事件路由
Kamailio 支持通過事件路由(Event Route)處理消息隊列中的數據。例如:
event_route[mq:acc_json_queue] {xlog("L_INFO", "Received JSON accounting data: $mqk($mqv)\n");# 在這里處理隊列中的數據
}
3.2 編寫外部腳本
可以編寫外部腳本(如 Python、Shell 腳本)通過 Kamailio 的 MI 或 RPC 接口定期從隊列中讀取數據,并進行進一步處理。
4. 調試和驗證
- 調試日志:啟用 Kamailio 的調試日志,觀察 ACC_JSON 模塊的行為:
kamailio -dd -E -e
- 隊列監控:使用 MI 或 RPC 命令定期檢查隊列狀態,確保數據被正確推送和讀取。
總結
- 通過 Kamailio 的 MI 或 RPC 命令可以確認 ACC_JSON 是否成功將數據推送到隊列,并從隊列中讀取數據。
- 如果需要更復雜的處理,可以使用事件路由或編寫外部腳本。
- 確保 Kamailio 配置正確,并定期監控隊列狀態以避免數據丟失。
要實現 ACC_JSON 模塊將計費信息推送到隊列,并由 Go 語言從隊列中取值并寫入數據庫,可以按照以下步驟操作:
1. Kamailio 配置
1.1 加載 ACC_JSON 模塊
在 Kamailio 配置文件中加載 ACC_JSON 模塊,并配置消息隊列:
loadmodule "acc_json.so"# 配置 ACC_JSON
modparam("acc_json", "log_flag", 1) # 啟用 JSON 記錄
modparam("acc_json", "log_extra", "ua=$hdr(User-Agent);uuid=$avp(i:123)") # 記錄額外信息# 配置消息隊列
modparam("mq", "mq_size", 1000) # 設置隊列大小
modparam("mq", "mq_name", "acc_json_queue") # 設置隊列名稱route {if (method == "INVITE") {setflag(1); # 設置計費標志t_relay(); # 轉發請求}
}
1.2 驗證數據推送
啟動 Kamailio 并驗證數據是否成功推送到隊列:
kamcmd mq.stats
- 如果隊列中有數據,說明 ACC_JSON 模塊已成功推送。
2. Go 語言實現
Go 語言程序需要從 Kamailio 的消息隊列中讀取數據,并將其寫入數據庫。以下是詳細實現思路和代碼示例。
2.1 實現思路
- 連接 Kamailio:通過 Kamailio 的 RPC 接口連接到消息隊列。
- 讀取隊列數據:定期從隊列中讀取 JSON 格式的計費信息。
- 解析 JSON 數據:將讀取的 JSON 數據解析為 Go 結構體。
- 寫入數據庫:將解析后的數據寫入數據庫(如 MySQL、PostgreSQL 等)。
2.2 代碼示例
2.2.1 安裝依賴
首先,安裝 Go 語言的相關依賴:
go get github.com/zero-os/gorpc # Kamailio RPC 客戶端
go get github.com/go-sql-driver/mysql # MySQL 驅動
2.2.2 Go 代碼實現
以下是一個完整的 Go 程序示例:
package mainimport ("database/sql""encoding/json""fmt""log""time""github.com/zero-os/gorpc"_ "github.com/go-sql-driver/mysql"
)// 定義計費信息結構體
type AccountingRecord struct {Method string `json:"method"`FromTag string `json:"from_tag"`ToTag string `json:"to_tag"`CallID string `json:"callid"`SipCode string `json:"sip_code"`SipReason string `json:"sip_reason"`Time string `json:"time"`UserAgent string `json:"ua"`UUID string `json:"uuid"`
}// 數據庫配置
const (dbDriver = "mysql"dbUser = "root"dbPass = "password"dbName = "kamailio_acc"
)func main() {// 連接 Kamailio RPCclient := gorpc.NewClient("tcp", "127.0.0.1:2049") // Kamailio RPC 地址defer client.Close()// 連接數據庫db, err := sql.Open(dbDriver, fmt.Sprintf("%s:%s@/%s", dbUser, dbPass, dbName))if err != nil {log.Fatalf("Failed to connect to database: %v", err)}defer db.Close()// 定期從隊列中讀取數據for {// 從隊列中讀取消息var result map[string]interface{}err := client.Call("mq.read", "acc_json_queue", &result)if err != nil {log.Printf("Failed to read from queue: %v", err)time.Sleep(5 * time.Second) // 等待 5 秒后重試continue}// 解析 JSON 數據messages, ok := result["messages"].([]interface{})if !ok {log.Println("No messages in queue")time.Sleep(5 * time.Second)continue}// 處理每條消息for _, msg := range messages {msgJSON, err := json.Marshal(msg)if err != nil {log.Printf("Failed to marshal message: %v", err)continue}var record AccountingRecordif err := json.Unmarshal(msgJSON, &record); err != nil {log.Printf("Failed to unmarshal message: %v", err)continue}// 將數據寫入數據庫if err := saveToDatabase(db, record); err != nil {log.Printf("Failed to save record to database: %v", err)}}time.Sleep(1 * time.Second) // 每隔 1 秒檢查一次隊列}
}// 將數據寫入數據庫
func saveToDatabase(db *sql.DB, record AccountingRecord) error {query := `INSERT INTO acc_records (method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`_, err := db.Exec(query,record.Method,record.FromTag,record.ToTag,record.CallID,record.SipCode,record.SipReason,record.Time,record.UserAgent,record.UUID,)return err
}
3. 數據庫表設計
在 MySQL 中創建用于存儲計費信息的表:
CREATE TABLE acc_records (id INT AUTO_INCREMENT PRIMARY KEY,method VARCHAR(50) NOT NULL,from_tag VARCHAR(100) NOT NULL,to_tag VARCHAR(100) NOT NULL,callid VARCHAR(100) NOT NULL,sip_code VARCHAR(10) NOT NULL,sip_reason VARCHAR(100) NOT NULL,time DATETIME NOT NULL,ua VARCHAR(255),uuid VARCHAR(100)
);
4. 運行程序
- 啟動 Kamailio,確保 ACC_JSON 模塊正常運行。
- 運行 Go 程序:
go run main.go
- 程序會定期從 Kamailio 的消息隊列中讀取數據,并將其寫入數據庫。
5. 驗證結果
- 檢查數據庫中的
acc_records
表,確認數據是否成功寫入。 - 觀察 Go 程序的日志輸出,確保沒有錯誤。
總結
通過以上步驟,可以實現 ACC_JSON 模塊將計費信息推送到隊列,并由 Go 語言程序從隊列中讀取數據并寫入數據庫。這種方法適用于需要將 Kamailio 的計費信息與外部系統集成的場景。
Gin框架代碼實際
如果使用 Gin 框架,可以將 Go 程序組織成一個更結構化的 Web 服務,同時保留從 Kamailio 消息隊列中讀取數據并寫入數據庫的功能。以下是重新整理的代碼示例:
1. 項目結構
kamailio-acc/
├── main.go
├── handlers/
│ └── acc_handler.go
├── models/
│ └── acc_model.go
├── services/
│ └── queue_service.go
├── config/
│ └── config.go
└── go.mod
2. 代碼實現
2.1 main.go
主程序入口,初始化 Gin 框架并啟動服務。
package mainimport ("kamailio-acc/config""kamailio-acc/handlers""kamailio-acc/services""log""time""github.com/gin-gonic/gin"
)func main() {// 加載配置cfg, err := config.LoadConfig()if err != nil {log.Fatalf("Failed to load config: %v", err)}// 初始化數據庫db, err := config.InitDB(cfg)if err != nil {log.Fatalf("Failed to initialize database: %v", err)}// 初始化 Kamailio RPC 客戶端client := services.NewKamailioClient(cfg.KamailioRPCAddr)defer client.Close()// 啟動隊列監聽服務go services.StartQueueListener(client, db)// 初始化 Gin 框架r := gin.Default()// 注冊路由handlers.RegisterRoutes(r, db)// 啟動 Web 服務if err := r.Run(cfg.ServerAddr); err != nil {log.Fatalf("Failed to start server: %v", err)}
}
2.2 config/config.go
配置文件加載和數據庫初始化。
package configimport ("database/sql""fmt""log"_ "github.com/go-sql-driver/mysql""github.com/spf13/viper"
)type Config struct {ServerAddr string `mapstructure:"SERVER_ADDR"`KamailioRPCAddr string `mapstructure:"KAMAILIO_RPC_ADDR"`DBDriver string `mapstructure:"DB_DRIVER"`DBUser string `mapstructure:"DB_USER"`DBPassword string `mapstructure:"DB_PASSWORD"`DBName string `mapstructure:"DB_NAME"`
}func LoadConfig() (*Config, error) {viper.SetConfigFile(".env")if err := viper.ReadInConfig(); err != nil {return nil, fmt.Errorf("failed to read config file: %v", err)}var cfg Configif err := viper.Unmarshal(&cfg); err != nil {return nil, fmt.Errorf("failed to unmarshal config: %v", err)}return &cfg, nil
}func InitDB(cfg *Config) (*sql.DB, error) {dsn := fmt.Sprintf("%s:%s@/%s", cfg.DBUser, cfg.DBPassword, cfg.DBName)db, err := sql.Open(cfg.DBDriver, dsn)if err != nil {return nil, fmt.Errorf("failed to connect to database: %v", err)}if err := db.Ping(); err != nil {return nil, fmt.Errorf("failed to ping database: %v", err)}log.Println("Connected to database")return db, nil
}
2.3 models/acc_model.go
定義數據模型和數據庫操作方法。
package modelsimport ("database/sql""log"
)type AccountingRecord struct {Method string `json:"method"`FromTag string `json:"from_tag"`ToTag string `json:"to_tag"`CallID string `json:"callid"`SipCode string `json:"sip_code"`SipReason string `json:"sip_reason"`Time string `json:"time"`UserAgent string `json:"ua"`UUID string `json:"uuid"`
}func SaveRecord(db *sql.DB, record AccountingRecord) error {query := `INSERT INTO acc_records (method, from_tag, to_tag, callid, sip_code, sip_reason, time, ua, uuid) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`_, err := db.Exec(query,record.Method,record.FromTag,record.ToTag,record.CallID,record.SipCode,record.SipReason,record.Time,record.UserAgent,record.UUID,)if err != nil {log.Printf("Failed to save record: %v", err)return err}log.Printf("Saved record: %+v", record)return nil
}
2.4 services/queue_service.go
從 Kamailio 消息隊列中讀取數據的服務。
package servicesimport ("encoding/json""kamailio-acc/models""log""time""github.com/zero-os/gorpc"
)type KamailioClient struct {client *gorpc.Client
}func NewKamailioClient(addr string) *KamailioClient {return &KamailioClient{client: gorpc.NewClient("tcp", addr),}
}func (kc *KamailioClient) Close() {kc.client.Close()
}func (kc *KamailioClient) ReadQueue(queueName string) ([]models.AccountingRecord, error) {var result map[string]interface{}if err := kc.client.Call("mq.read", queueName, &result); err != nil {return nil, err}messages, ok := result["messages"].([]interface{})if !ok {return nil, nil}var records []models.AccountingRecordfor _, msg := range messages {msgJSON, err := json.Marshal(msg)if err != nil {log.Printf("Failed to marshal message: %v", err)continue}var record models.AccountingRecordif err := json.Unmarshal(msgJSON, &record); err != nil {log.Printf("Failed to unmarshal message: %v", err)continue}records = append(records, record)}return records, nil
}func StartQueueListener(client *KamailioClient, db *sql.DB) {for {records, err := client.ReadQueue("acc_json_queue")if err != nil {log.Printf("Failed to read from queue: %v", err)time.Sleep(5 * time.Second)continue}for _, record := range records {if err := models.SaveRecord(db, record); err != nil {log.Printf("Failed to save record: %v", err)}}time.Sleep(1 * time.Second)}
}
2.5 handlers/acc_handler.go
定義 HTTP 路由和處理程序。
package handlersimport ("database/sql""net/http""github.com/gin-gonic/gin""kamailio-acc/models"
)func RegisterRoutes(r *gin.Engine, db *sql.DB) {r.GET("/records", func(c *gin.Context) {// 示例:從數據庫中獲取所有記錄var records []models.AccountingRecordrows, err := db.Query("SELECT * FROM acc_records")if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})return}defer rows.Close()for rows.Next() {var record models.AccountingRecordif err := rows.Scan(&record.Method,&record.FromTag,&record.ToTag,&record.CallID,&record.SipCode,&record.SipReason,&record.Time,&record.UserAgent,&record.UUID,); err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})return}records = append(records, record)}c.JSON(http.StatusOK, records)})
}
3. 配置文件 .env
SERVER_ADDR=:8080
KAMAILIO_RPC_ADDR=127.0.0.1:2049
DB_DRIVER=mysql
DB_USER=root
DB_PASSWORD=password
DB_NAME=kamailio_acc
4. 運行程序
- 啟動 Kamailio,確保 ACC_JSON 模塊正常運行。
- 運行 Go 程序:
go run main.go
- 訪問
http://localhost:8080/records
,查看數據庫中的計費記錄。
總結
通過 Gin 框架,將 Kamailio 的計費信息處理邏輯組織成一個結構化的 Web 服務。Go 程序從 Kamailio 的消息隊列中讀取數據并寫入數據庫,同時提供 HTTP 接口供外部系統查詢數據。