在我們使用多線程編程時,很多時候需要根據業務場景設計一套業務功能。其實,在多線程編程中,本身就存在很多成熟的功能設計模式,學好它們,用好它們,那就是如虎添翼了。今天我就帶你了解幾種并發編程中常用的設計模式。
1、線程上下文設計模式
線程上下文是指貫穿線程整個生命周期的對象中的一些全局信息。例如,我們比較熟悉的 Spring 中的 ApplicationContext 就是一個關于上下文的類,它在整個系統的生命周期中保存了配置信息、用戶信息以及注冊的 bean 等上下文信息。
這樣的解釋可能有點抽象,我們不妨通過一個具體的案例,來看看到底在什么的場景下才需要上下文呢?
在執行一個比較長的請求任務時,這個請求可能會經歷很多層的方法調用,假設我們需要將最開始的方法的中間結果傳遞到末尾的方法中進行計算,一個簡單的實現方式就是在每個函數中新增這個中間結果的參數,依次傳遞下去。代碼如下:
public class ContextTest {// 上下文類public class Context {private String name;private long idpublic long getId() {return id;}public void setId(long id) {this.id = id;}public String getName() {return this.name;}public void setName(String name) {this.name = name;}}// 設置上下文名字public class QueryNameAction {public void execute(Context context) {try {Thread.sleep(1000L);String name = Thread.currentThread().getName();context.setName(name);} catch (InterruptedException e) {e.printStackTrace();}}}// 設置上下文 IDpublic class QueryIdAction {public void execute(Context context) {try {Thread.sleep(1000L);long id = Thread.currentThread().getId();context.setId(id);} catch (InterruptedException e) {e.printStackTrace();}}}// 執行方法public class ExecutionTask implements Runnable {private QueryNameAction queryNameAction = new QueryNameAction();private QueryIdAction queryIdAction = new QueryIdAction();@Overridepublic void run() {final Context context = new Context();queryNameAction.execute(context);System.out.println("The name query successful");queryIdAction.execute(context);System.out.println("The id query successful");System.out.println("The Name is " + context.getName() + " and id " + context.getId());}}public static void main(String[] args) {IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());}
}
執行結果:
The name query successful
The name query successful
The name query successful
The name query successful
The id query successful
The id query successful
The id query successful
The id query successful
The Name is Thread-1 and id 11
The Name is Thread-2 and id 12
The Name is Thread-3 and id 13
The Name is Thread-0 and id 10
然而這種方式太笨拙了,每次調用方法時,都需要傳入 Context 作為參數,而且影響一些中間公共方法的封裝。
那能不能設置一個全局變量呢?如果是在多線程情況下,需要考慮線程安全,這樣的話就又涉及到了鎖競爭。
除了以上這些方法,其實我們還可以使用 ThreadLocal 實現上下文。ThreadLocal 是線程本地變量,可以實現多線程的數據隔離。ThreadLocal 為每一個使用該變量的線程都提供一份獨立的副本,線程間的數據是隔離的,每一個線程只能訪問各自內部的副本變量。
ThreadLocal 中有三個常用的方法:set、get、initialValue,我們可以通過以下一個簡單的例子來看看 ThreadLocal 的使用:
private void testThreadLocal() {Thread t = new Thread() {ThreadLocal<String> mStringThreadLocal = new ThreadLocal<String>();@Overridepublic void run() {super.run();mStringThreadLocal.set("test");mStringThreadLocal.get();}};t.start();
}
接下來,我們使用 ThreadLocal 來重新實現最開始的上下文設計。你會發現,我們在兩個方法中并沒有通過變量來傳遞上下文,只是通過 ThreadLocal 獲取了當前線程的上下文信息:
public class ContextTest {// 上下文類public static class Context {private String name;private long id;public long getId() {return id;}public void setId(long id) {this.id = id;}public String getName() {return this.name;}public void setName(String name) {this.name = name;}}// 復制上下文到 ThreadLocal 中public final static class ActionContext {private static final ThreadLocal<Context> threadLocal = new ThreadLocal<Context>() {@Overrideprotected Context initialValue() {return new Context();}};public static ActionContext getActionContext() {return ContextHolder.actionContext;}public Context getContext() {return threadLocal.get();}// 獲取 ActionContext 單例public static class ContextHolder {private final static ActionContext actionContext = new ActionContext();}}// 設置上下文名字public class QueryNameAction {public void execute() {try {Thread.sleep(1000L);String name = Thread.currentThread().getName();ActionContext.getActionContext().getContext().setName(name);} catch (InterruptedException e) {e.printStackTrace();}}}// 設置上下文 IDpublic class QueryIdAction {public void execute() {try {Thread.sleep(1000L);long id = Thread.currentThread().getId();ActionContext.getActionContext().getContext().setId(id);} catch (InterruptedException e) {e.printStackTrace();}}}// 執行方法public class ExecutionTask implements Runnable {private QueryNameAction queryNameAction = new QueryNameAction();private QueryIdAction queryIdAction = new QueryIdAction();@Overridepublic void run() {queryNameAction.execute();// 設置線程名System.out.println("The name query successful");queryIdAction.execute();// 設置線程 IDSystem.out.println("The id query successful");System.out.println("The Name is " + ActionContext.getActionContext().getContext().getName() + " and id " + ActionContext.getActionContext().getContext().getId())}}public static void main(String[] args) {IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());}
}
運行結果:
The name query successful
The name query successful
The name query successful
The name query successful
The id query successful
The id query successful
The id query successful
The id query successful
The Name is Thread-2 and id 12
The Name is Thread-0 and id 10
The Name is Thread-1 and id 11
The Name is Thread-3 and id 13
2、Thread-Per-Message 設計模式
Thread-Per-Message 設計模式翻譯過來的意思就是每個消息一個線程的意思。例如,我們在處理 Socket 通信的時候,通常是一個線程處理事件監聽以及 I/O 讀寫,如果 I/O 讀寫操作非常耗時,這個時候便會影響到事件監聽處理事件。
這個時候 Thread-Per-Message 模式就可以很好地解決這個問題,一個線程監聽 I/O 事件,每當監聽到一個 I/O 事件,則交給另一個處理線程執行 I/O 操作。下面,我們還是通過一個例子來學習下該設計模式的實現。
//IO 處理
public class ServerHandler implements Runnable{private Socket socket;public ServerHandler(Socket socket) {this.socket = socket;}public void run() {BufferedReader in = null;PrintWriter out = null;String msg = null;try {in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(),true);while ((msg = in.readLine()) != null && msg.length()!=0) {// 當連接成功后在此等待接收消息(掛起,進入阻塞狀態)System.out.println("server received : " + msg);out.print("received~\n");out.flush();}} catch (Exception e) {e.printStackTrace();} finally {try {in.close();} catch (IOException e) {e.printStackTrace();}try {out.close();} catch (Exception e) {e.printStackTrace();}try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
//Socket 啟動服務
public class Server {private static int DEFAULT_PORT = 12345;private static ServerSocket server;public static void start() throws IOException {start(DEFAULT_PORT);}public static void start(int port) throws IOException {if (server != null) {return;}try {// 啟動服務server = new ServerSocket(port);// 通過無線循環監聽客戶端連接while (true) {Socket socket = server.accept();// 當有新的客戶端接入時,會執行下面的代碼long start = System.currentTimeMillis();new Thread(new ServerHandler(socket)).start();long end = System.currentTimeMillis();System.out.println("Spend time is " + (end - start));}} finally {if (server != null) {System.out.println(" 服務器已關閉。");server.close();}}}public static void main(String[] args) throws InterruptedException{// 運行服務端new Thread(new Runnable() {public void run() {try {Server.start();} catch (IOException e) {e.printStackTrace();}}}).start();}
}
以上,我們是完成了一個使用 Thread-Per-Message 設計模式實現的 Socket 服務端的代碼。但這里是有一個問題的,你發現了嗎?
使用這種設計模式,如果遇到大的高并發,就會出現嚴重的性能問題。如果針對每個 I/O 請求都創建一個線程來處理,在有大量請求同時進來時,就會創建大量線程,而此時 JVM 有可能會因為無法處理這么多線程,而出現內存溢出的問題。
退一步講,即使是不會有大量線程的場景,每次請求過來也都需要創建和銷毀線程,這對系統來說,也是一筆不小的性能開銷。
面對這種情況,我們可以使用線程池來代替線程的創建和銷毀,這樣就可以避免創建大量線程而帶來的性能問題,是一種很好的調優方法。
3、Worker-Thread 設計模式
這里的 Worker 是工人的意思,代表在 Worker Thread 設計模式中,會有一些工人(線程)不斷輪流處理過來的工作,當沒有工作時,工人則會處于等待狀態,直到有新的工作進來。除了工人角色,Worker Thread 設計模式中還包括了流水線和產品。
這種設計模式相比 Thread-Per-Message 設計模式,可以減少頻繁創建、銷毀線程所帶來的性能開銷,還有無限制地創建線程所帶來的內存溢出風險。
我們可以假設一個場景來看下該模式的實現,通過 Worker Thread 設計模式來完成一個物流分揀的作業。
假設一個物流倉庫的物流分揀流水線上有 8 個機器人,它們不斷從流水線上獲取包裹并對其進行包裝,送其上車。當倉庫中的商品被打包好后,會投放到物流分揀流水線上,而不是直接交給機器人,機器人會再從流水線中隨機分揀包裹。代碼如下:
// 包裹類
public class Package {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}public void execute() {System.out.println(Thread.currentThread().getName()+" executed "+this);}
}
// 流水線
public class PackageChannel {private final static int MAX_PACKAGE_NUM = 100;private final Package[] packageQueue;private final Worker[] workerPool;private int head;private int tail;private int count;public PackageChannel(int workers) {this.packageQueue = new Package[MAX_PACKAGE_NUM];this.head = 0;this.tail = 0;this.count = 0;this.workerPool = new Worker[workers];this.init();}private void init() {for (int i = 0; i < workerPool.length; i++) {workerPool[i] = new Worker("Worker-" + i, this);}}/*** push switch to start all of worker to work*/public void startWorker() {Arrays.asList(workerPool).forEach(Worker::start);}public synchronized void put(Package packagereq) {while (count >= packageQueue.length) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}this.packageQueue[tail] = packagereq;this.tail = (tail + 1) % packageQueue.length;this.count++;this.notifyAll();}public synchronized Package take() {while (count <= 0) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}Package request = this.packageQueue[head];this.head = (this.head + 1) % this.packageQueue.length;this.count--;this.notifyAll();return request;}}
// 機器人
public class Worker extends Thread{private static final Random random = new Random(System.currentTimeMillis());private final PackageChannel channel;public Worker(String name, PackageChannel channel) {super(name);this.channel = channel;}@Overridepublic void run() {while (true) {channel.take().execute();try {Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}
public class Test {public static void main(String[] args) {// 新建 8 個工人final PackageChannel channel = new PackageChannel(8);// 開始工作channel.startWorker();// 為流水線添加包裹for(int i=0; i<100; i++) {Package packagereq = new Package();packagereq.setAddress("test");packagereq.setName("test");channel.put(packagereq);}}
}
我們可以看到,這里有 8 個工人在不斷地分揀倉庫中已經包裝好的商品。
4、總結
平時,如果需要傳遞或隔離一些線程變量時,我們可以考慮使用上下文設計模式。在數據庫讀寫分離的業務場景中,則經常會用到 ThreadLocal 實現動態切換數據源操作。但在使用 ThreadLocal 時,我們需要注意內存泄漏問題,在之前的[第 25 講]中,我們已經討論過這個問題了。
當主線程處理每次請求都非常耗時時,就可能出現阻塞問題,這時候我們可以考慮將主線程業務分工到新的業務線程中,從而提高系統的并行處理能力。而 Thread-Per-Message 設計模式以及 Worker-Thread 設計模式則都是通過多線程分工來提高系統并行處理能力的設計模式。
5、思考題
除了以上這些多線程的設計模式,平時你還使用過其它的設計模式來優化多線程業務嗎?