有很多實現,但是我想描述的是使用純JDK并發框架類: DelayedQueue和Delayed接口。
讓我從定義工作項的簡單(且為空)界面開始。 我跳過諸如屬性和方法之類的實現細節,因為它們并不重要。
package com.example.delayed;public interface WorkItem {// Some properties and methods here
}
我們模型中的下一個類將代表被推遲的工作項并實現Delayed接口。 僅有幾個基本概念需要考慮:延遲本身和相應工作項已提交的實際時間。 這就是到期日的計算方式。 因此,我們通過引入PostponedWorkItem類來做到這一點。
package com.example.delayed;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class PostponedWorkItem implements Delayed {private final long origin;private final long delay;private final WorkItem workItem;public PostponedWorkItem( final WorkItem workItem, final long delay ) {this.origin = System.currentTimeMillis();this.workItem = workItem;this.delay = delay;}@Overridepublic long getDelay( TimeUnit unit ) {return unit.convert( delay - ( System.currentTimeMillis() - origin ), TimeUnit.MILLISECONDS );}@Overridepublic int compareTo( Delayed delayed ) {if( delayed == this ) {return 0;}if( delayed instanceof PostponedWorkItem ) {long diff = delay - ( ( PostponedWorkItem )delayed ).delay;return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? -1 : 1 ) );}long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) );}
}
如您所見,我們創建該類的新實例,并將當前系統時間保存在內部origin屬性中。 getDelayed方法計算工作項過期之前剩余的實際時間。 延遲是外部設置,它是構造函數參數。 由于Delayed擴展了此接口,因此必須強制實施Comparable <Delayed> 。
現在,我們大部分完成了! 為了完成該示例,讓我們通過實現equals和hashCode來確保不會將相同的工作項兩次提交到工作隊列中(實現非常簡單,不需要任何注釋)。
public class PostponedWorkItem implements Delayed {...@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() );return result;}@Overridepublic boolean equals( Object obj ) {if( this == obj ) {return true;}if( obj == null ) {return false;}if( !( obj instanceof PostponedWorkItem ) ) {return false;}final PostponedWorkItem other = ( PostponedWorkItem )obj;if( workItem == null ) {if( other.workItem != null ) {return false;}} else if( !workItem.equals( other.workItem ) ) {return false;}return true;}
}
最后一步是引入某種管理器,該管理器將安排工作項并定期輪詢過期的項:見WorkItemScheduler類。
package com.example.delayed;import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;public class WorkItemScheduler {private final long delay = 2000; // 2 secondsprivate final BlockingQueue< PostponedWorkItem > delayed =new DelayQueue< PostponedWorkItem >(); public void addWorkItem( final WorkItem workItem ) {final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay );if( !delayed.contains( postponed )) {delayed.offer( postponed );}}public void process() {final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >();delayed.drainTo( expired );for( final PostponedWorkItem postponed: expired ) {// Do some real work here with postponed.getWorkItem()}}
}
使用BlockingQueue可以確保線程安全和高級別的并發性。 該處理方法應定期運行,以排干工作項隊列。 它可以通過Spring Framework中的@ Scheduled注釋或JEE 6中的 EJB的@Schedule注釋進行注釋。
請享用!
參考:在Andriy Redko {devmind}博客上,我們的JCG合作伙伴 Andriy Redko 在實踐中使用了延遲隊列 。
翻譯自: https://www.javacodegeeks.com/2012/04/using-delayed-queues-in-practice.html