2019獨角獸企業重金招聘Python工程師標準>>>
一、Routing(路由) (using the Java client)
在前面的學習中,構建了一個簡單的日志記錄系統,能夠廣播所有的日志給多個接收者,在該部分學習中,將添加一個新的特點,就是可以只訂閱一個特定的消息源,也就是說能夠直接把關鍵的錯誤日志消息發送到日志文件保存起來,不重要的日志信息文件不保存在磁盤中,但是仍然能夠在控制臺輸出,那么這便是我們這部分要學習的消息的路由分發機制。
二、Bindings(綁定)
在前面的學習中已經創建了綁定(bindings),代碼如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個綁定就是一個關于exchange和queue的關系,它可以簡單的被理解為:隊列是從這個exchange中獲取消息的。
綁定可以采取一個額外的routingKey的參數,為了避免與basicPublish參數沖突,稱之為一個綁定Key,這是如何創建一個帶routingKey的綁定的關鍵。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
一個綁定Key依賴于exchange的類型,像之前使用fanout類型的exchange,完全忽略了該綁定key的值。
前面實現的日志記錄系統中廣播所有的消息給所有的消費者,現在對其進行擴展,允許根據信息的嚴重程度來對消息進行過濾,比如,希望一個程序寫入到磁盤的日志消息只接收錯誤的消息,而不是浪費磁盤保存所有的日志消息。
為了實現這個目標,使用一個fanout類型的exchange,顯然是不能夠滿足這樣的需求的,因為它只能廣播所有的消息。
為此將使用一個direct exchange來代替fanout exchange,direct exchange使用簡單的路由算法,將消息通過綁定的Key匹配將要到達的隊列。
從上面的結構圖中可以看出direct exchange X綁定著兩個queue(Q1,Q2),第一個queue綁定的routingKey為orange,第二個有兩個routingKey被綁定,一個routingKey為black,另外一個routingKey為green.
說明:發送帶有routingKey為orange的消息到X(exchange)中,X將該消息路由到Q1中,發送帶有routingKey為black和green的消息都將被路由到Q2中,其他所有消息將會被丟棄。
四、Multiple bindings(多綁定)
多個隊列綁定相同的routingKey是允許的,在上述實例中,可以把X和Q1用routingKey:black綁定起來,這種情況下,direct exchange將像fanout類型的exchange一樣會將消息廣播都到所有匹配的queues中,即一個routingKey為black的消息將會被發送到Q1和Q2中。
五、Emitting logs(發送的日志)
使用direct代替fanout類型的exchange,發送消息到一個direct exchange中,將根據消息的重要程度作為routingKey,這樣接收程序能夠選擇它想要接收的日志信息,首先必須先創建一個exchange.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
其次,發送一條信息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了簡化程序,將severity設定為info、warning、error三種類型中的一種。
String queueName = channel.queueDeclare().getQueue();for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
EmitLogDirect.java代碼清單如下:
public?class?EmitLogDirect?{?? ?? ????private?static?final?String?EXCHANGE_NAME?=?"direct_logs";?? ?? ????public?static?void?main(String[]?argv)?? ??????????????????throws?java.io.IOException?{?? ?? ????????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????????factory.setHost("localhost");?? ????????Connection?connection?=?factory.newConnection();?? ????????Channel?channel?=?connection.createChannel();?? ?? ????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");?? ?? ????????String?severity?=?getSeverity(argv);?? ????????String?message?=?getMessage(argv);?? ?? ????????channel.basicPublish(EXCHANGE_NAME,?severity,?null,?message.getBytes());?? ????????System.out.println("?[x]?Sent?'"?+?severity?+?"':'"?+?message?+?"'");?? ?? ????????channel.close();?? ????????connection.close();?? ????}?? ????//..?? }??
ReceiveLogsDirect代碼 清單如下:
public?class?ReceiveLogsDirect?{?? ?? ????private?static?final?String?EXCHANGE_NAME?=?"direct_logs";?? ?? ????public?static?void?main(String[]?argv)?? ??????????????????throws?java.io.IOException,?? ??????????????????java.lang.InterruptedException?{?? ?? ????????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????????factory.setHost("localhost");?? ????????Connection?connection?=?factory.newConnection();?? ????????Channel?channel?=?connection.createChannel();?? ?? ????????channel.exchangeDeclare(EXCHANGE_NAME,?"direct");?? ????????String?queueName?=?channel.queueDeclare().getQueue();?? ?? ????????if?(argv.length?<?1){?? ????????????System.err.println("Usage:?ReceiveLogsDirect?[info]?[warning]?[error]");?? ????????????System.exit(1);?? ????????}?? ?? ????????for(String?severity?:?argv){?? ????????????channel.queueBind(queueName,?EXCHANGE_NAME,?severity);?? ????????}?? ?? ????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");?? ?? ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);?? ????????channel.basicConsume(queueName,?true,?consumer);?? ?? ????????while?(true)?{?? ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();?? ????????????String?message?=?new?String(delivery.getBody());?? ????????????String?routingKey?=?delivery.getEnvelope().getRoutingKey();?? ?? ????????????System.out.println("?[x]?Received?'"?+?routingKey?+?"':'"?+?message?+?"'");?? ????????}?? ????}?? }??
編譯和往常一樣(參見以往教程用于編譯和類路徑的建議)。現在,為了方便起見,我們將使用一個環境變量$CP(%CP%在Windows上)的運行時類路徑的例子。
如果你只想保存 “警告”和“錯誤”(而不是“信息”)日志消息到一個文件,打開一個控制臺和type:
[*] Waiting for logs. To exit press CTRL+C
[x] Sent 'error':'Run. Run. Or it will explode.'
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
$ java -cp $CP ReceiveLogsDirect info warning error
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."