IDEA中的快捷键

zookeeper集群原理分析

  返回  

ArrayBlockingQueue源码解析

2021/7/21 0:59:31 浏览:

什么是阻塞队列

队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。

常用阻塞队列

在这里插入图片描述

ArrayBlockingQueue

一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。

LinkedBlockingQueue

一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。

SynchronousQueue

是一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
                 new SynchronousQueue<Runnable>());
}

LinkedTransferQueue

是 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。

PriorityBlockingQueue

是一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。

DelayQueue

一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
使用场景:
1.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
2.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

LinkedBlockingDeque

一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。

常用方法在这里插入图片描述 抛出异常

是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

返回特殊值

插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null。

一直阻塞

当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

超时退出

当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

源码分析

在这里插入图片描述
在这里插入图片描述

常用属性

//元素数组
final Object[] items;
 // 取元素的指针
int takeIndex;
// 放元素的指针
int putIndex; 
// 元素数量
int count; 
//集合锁
final ReentrantLock lock;
//非空条件等待队列
private final Condition notEmpty;
//未满条件等待队列
private final Condition notFull;

代码分析

package com;
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        queue.put("aaa");
        queue.take();
     }
}

构造方法

public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        //初始化数组容量为capacity
        this.items = new Object[capacity];
        //初始化锁,true为公平锁,false为非公平锁
        lock = new ReentrantLock(fair);
        //初始化两个条件队列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

put方法

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //入队时,先加锁,如果被打断会抛异常出来
        lock.lockInterruptibly();
        try {
            /*
             *当队列已经放满时,则把当前线程放入,未满条件等待队列,挂起线程
             *当线程被唤醒时,则再去看队列是否放满
             *未放满,则元素入队,放满则再继续挂起
             */
            while (count == items.length)
                notFull.await();
            //当队列未满时,则把元素入队,
            enqueue(e);
        } finally {
            //添加完成释放锁
            lock.unlock();
        }
    }

take方法

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //出队时,先加锁
        lock.lockInterruptibly();
        try {
            /*
             *当队列为空时,则把该线程放入非空条件等待队列,挂起该线程
             *当线程被唤醒时,则再去看队列是否非空
             *有值,则出队,没有值,则再次挂起该线程
             */
            while (count == 0)
                notEmpty.await();
            //出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

dequeue方法

private void enqueue(E x) {
    final Object[] items = this.items;
    //把元素放入索引为‘放指针’的位置
    items[putIndex] = x;
    //放索引加一,当如果,指针已经到了数组尾部,则指针又回到数组头部,就是重置为0
    if (++putIndex == items.length)
        putIndex = 0;
    //队列元素长度加一
    count++;
    //当元素已经放入,则通知非空条件队列,唤醒该队列线程可以
    notEmpty.signal();
}

涉及技术:
ReentrantLock源码分析
Condition源码分析

总结

该队列使用的是数组来存放数据,如果当数据放满后,使用put方法则,阻塞住该线程,把该线程移入到未满条件队列中,等待取数据后被唤醒;如果取数据时,队列为空,则把该线程放入非空条件队列,等待放数据后被唤醒;因为使用的是数组,用写指针和读指针来标记位置,如果到了队尾,则会重新跳到队头;相当于是一个环;

联系我们

如果您对我们的服务有兴趣,请及时和我们联系!

服务热线:18288888888
座机:18288888888
传真:
邮箱:888888@qq.com
地址:郑州市文化路红专路93号