一、简介
所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:
- 当队列为空时,读取该队列可以阻塞直到队列不为空;
- 当队列已满时,写入该队列可以阻塞直到队列不为满;
这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。
Java提供了下面7种阻塞队列,区别于底层数据结构的不同:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
立即返回结果值 | 超时返回结果值 | 阻塞 | 抛出异常 | |
插入 | offer(e) | offer(e,time,unit) | put(e) | add(e) |
移除 | poll() | poll(time,unit) | take() | remove() |
读取 | peek() | 无 | 无 | element() |
二、源码
-
ArrayBlockingQueue
下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{ /** 底层用于存放队列元素的数组 */ final Object[] items; /** 下一个获取索引,take,poll,peek和remove会用到 */ int takeIndex; /** 下一个插入索引,put,offer和add方法会用到 */ int putIndex; /** 当前队列中元素的个数 */ int count; /** 用于控制并发操作队列的锁对象 */ final ReentrantLock lock; /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */ private final Condition notEmpty; /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */ private final Condition notFull; }
-
写入
1. offer(e)与offer(e,time,unit)
首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //锁住队列,防止在插入过程中的并发读写 lock.lock(); try { //满了,返回false if (count == items.length) return false; else { //执行插入 insert(e); return true; } } finally { //释放锁 lock.unlock(); } }
而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); //使用lockInterruptibly锁住队列,可以被中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //如果队列不满,则执行插入并返回true if (count != items.length) { insert(e); return true; } //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false if (nanos <= 0) return false; try { //若该条件没被唤醒或者该线程没被中断,等待给定时间 //如果等待中被唤醒,返回剩余的等待时间 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { //释放锁 lock.unlock(); } }
看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
2.add(e)
add方法其实是调用了父类AbstractQueue的add方法:public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。
3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //每次被唤醒都判断一下队列是否满了,是则继续阻塞 while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
-
移除
读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
1.poll()与poll(time,unit)public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //无限循环,确保每次唤醒都要判断当前是否为空 for (; ; ) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } private E extract() { //读取队列头元素并唤醒都有等待非满线程 final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
2.take()public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //为空的情况下,无限阻塞 while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
3.remove()public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //遍历队列中的元素,使用equals方法判定相等则移除 int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
-
读取
首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //takeIndex会一直指向队列的头 return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } }
相关推荐
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue常用方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue功能简介,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue, ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...
java中,常用的阻塞式队列Demo。包含:ArrayBlockingQueue、LinkedQueue、PriorityBlockingQueue
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
ArrayBlockingQueue源码分析.docx
数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9...
数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理,动力节点口口相传的Java黄埔军校
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...
java 队列使用,次例子是一个模拟网络爬虫工作大致流程的小例子,里面没有具体的爬取的实现,只是对爬取的流程的模拟,使用到了java 的 ArrayBlockingQueue、ConcurrentHashMap、 这2个类和java 的 volatile 关键字...
主要介绍了java并发之ArrayBlockingQueue详细介绍的相关资料,需要的朋友可以参考下
囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue(ArrayBlockingQueue、...