一、DelayQueue
DelayQueue是无界队列,延迟的操作,可以向延迟队列追加任务,这个任务需要指定延迟时间,只有延迟时间到了,才可以将任务从队列中获取出来。
任务可以指定延迟时间,所以需要任务满足一定的需求,DelayQueue的任务需要实现Delayed接口,重写getDelay方法和compare方法 。
getDelay:任务什么时候可以出队列。。
compareTo:存放任务到队列时,放在二叉堆的哪个位置。
class Task implements Delayed{private String name;/** 执行时间 (单位毫秒) */private Long time;/**** @param name 任务名称* @param delayTime 传入延迟时间*/public Task(String name, Long delayTime) {this.name = name;this.time = System.currentTimeMillis() + delayTime;}/** 任务可以出队列的核心方法 */@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/** 通过这个方法,来比较,将任务存放到二叉堆的指定位置 */@Overridepublic int compareTo(Delayed o) {// 基于执行时间比较return (int) (this.time - ((Task)o).getTime());}
}
使用案例:
public static void main(String[] args) throws InterruptedException {DelayQueue queue = new DelayQueue();queue.offer(new Task("A",4000L));queue.offer(new Task("B",2000L));queue.offer(new Task("C",3000L));queue.offer(new Task("D",1000L));System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());
}
1.1 源码分析
首先,想掌握延迟队列的源码信息,你需要先掌握优先级队列。 技术路上的苦行僧-CSDN博客
因为DelayQueue是无界队列,空间不够会扩容,生产者不需要挂起线程,空间肯定可以存放下当前的任务节点,只需要查看offer方法即可,其他的方法也是调用offer方法。
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 调用优先队列,添加任务q.offer(e);// 获取第一个数据与当前数据对比,判断当前放入的数据是否是第一个// 如果是,则需要唤醒消费者if (q.peek() == e) {// leader这个属性一会获取的源码会说明leader = null;// 唤醒线程available.signal();}return true;} finally {lock.unlock();}}// 这个是优先级队列的添加,延迟队列是基于优先级队列实现的功能
public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;// 空间不够,扩容数组if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)// 放第一个数据,不需要上移queue[0] = e;else// 不是一个数据,判断是否需要上移siftUp(i, e);return true;
}
那么延迟队列是如何获取数据的,接下来我们分析一下获取数据的源码:
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {// 从优先队列中获取第一个任务出来E first = q.peek();// 判断是否有内容 || 判断当前的延迟时间是否已经到期if (first == null || first.getDelay(NANOSECONDS) > 0)return null;else// 如果有数据,并且时间已经到了则直接返回数据return q.poll();} finally {lock.unlock();}}
接下来我们分析一下poll带时间等待的源码:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 获取锁等待超时时间long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 上锁lock.lockInterruptibly();try {for (;;) {// 获取堆顶数据E first = q.peek();// 如果数据为空if (first == null) {// 并且剩余等待时间小于等于0则直接返回if (nanos <= 0)return null;else// 拿不到数据并且剩余等待时间则挂起线程等待nanos = available.awaitNanos(nanos);} else {// 获取得到数据,获取数据延迟时间long delay = first.getDelay(NANOSECONDS);// 如果延迟时间到了直接返回数据if (delay <= 0)return q.poll();// 数据延迟时间还没到并且没有等待时间了那么直接返回if (nanos <= 0)return null;first = null; // don't retain ref while waiting// 如果剩余的等待时间小于任务的延迟时间,这说明肯定不会拿到数据,那么直接挂起等待if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {// 到这里说明,线程的剩余等待时间大于任务的延迟时间Thread thisThread = Thread.currentThread();// 把当前线程设置给leader,说明这个线程是第一个等待的leader = thisThread;try {// 不能睡眠那么久,直接睡眠任务的延迟时间long timeLeft = available.awaitNanos(delay);// 更新一下剩余的等待时间nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {// 若leader为空,并且堆顶存在数据,则唤醒正在等待的线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}
而take方法会一直调用await,一直阻塞,一直等到拿到数据为止。与poll(time,unit)的区别是,poll(time,unit)会计算剩余额的阻塞时间,take不会。
首先知道了DelayQueue如何用代码实现,首先节点就是任务必须实现Delayed接口,重写任务出队的时间以及任务的排序方式.
入队:入队只有一个方法,就是offer,因为DelayQueue是无界队列,所以生产者是不需要阻塞的。
出队:
-
poll:直接拿堆顶数据,堆顶的延迟时间到了,直接返回任务,如果没到时间,返回null。
-
poll(time,unit):
-
直接拿堆顶数据,
-
如果为null,或者阻塞时间已经到了,直接告辞!
-
如果不为null
-
并且延迟时间到了,返回数据
-
如果数据时间没到,查看阻塞剩余的时间到了么,到了直接返回null
-
如果数据的延迟时间没到
-
如果阻塞时间小于延迟时间,或者已经有leader了,直接等待阻塞时间,等待被唤醒即可
-
当前阻塞时间大于等于延迟时间,并且leader为null,这是就阻塞延迟时间即可
-
-
-
-