public class Send {
public static final String routingKey = "wuqidi_task_durable";
/*工作隊列 也叫任務隊列 目的是將任務發送到隊列中 由工作者進行處理 在后臺的多個工作者中 任務是共享的*/
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//消息持久化 :在聲明隊列過程中 第二個參數 是設置持久化 這是為了 當rabbitmq崩潰 關閉
//消息和隊列是否刪除 也就是再次開機會不會保留原來信息和隊列
//這里設置為true是保證隊列不會消失
channel.queueDeclare(routingKey, true, false, false, null);
String con = getTask();
System.out.println(con);
//這里設置的是保證消息不會消失 但也不是完全保證 可能會在內存中
channel.basicPublish("", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, con.getBytes());
channel.close();
connection.close();
}
/*在這里隨機生成 task 個數*/
private static String getTask(){
Random r = new Random();
int len = r.nextInt(10);
StringBuffer sb = new StringBuffer();
for(int i=0; i< 15; i++){
sb.append("task ");
}
if(sb.length()<1){
sb.append("task");
}
return sb.substring(0, sb.length()-1).toString();
}
}
?
?
/*循環隊列 使用工作隊列的一個優點就是可以啟動多個接收端 就是工作者,它可以并行工作。采用輪訓的方式進行分配任務。*/
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(Send.routingKey, true, false, false, null);
//公平調度:
//在同一時刻 發給同一工作者不準超過1條任務,直到處理完消息作出相應。可以先發送給空閑的工作者。
channel.basicQos(1);
Consumer callback = new Consumer() {
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
}
@Override
public void handleRecoverOk(String consumerTag) {
}
@Override
public void handleDelivery(String arg0, Envelope arg1,
BasicProperties arg2, byte[] arg3) throws IOException {
String tasks = new String(arg3, "utf-8");
String[] taskss = tasks.split(" ");
for(String tem : taskss){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tem);
}
//這里執行消息確認 ,為的是待任務執行完畢后才能執行刪除操作,如果任務執行過程中
//任務執行失敗了 rabbitmq還可以鏈接其他工作者 進行工作
channel.basicAck(arg1.getDeliveryTag(), false);
}
@Override
public void handleConsumeOk(String consumerTag) {
}
@Override
public void handleCancelOk(String consumerTag) {
}
@Override
public void handleCancel(String consumerTag) throws IOException {
}
};
channel.basicConsume(Send.routingKey, false, callback);
channel.close();
connection.close();
}
}
在實際項目中目前我還沒有用到。。。