設計一個線程池,要求如下:
- 隊列最大容量為10(內存隊列)。
- 當隊列滿了之后,拒絕策略將新的任務寫入數據庫。
- 從隊列中取任務時,若該隊列為空,能夠從數據庫中加載之前被拒絕的任務
模擬數據庫 (TaskDatabase)
- 使用
LinkedBlockingQueue
模擬數據庫存儲 - 線程安全操作(ReentrantLock保證)
- 實際應用需替換為JDBC/MyBatis等持久化方案
static class TaskDatabase {private final BlockingQueue<Runnable> dbQueue = new LinkedBlockingQueue<>();private final ReentrantLock dbLock = new ReentrantLock();public void saveTask(Runnable task) {dbLock.lock();try {System.out.println("[DB] 存儲被拒絕任務到數據庫, 當前數據庫任務數: " + (dbQueue.size() + 1));dbQueue.put(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {dbLock.unlock();}}public Runnable loadTask() {dbLock.lock();try {Runnable task = dbQueue.poll();if (task != null) {System.out.println("[DB] 從數據庫加載任務, 剩余數據庫任務: " + dbQueue.size());}return task;} finally {dbLock.unlock();}}}
Runable任務序列化:
public interface SerializableTask extends Serializable {void execute();
}class CustomTask {public static void main(String[] args) {SerializableTask task = () -> System.out.println("Hello, World!");String serializedTask = serializedTask(task);SerializableTask deserialization = deserialization(serializedTask);deserialization.execute();}static String serializedTask(SerializableTask runnable){try(ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos)) {// 序列化任務對象oos.writeObject(runnable);return Base64.getEncoder().encodeToString(baos.toByteArray());}catch (Exception e){throw new RuntimeException("無法序列化");}}static SerializableTask deserialization(String serializedTask){// 反序列化任務byte[] data = Base64.getDecoder().decode(serializedTask);try (ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais)) {SerializableTask task = (SerializableTask) ois.readObject();return task;}catch (Exception e){throw new RuntimeException("無法反序列化");}}
}
自定義阻塞隊列 (DatabaseBackedBlockingQueue)
- 繼承
LinkedBlockingQueue
并重寫關鍵方法 - take()方法邏輯 :
- 優先從內存隊列取任務
- 隊列為空時從數據庫加載
- 數據庫也為空時阻塞等待新任務
- offer()方法 :隊列未滿時接受,滿時返回false觸發拒絕策略
// 自定義阻塞隊列(支持從數據庫加載任務)static class DatabaseBackedBlockingQueue extends LinkedBlockingQueue<Runnable> {private final TaskDatabase database;public DatabaseBackedBlockingQueue(int maxLocalCapacity, TaskDatabase database) {super(maxLocalCapacity);this.database = database;}@Overridepublic Runnable take() throws InterruptedException {// 1. 優先檢查本地隊列Runnable task = super.poll();if (task != null) return task;// 2. 本地隊列為空時嘗試從數據庫加載while (true) {Runnable dbTask = database.loadTask();if (dbTask != null) return dbTask;// 3. 數據庫為空則等待新任務if (isEmpty()) {task = super.take(); // 阻塞直到有新任務if (task != null) return task;}}}}
拒絕策略 (DatabaseRejectionHandler)
- 實現
RejectedExecutionHandler
接口 - 當內存隊列滿時將任務存入數據庫
- 任務存入后會被后續的take()方法加載執行
// 自定義拒絕策略(保存到數據庫)static class DatabaseRejectionHandler implements RejectedExecutionHandler {private final TaskDatabase database;public DatabaseRejectionHandler(TaskDatabase database) {this.database = database;}@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("[Rejected] 線程池隊列已滿,任務轉入數據庫");database.saveTask(task);}}
資源管理 :
- 核心/最大線程數根據容器資源動態調整
- 線程工廠添加命名前綴(便于監控)
- 保活時間控制閑置線程銷毀
// 5. 監控線程池狀態
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {System.out.println("\n[監控] 活躍線程: " + executor.getActiveCount()+ " | 隊列大小: " + executor.getQueue().size()+ " | 總完成任務: " + executor.getCompletedTaskCount());
}, 1, 2, TimeUnit.SECONDS);
自定義線程工廠
// 自定義線程工廠static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger counter = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, namePrefix + "-" + counter.getAndIncrement());}}
測試
public static void main(String[] args) throws InterruptedException {// 1. 創建任務數據庫TaskDatabase taskDatabase = new TaskDatabase();// 2. 配置線程池int queueCapacity = 10; // 隊列容量// 3. 創建自定義隊列和拒絕策略DatabaseBackedBlockingQueue workQueue =new DatabaseBackedBlockingQueue(queueCapacity, taskDatabase);ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,0,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("custom-pool"),new DatabaseRejectionHandler(taskDatabase));// 4. 模擬任務提交int totalTasks = 50;System.out.println("開始提交任務, 總數: " + totalTasks);CountDownLatch countDownLatch = new CountDownLatch(50);for (int i = 1; i <= totalTasks; i++) {final int taskId = i;executor.execute(() -> {try {System.out.println(Thread.currentThread().getName()+ " 執行任務: " + taskId);Thread.sleep(1000); // 模擬任務執行countDownLatch.countDown();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 5. 監控線程池狀態ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();monitor.scheduleAtFixedRate(() -> {System.out.println("\n[監控] 活躍線程: " + executor.getActiveCount()+ " | 隊列大小: " + executor.getQueue().size()+ " | 總完成任務: " + executor.getCompletedTaskCount());}, 1, 2, TimeUnit.SECONDS);// 6. 等待任務執行完成countDownLatch.await();executor.shutdown();monitor.shutdown();System.out.println("剩余數據庫任務: " + taskDatabase.dbQueue.size());}