跟上一篇文章比較,這次改進了之前的代碼,使用了Lock Condition 和并發集合.代碼量減了一些,并且更加容易讀了.
這篇代碼是上一篇的改進版,邏輯在前篇有說明,以防大家看不到,我再重現貼一遍.后續會使用高階的線程工具再次改進,以求代碼更簡單.代碼的邏輯:
1)SProducer不停的產生number到queue中.
2)3個carrier不停的取出queue中的number.
3)如果queue中存在10個剩余number時,SProducer會停下來等Carrier消費一些后再生產.
4)如果Carrier發現queue中沒有number時,會等待.
5)如果Carrier取出的數字末尾為4, 則會挑起罷工事件.
6)Carrier罷工會引發一個Negotiation線程進行談判.
7)罷工階段SProducer和所有Carrier均停工.
8)Negotiation如果發現取出的number首位為3或者7,將引發談判失敗.
9)如果談判成功,則恢復工作,如果談判失敗,則破產,所有線程均退出.
注意:使用lock的時候一定要注意, lock()和unlock()方法一定要成對出現. 最好放到try{}finally{}中,這樣,unlock()方法必會調到.倘若沒有使用unlock()就return了,會導致線程死鎖.
package concurrency;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Producer extends Thread {
private final static ArrayBlockingQueue numbers = new ArrayBlockingQueue(10);
private final static ArrayList threads = new ArrayList();
private volatile boolean negotiating = false;
private ReentrantLock negotiationLock = new ReentrantLock();
private Condition negotiationCondition = negotiationLock.newCondition();
private class Negotiation implements Runnable {
private String number;
private Negotiation(String number) {
this.number = number;
}
public void run() {
try {
System.out.println("Start negotiation...");
sleep(5000);
if (number.startsWith("7") || number.startsWith("3")) {
System.out.println("Negotiation broken.");
for (Thread t : threads) {
t.interrupt();
}
System.out.println("Negotiation broken post handle.");
return;
}
System.out.println("Negotiation succeeds.");
} catch (InterruptedException e) {
System.out.println("Middle man is killed.");
}
negotiationLock.lock();
negotiating = false;
negotiationCondition.signalAll();
negotiationLock.unlock();
}
}
private class Carrier implements Runnable {
private String name;
private Carrier(String name) {
this.name = name;
}
public void run() {
while(true) {
try{
negotiationLock.lock();
while(negotiating) {
try {
System.out.println("Carrier ["+name+"] join stricks.");
negotiationCondition.await();
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
}
} finally {
negotiationLock.unlock();
}
String number;
try {
number = numbers.take();
System.out.println("Carrier [" + name + "] carries "+ number +" out of List;");
} catch (InterruptedException e1) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
if (number.endsWith("4")) {
try {
negotiationLock.lock();
while (negotiating) {
try {
negotiationCondition.await();
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Carrier [" + name + "] retires.");
return;
}
}
negotiating = true;
System.out.println("Stricks happen on number:"+number);
new Thread(new Negotiation(number)).start();
} finally {
negotiationLock.unlock();
}
}
}
}
}
public void run() {
Thread a = new Thread(new Carrier("a"));
Thread b = new Thread(new Carrier("b"));
Thread c = new Thread(new Carrier("c"));
threads.add(this);
threads.add(a);
threads.add(b);
threads.add(c);
a.start();
b.start();
c.start();
this.produceNumbers();
}
private void produceNumbers() {
while (true) {
while(negotiating) {
negotiationLock.lock();
try {
System.out.println("Stricking... Producer waiting for negotiation result.");
negotiationCondition.await();
System.out.println("Negotiation succeeds. Producer happy.");
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Producer breaks up.");
return;
} finally {
negotiationLock.unlock();
}
}
String number = ""+new java.util.Random().nextInt(47);
try {
numbers.put(number);
System.out.println("Produce number " + number + " into List;");
} catch (InterruptedException e) {
System.out.println("Negotiation fails. Producer breaks up.");
return;
}
}
}
public static void main(String[] args) {
new Producer().start();
}
}