欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > 并发专题(9)之JUC阻塞队列源码分析

并发专题(9)之JUC阻塞队列源码分析

2025/6/30 5:15:26 来源:https://blog.csdn.net/jokeMqc/article/details/144274282  浏览:    关键词:并发专题(9)之JUC阻塞队列源码分析

一、DelayQueue

        DelayQueue是无界队列,延迟的操作,可以向延迟队列追加任务,这个任务需要指定延迟时间,只有延迟时间到了,才可以将任务从队列中获取出来。

        任务可以指定延迟时间,所以需要任务满足一定的需求,DelayQueue的任务需要实现Delayed接口,重写getDelay方法和compare方法 。

        getDelay:任务什么时候可以出队列。。

        compareTo:存放任务到队列时,放在二叉堆的哪个位置。

class Task implements Delayed{private String name;/**  执行时间 (单位毫秒) */private Long time;/**** @param name  任务名称* @param delayTime  传入延迟时间*/public Task(String name, Long delayTime) {this.name = name;this.time = System.currentTimeMillis() + delayTime;}/** 任务可以出队列的核心方法 */@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/** 通过这个方法,来比较,将任务存放到二叉堆的指定位置 */@Overridepublic int compareTo(Delayed o) {// 基于执行时间比较return (int) (this.time - ((Task)o).getTime());}
}

        使用案例:

public static void main(String[] args) throws InterruptedException {DelayQueue queue = new DelayQueue();queue.offer(new Task("A",4000L));queue.offer(new Task("B",2000L));queue.offer(new Task("C",3000L));queue.offer(new Task("D",1000L));System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());
}

1.1 源码分析

         首先,想掌握延迟队列的源码信息,你需要先掌握优先级队列。 技术路上的苦行僧-CSDN博客

         因为DelayQueue是无界队列,空间不够会扩容,生产者不需要挂起线程,空间肯定可以存放下当前的任务节点,只需要查看offer方法即可,其他的方法也是调用offer方法。

    public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 调用优先队列,添加任务q.offer(e);// 获取第一个数据与当前数据对比,判断当前放入的数据是否是第一个// 如果是,则需要唤醒消费者if (q.peek() == e) {// leader这个属性一会获取的源码会说明leader = null;// 唤醒线程available.signal();}return true;} finally {lock.unlock();}}// 这个是优先级队列的添加,延迟队列是基于优先级队列实现的功能
public boolean offer(E e) {if (e == null)throw new NullPointerException();modCount++;int i = size;// 空间不够,扩容数组if (i >= queue.length)grow(i + 1);size = i + 1;if (i == 0)// 放第一个数据,不需要上移queue[0] = e;else// 不是一个数据,判断是否需要上移siftUp(i, e);return true;
}

        那么延迟队列是如何获取数据的,接下来我们分析一下获取数据的源码:

    public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {// 从优先队列中获取第一个任务出来E first = q.peek();// 判断是否有内容  || 判断当前的延迟时间是否已经到期if (first == null || first.getDelay(NANOSECONDS) > 0)return null;else// 如果有数据,并且时间已经到了则直接返回数据return q.poll();} finally {lock.unlock();}}

        接下来我们分析一下poll带时间等待的源码:

 public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 获取锁等待超时时间long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 上锁lock.lockInterruptibly();try {for (;;) {// 获取堆顶数据E first = q.peek();// 如果数据为空if (first == null) {// 并且剩余等待时间小于等于0则直接返回if (nanos <= 0)return null;else// 拿不到数据并且剩余等待时间则挂起线程等待nanos = available.awaitNanos(nanos);} else {// 获取得到数据,获取数据延迟时间long delay = first.getDelay(NANOSECONDS);// 如果延迟时间到了直接返回数据if (delay <= 0)return q.poll();// 数据延迟时间还没到并且没有等待时间了那么直接返回if (nanos <= 0)return null;first = null; // don't retain ref while waiting// 如果剩余的等待时间小于任务的延迟时间,这说明肯定不会拿到数据,那么直接挂起等待if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {// 到这里说明,线程的剩余等待时间大于任务的延迟时间Thread thisThread = Thread.currentThread();// 把当前线程设置给leader,说明这个线程是第一个等待的leader = thisThread;try {// 不能睡眠那么久,直接睡眠任务的延迟时间long timeLeft = available.awaitNanos(delay);// 更新一下剩余的等待时间nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {// 若leader为空,并且堆顶存在数据,则唤醒正在等待的线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

        而take方法会一直调用await,一直阻塞,一直等到拿到数据为止。与poll(time,unit)的区别是,poll(time,unit)会计算剩余额的阻塞时间,take不会。

         首先知道了DelayQueue如何用代码实现,首先节点就是任务必须实现Delayed接口,重写任务出队的时间以及任务的排序方式.

        入队:入队只有一个方法,就是offer,因为DelayQueue是无界队列,所以生产者是不需要阻塞的。

        出队:

  • poll:直接拿堆顶数据,堆顶的延迟时间到了,直接返回任务,如果没到时间,返回null。

  • poll(time,unit):

    • 直接拿堆顶数据,

      • 如果为null,或者阻塞时间已经到了,直接告辞!

      • 如果不为null

        • 并且延迟时间到了,返回数据

        • 如果数据时间没到,查看阻塞剩余的时间到了么,到了直接返回null

        • 如果数据的延迟时间没到

          • 如果阻塞时间小于延迟时间,或者已经有leader了,直接等待阻塞时间,等待被唤醒即可

          • 当前阻塞时间大于等于延迟时间,并且leader为null,这是就阻塞延迟时间即可

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词