`
haibin369
  • 浏览: 58607 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Java源码:阻塞队列(ArrayBlockingQueue)

    博客分类:
  • Java
阅读更多

一、简介

所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:

  • 当队列为空时,读取该队列可以阻塞直到队列不为空;
  • 当队列已满时,写入该队列可以阻塞直到队列不为满;

这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。


Java提供了下面7种阻塞队列,区别于底层数据结构的不同:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的接口是:java.util.concurrent.BlockingQueue,主要提供了以下存取方法(根据队列空或者满时的响应方式,可分成3类):
  立即返回结果值 超时返回结果值 阻塞 抛出异常
插入 offer(e) offer(e,time,unit) put(e) add(e)
移除 poll() poll(time,unit) take() remove()
读取 peek() element()

二、源码

  • ArrayBlockingQueue

    下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:
        public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
                implements BlockingQueue<E>, java.io.Serializable{  
            /** 底层用于存放队列元素的数组 */  
            final Object[] items;  
          
            /** 下一个获取索引,take,poll,peek和remove会用到 */  
            int takeIndex;  
          
            /** 下一个插入索引,put,offer和add方法会用到 */  
            int putIndex;  
          
            /** 当前队列中元素的个数 */  
            int count;  
          
            /** 用于控制并发操作队列的锁对象 */  
            final ReentrantLock lock;  
            /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */  
            private final Condition notEmpty;  
            /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */  
            private final Condition notFull;  
        }  
  • 写入

    1. offer(e)与offer(e,time,unit)
    首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。
        public boolean offer(E e) {  
            if (e == null) throw new NullPointerException();  
            final ReentrantLock lock = this.lock;  
            //锁住队列,防止在插入过程中的并发读写  
            lock.lock();  
            try {  
                //满了,返回false  
                if (count == items.length)  
                    return false;  
                else {  
                    //执行插入  
                    insert(e);  
                    return true;  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:
        public boolean offer(E e, long timeout, TimeUnit unit)  
                throws InterruptedException {  
          
            if (e == null) throw new NullPointerException();  
                long nanos = unit.toNanos(timeout);  
            //使用lockInterruptibly锁住队列,可以被中断  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                for (;;) {  
                    //如果队列不满,则执行插入并返回true  
                    if (count != items.length) {  
                        insert(e);  
                        return true;  
                    }  
          
                    //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false  
                    if (nanos <= 0)  
                        return false;  
                    try {  
                        //若该条件没被唤醒或者该线程没被中断,等待给定时间  
                        //如果等待中被唤醒,返回剩余的等待时间  
                        nanos = notFull.awaitNanos(nanos);  
                    } catch (InterruptedException ie) {  
                        notFull.signal(); // propagate to non-interrupted thread  
                        throw ie;  
                    }  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒
        private void insert(E x) {  
            items[putIndex] = x;  
            putIndex = inc(putIndex);  
            ++count;  
            notEmpty.signal();  
        }  

    2.add(e)
    add方法其实是调用了父类AbstractQueue的add方法:
        public boolean add(E e) {  
            if (offer(e))  
                return true;  
            else  
                throw new IllegalStateException("Queue full");  
        }  
    很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。

    3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:
        public void put(E e) throws InterruptedException {  
            if (e == null) throw new NullPointerException();  
            final E[] items = this.items;  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                try {  
                    //每次被唤醒都判断一下队列是否满了,是则继续阻塞  
                    while (count == items.length)  
                        notFull.await();  
                } catch (InterruptedException ie) {  
                    notFull.signal(); // propagate to non-interrupted thread  
                    throw ie;  
                }  
                insert(e);  
            } finally {  
                lock.unlock();  
            }  
        }  
  • 移除

    读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
    1.poll()与poll(time,unit)
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0)
                    return null;
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //无限循环,确保每次唤醒都要判断当前是否为空
                for (; ; ) {
                    if (count != 0) {
                        E x = extract();
                        return x;
                    }
                    if (nanos <= 0)
                        return null;
                    try {
                        nanos = notEmpty.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        notEmpty.signal(); // propagate to non-interrupted thread
                        throw ie;
                    }
    
                }
            } finally {
                lock.unlock();
            }
        }
    
        private E extract() {
            //读取队列头元素并唤醒都有等待非满线程
            final E[] items = this.items;
            E x = items[takeIndex];
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }

    2.take()
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    //为空的情况下,无限阻塞
                    while (count == 0)
                        notEmpty.await();
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }

    3.remove()
        public boolean remove(Object o) {
            if (o == null) return false;
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //遍历队列中的元素,使用equals方法判定相等则移除
                int i = takeIndex;
                int k = 0;
                for (;;) {
                    if (k++ >= count)
                        return false;
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    i = inc(i);
                }
    
            } finally {
                lock.unlock();
            }
        }
  • 读取

    首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:
        public E element() {
            E x = peek();
            if (x != null)
                return x;
            else
                throw new NoSuchElementException();
        }
    
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //takeIndex会一直指向队列的头
                return (count == 0) ? null : items[takeIndex];
            } finally {
                lock.unlock();
            }
        }
以上就是我对ArrayBlockingQueue的理解,水平有限,恳请指教。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics