日常开发中实际上使用队列很多,但是真正了解每个队列很少,可能用了有界队列,也不清楚什么是有界队列,生产故障发生后,搜捕大量资料,可能才发现是队列的特性,导致一些任务的丢失,或者一些任务的阻塞等等,本文针对这些情况,简单从队列的特性和适用场景进行介绍,并提供基础的队列在线程池初始化的DEMO。有总结不对处,多包涵、评论区多交流。
目录
常见队列简单介绍
LinkedBlockingQueue-基于链表的阻塞队列
DelayQueue-延时队列
LinkedTransferQueue-基于链表的无界阻塞队列
ConcurrentLinkedQueue-基于链表的无界并发队列
ArrayBlockingQueue-基于数组的阻塞队列
LinkedBlockingDeque-基于链表的双向阻塞队列
PriorityBlockingQueue-基于优先级的阻塞队列
SynchronousQueue-直接提交队列/同步队列
WorkStealingQueue-窃取队列
Java队列对比总结
各队列详细说明与示例代码
1. LinkedBlockingQueue (无界/有界阻塞队列)
2. DelayQueue (延迟无界阻塞队列)
3. LinkedTransferQueue (无界阻塞队列)
4. ConcurrentLinkedQueue (无界非阻塞队列)
5. ArrayBlockingQueue (有界阻塞队列)
6. LinkedBlockingDeque (有界双端阻塞队列)
7. PriorityBlockingQueue (无界优先级阻塞队列)
8. SynchronousQueue (同步队列)
9. WorkStealingPool (工作窃取队列)
关键选择建议:
常见队列简单介绍
LinkedBlockingQueue-基于链表的阻塞队列
特点:它是一个基于链表实现的阻塞队列,默认情况下容量为 Integer.MAX_VALUE,也就是几乎可以看作是无界的(实际受限于系统内存等因素)。当线程池中的线程处理任务速度小于任务提交速度时,任务会不断被添加到这个队列中,理论上不会出现队列满的情况,因此可以避免任务拒绝的情况发生,但如果任务持续快速堆积,可能会导致内存溢出等问题。
适用场景:适用于任务量波动较大,但对任务拒绝比较敏感,希望尽可能容纳所有提交任务的场景,比如一些后台异步任务处理场景,像日志记录异步处理等,只要内存资源允许,尽量接收并处理所有待记录的日志信息。
DelayQueue-延时队列
特点:这是一个支持延时获取元素的无界阻塞队列。只有在指定的延时时间过后,元素才能从队列中删除。DelayQueue适用于需要在特定时间间隔后执行任务的场景,如定时任务或消息中间件中的延迟消息消费等。
适用场景:适用于需要延迟执行任务、处理定时操作或管理超时机制的并发场景,典型应用包括订单超时自动取消、定时任务调度、缓存自动过期和连接超时管理等
LinkedTransferQueue-基于链表的无界阻塞队列
特点:这是一个基于链表的无界阻塞队列,支持生产者-消费者模式。与 LinkedBlockingQueue 类似,它也适用于多线程环境下的任务调度或数据缓冲等场景。
适用场景:高性能的数据传输:当需要在多个线程之间高效地传递数据时,使用 transfer 方法可以减少不必要的等待时间和锁竞争,从而提高性能。例如,在事件处理框架中,一个线程可以立即将事件传递给正在等待的处理器线程。
任务分发系统:在任务分发系统中,可以使用 LinkedTransferQueue 来高效地分配任务给空闲的工作线程。例如,在Web服务器或游戏服务器中,可以使用 transfer 方法将请求快速传递给空闲的处理器线程。
生产者-消费者模式:虽然 LinkedTransferQueue 主要设计用于高效的线程间数据传输,但它也可以用于传统的生产者-消费者场景。当生产者需要确保数据被某个特定的消费者快速消费时,使用 transfer 方法可以避免队列的拥塞。
消息传递系统:在构建消息传递系统时,LinkedTransferQueue 可以作为组件之一,用于在生产者和消费者之间快速传递消息。这种方式比使用传统的消息队列(如 JMS 或 RabbitMQ)在某些情况下更高效,因为它减少了中间存储的需要。
ConcurrentLinkedQueue-基于链表的无界并发队列
特点:这是一个基于链表的无界并发队列,按照先进先出的原则对元素进行排序。它是线程安全的,适用于高并发环境下的任务调度或数据缓冲等场景。
适用场景:高并发任务队列、生产者-消费者模型、事件驱动架构、日志记录、缓存淘汰策略、消息传递
ArrayBlockingQueue-基于数组的阻塞队列
特点:基于数组实现的阻塞队列,在创建时需要指定队列的容量大小。当队列已满时,若再有新的任务提交,提交任务的线程会被阻塞,直到队列有空闲空间为止。它是一个有界的、遵循先进先出(FIFO)原则的队列,保证了任务按照提交的先后顺序依次执行。
适用场景:适用于对资源使用有明确限制,需要控制队列中任务数量的场景,例如在一个资源有限的服务器环境下,对同时处理的网络请求任务数量进行限制,避免过多任务堆积耗尽系统资源,通过设置合适的队列容量,确保系统的稳定性和响应性能。
LinkedBlockingDeque-基于链表的双向阻塞队列
特点:它也是基于链表结构,但与LinkedBlockingQueue不同的是,它是一个双向队列,支持在队列的两端进行插入和移除操作,同样可以设置容量限制成为有界队列。在多线程环境下,这种双向操作特性可以提供更灵活的任务调度方式,比如可以实现将高优先级任务从队头插入优先执行等情况。
适用场景:适合需要灵活调整任务执行顺序,同时又要对队列规模进行控制的场景,比如在一个任务处理系统中,有紧急任务需要插队优先处理时,可以通过在队头插入的方式让其尽快被执行,并且通过设置容量防止过多任务无序堆积。
PriorityBlockingQueue-基于优先级的阻塞队列
特点:这是一个支持优先级排序的无界阻塞队列(虽然说是无界,但实际受系统资源限制),队列中的元素(即任务)需要实现 Comparable 接口或者在创建队列时传入自定义的比较器 Comparator,以此来确定任务的优先级顺序。每次从队列中取出任务时,会优先取出优先级最高的任务进行执行。
适用场景:适用于任务有明显优先级区分的情况,例如在一个监控系统中,告警任务有不同的严重级别,严重级别高的告警任务(如服务器宕机告警)优先级更高,需要优先处理,就可以将这些告警任务放入PriorityBlockingQueue中,按照优先级高低依次执行。
SynchronousQueue-直接提交队列/同步队列
特点:它是一种特殊的队列,内部没有实际的存储容量,每插入一个任务必须等待有线程来获取并执行这个任务,反之,线程来获取任务时,如果没有任务可用,线程会被阻塞等待任务提交。这种队列更像是一种任务传递的媒介,直接将任务从提交者传递到执行线程手上,保证了任务的即时处理,不存在任务排队等待的情况。
适用场景:适用于要求任务提交后能立即被执行,不允许有任务等待堆积的场景,比如在一些对实时性要求极高的交互场景中,像在线实时交易系统中处理下单请求,希望下单任务能马上被线程处理,而不是先放入队列等待,以保障交易的及时性和流畅性。
WorkStealingQueue-窃取队列
特点:是一种在并发编程中广泛使用的数据结构,主要用于实现工作窃取算法(Work-Stealing Algorithm),特别是在使用线程池和并行计算框架时。工作窃取算法是一种用于提高并行任务执行效率的技术,特别是在多核处理器上。其主要思想是允许多个线程从自己的队列中获取任务执行,但如果一个线程的队列为空,它可以尝试从其他线程的队列中“窃取”任务。
适用场景:Java的Fork/Join框架、Apache Spark、C++的Intel Threading Building Blocks (TBB)、自定义并行计算,开发自行实现WorkStealingQueue来管理线程间的工作负载分配。
Java队列对比总结
队列类型 | 边界 | 底层结构 | 线程安全 | 特性 | 适用场景 |
---|---|---|---|---|---|
LinkedBlockingQueue | 可选有界/无界 | 链表 | ✅ (ReentrantLock) | 阻塞操作,分离读写锁,高吞吐量 | 通用生产者-消费者模型 |
DelayQueue | 无界 | 优先级堆 (PriorityQueue) | ✅ (ReentrantLock) | 元素需实现Delayed接口,延迟获取 | 定时任务调度、缓存过期 |
LinkedTransferQueue | 无界 | 链表 | ✅ (CAS) | 支持transfer()生产者等待消费者,高性能无锁 | 高性能的数据传输、任务发布系统 |
ConcurrentLinkedQueue | 无界 | 链表 | ✅ (CAS) | 非阻塞算法,高并发性能,不支持阻塞操作 | 高并发非阻塞队列 |
ArrayBlockingQueue | 有界 | 数组 | ✅ (ReentrantLock) | 固定容量,可选公平锁,单锁控制 | 固定容量的生产者-消费者模型 |
LinkedBlockingDeque | 可选有界/无界 | 双向链表 | ✅ (ReentrantLock) | 双端操作,阻塞方法 | 工作窃取、双端操作场景 |
PriorityBlockingQueue | 无界 | 优先级堆 (数组) | ✅ (ReentrantLock) | 元素可排序,动态扩容 | 需要优先级处理的阻塞队列 |
SynchronousQueue | 容量0 | 栈/队列 | ✅ | 直接传递,不存储元素,put/take必须配对 | 线程间直接传递数据 |
WorkStealingQueue | 工作窃取队列 | 双端队列数组 | ✅ (工作窃取) | 每个线程有自己的队列,空闲线程窃取其他队列任务 | 计算密集型并行任务 |
各队列详细说明与示例代码
1. LinkedBlockingQueue (无界/有界阻塞队列)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public class LinkedBlockingQueueDemo {public static void main(String[] args) {BlockingQueue<String> queue = new LinkedBlockingQueue<>(3); // 有界队列// 生产者线程new Thread(() -> {try {queue.put("A");queue.put("B");queue.put("C");System.out.println("生产者阻塞...");queue.put("D"); // 阻塞直到有空间System.out.println("生产者继续");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 消费者线程new Thread(() -> {try {Thread.sleep(2000);System.out.println("消费: " + queue.take());System.out.println("消费: " + queue.take());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
2. DelayQueue (延迟无界阻塞队列)
import java.util.concurrent.*;class DelayedTask implements Delayed {private final String name;private final long startTime;public DelayedTask(String name, long delayMillis) {this.name = name;this.startTime = System.currentTimeMillis() + delayMillis;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed other) {return Long.compare(this.startTime, ((DelayedTask) other).startTime);}@Overridepublic String toString() {return "Task[" + name + "]";}
}public class DelayQueueDemo {public static void main(String[] args) {BlockingQueue<DelayedTask> queue = new DelayQueue<>();// 生产者new Thread(() -> {queue.add(new DelayedTask("A", 3000));queue.add(new DelayedTask("B", 1000));queue.add(new DelayedTask("C", 2000));}).start();// 消费者new Thread(() -> {while (true) {try {DelayedTask task = queue.take();System.out.println("执行: " + task + " at " + System.currentTimeMillis());} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}).start();}
}
3. LinkedTransferQueue (无界阻塞队列)
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;public class LinkedTransferQueueDemo {public static void main(String[] args) {TransferQueue<String> queue = new LinkedTransferQueue<>();// 消费者线程new Thread(() -> {try {Thread.sleep(1500);System.out.println("消费者等待数据...");String data = queue.take();System.out.println("接收: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 生产者线程new Thread(() -> {try {System.out.println("生产者传输数据...");queue.transfer("Data123"); // 阻塞直到被消费System.out.println("传输完成");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
4. ConcurrentLinkedQueue (无界非阻塞队列)
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;public class ConcurrentLinkedQueueDemo {public static void main(String[] args) {Queue<String> queue = new ConcurrentLinkedQueue<>();// 生产者线程new Thread(() -> {for (int i = 0; i < 5; i++) {queue.offer("Item-" + i);System.out.println("生产: Item-" + i);}}).start();// 消费者线程new Thread(() -> {for (int i = 0; i < 5; i++) {String item = queue.poll();if (item != null) {System.out.println("消费: " + item);}try {Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}
}
5. ArrayBlockingQueue (有界阻塞队列)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class ArrayBlockingQueueDemo {public static void main(String[] args) {BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);// 生产者new Thread(() -> {try {queue.put("A");queue.put("B");System.out.println("尝试放入C...");queue.put("C"); // 阻塞System.out.println("放入C成功");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 消费者new Thread(() -> {try {Thread.sleep(2000);System.out.println("消费: " + queue.take());Thread.sleep(1000);System.out.println("消费: " + queue.take());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
6. LinkedBlockingDeque (有界双端阻塞队列)
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;public class LinkedBlockingDequeDemo {public static void main(String[] args) {BlockingDeque<String> deque = new LinkedBlockingDeque<>(2);// 前端生产者new Thread(() -> {try {deque.putFirst("Front1");deque.putFirst("Front2");System.out.println("前端阻塞...");deque.putFirst("Front3"); // 阻塞} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 后端消费者new Thread(() -> {try {Thread.sleep(1500);System.out.println("后端消费: " + deque.takeLast());System.out.println("后端消费: " + deque.takeLast());} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
7. PriorityBlockingQueue (无界优先级阻塞队列)
import java.util.concurrent.PriorityBlockingQueue;public class PriorityBlockingQueueDemo {public static void main(String[] args) {PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();// 生产者new Thread(() -> {queue.add(5);queue.add(1);queue.add(10);queue.add(3);}).start();// 消费者new Thread(() -> {try {System.out.println("消费: " + queue.take()); // 1System.out.println("消费: " + queue.take()); // 3System.out.println("消费: " + queue.take()); // 5System.out.println("消费: " + queue.take()); // 10} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
8. SynchronousQueue (同步队列)
import java.util.concurrent.SynchronousQueue;public class SynchronousQueueDemo {public static void main(String[] args) {SynchronousQueue<String> queue = new SynchronousQueue<>();// 生产者new Thread(() -> {try {String data = "重要数据";System.out.println("生产数据: " + data);queue.put(data); // 阻塞直到被消费System.out.println("数据已被消费");} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();// 消费者new Thread(() -> {try {Thread.sleep(2000);String data = queue.take();System.out.println("消费数据: " + data);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}
}
9. WorkStealingPool (工作窃取队列)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class WorkStealingPoolDemo {public static void main(String[] args) {ExecutorService executor = Executors.newWorkStealingPool(4);for (int i = 0; i < 10; i++) {final int taskId = i;executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 执行任务 " + taskId);try {Thread.sleep(500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 等待任务完成try {Thread.sleep(3000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}executor.shutdown();}
}
关键选择建议:
-
需要阻塞操作 →
LinkedBlockingQueue
,ArrayBlockingQueue
-
延迟任务处理 →
DelayQueue
-
生产者需要等待消费者 →
LinkedTransferQueue
-
超高并发非阻塞 →
ConcurrentLinkedQueue
-
固定容量队列 →
ArrayBlockingQueue
-
双端操作 →
LinkedBlockingDeque
-
优先级处理 →
PriorityBlockingQueue
-
线程间直接传递 →
SynchronousQueue
-
并行计算密集型任务 →
WorkStealingPool
根据具体场景的需求(吞吐量、延迟要求、顺序保证、容量限制等)选择合适的队列实现。