【Java】多线程和高并发编程(四):阻塞队列(上)基础概念、ArrayBlockingQueue
- 其他
- 2025-08-23 03:21:02

文章目录 四、阻塞队列1、基础概念1.1 生产者消费者概念1.2 JUC阻塞队列的存取方法 2、ArrayBlockingQueue2.1 ArrayBlockingQueue的基本使用2.2 生产者方法实现原理2.2.1 ArrayBlockingQueue的常见属性2.2.2 add方法实现2.2.3 offer方法实现2.2.4 offer(time,unit)方法2.2.5 put方法 2.3 消费者方法实现原理2.3.1 remove方法2.4.2 poll方法2.4.3 poll(time,unit)方法2.4.4 take方法2.4.5 虚假唤醒
个人主页:道友老李 欢迎加入社区:道友老李的学习社区
四、阻塞队列 1、基础概念 1.1 生产者消费者概念生产者消费者是设计模式的一种。让生产者和消费者基于一个容器来解决强耦合问题。
生产者 消费者彼此之间不会直接通讯的,而是通过一个容器(队列)进行通讯。
所以生产者生产完数据后扔到容器中,不通用等待消费者来处理。
消费者不需要去找生产者要数据,直接从容器中获取即可。
而这种容器最常用的结构就是队列。
1.2 JUC阻塞队列的存取方法常用的存取方法都是来自于JUC包下的BlockingQueue
生产者存储方法
add(E) // 添加数据到队列,如果队列满了,无法存储,抛出异常 offer(E) // 添加数据到队列,如果队列满了,返回false offer(E,timeout,unit) // 添加数据到队列,如果队列满了,阻塞timeout时间,如果阻塞一段时间,依然没添加进入,返回false put(E) // 添加数据到队列,如果队列满了,挂起线程,等到队列中有位置,再扔数据进去,死等!消费者取数据方法
remove() // 从队列中移除数据,如果队列为空,抛出异常 poll() // 从队列中移除数据,如果队列为空,返回null,么的数据 poll(timeout,unit) // 从队列中移除数据,如果队列为空,挂起线程timeout时间,等生产者扔数据,再获取 take() // 从队列中移除数据,如果队列为空,线程挂起,一直等到生产者扔数据,再获取 2、ArrayBlockingQueue 2.1 ArrayBlockingQueue的基本使用ArrayBlockingQueue在初始化的时候,必须指定当前队列的长度。
因为ArrayBlockingQueue是基于数组实现的队列结构,数组长度不可变,必须提前设置数组长度信息。
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException { // 必须设置队列的长度 ArrayBlockingQueue queue = new ArrayBlockingQueue(4); // 生产者扔数据 queue.add("1"); queue.offer("2"); queue.offer("3",2,TimeUnit.SECONDS); queue.put("2"); // 消费者取数据 System.out.println(queue.remove()); System.out.println(queue.poll()); System.out.println(queue.poll(2,TimeUnit.SECONDS)); System.out.println(queue.take()); } 2.2 生产者方法实现原理生产者添加数据到队列的方法比较多,需要一个一个查看
2.2.1 ArrayBlockingQueue的常见属性ArrayBlockingQueue中的成员变量
lock = 就是一个ReentrantLock count = 就是当前数组中元素的个数 iterms = 就是数组本身 # 基于putIndex和takeIndex将数组结构实现为了队列结构 putIndex = 存储数据时的下标 takeIndex = 去数据时的下标 notEmpty = 消费者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify) notFull = 生产者挂起线程和唤醒线程用到的Condition(看成sync的wait和notify) 2.2.2 add方法实现add方法本身就是调用了offer方法,如果offer方法返回false,直接抛出异常
public boolean add(E e) { if (offer(e)) return true; else // 抛出的异常 throw new IllegalStateException("Queue full"); } 2.2.3 offer方法实现 public boolean offer(E e) { // 要求存储的数据不允许为null,为null就抛出空指针 checkNotNull(e); // 当前阻塞队列的lock锁 final ReentrantLock lock = this.lock; // 为了保证线程安全,加锁 lock.lock(); try { // 如果队列中的元素已经存满了, if (count == items.length) // 返回false return false; else { // 队列没满,执行enqueue将元素添加到队列中 enqueue(e); // 返回true return true; } } finally { // 操作完释放锁 lock.unlock(); } } //========================================================== private void enqueue(E x) { // 拿到数组的引用 final Object[] items = this.items; // 将元素放到指定位置 items[putIndex] = x; // 对inputIndex进行++操作,并且判断是否已经等于数组长度,需要归位 if (++putIndex == items.length) // 将索引设置为0 putIndex = 0; // 元素添加成功,进行++操作。 count++; // 将一个Condition中阻塞的线程唤醒。 notEmpty.signal(); } 2.2.4 offer(time,unit)方法生产者在添加数据时,如果队列已经满了,阻塞一会。
阻塞到消费者消费了消息,然后唤醒当前阻塞线程阻塞到了time时间,再次判断是否可以添加,不能,直接告辞。 // 如果线程在挂起的时候,如果对当前阻塞线程的中断标记位进行设置,此时会抛出异常直接结束 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 非空检验 checkNotNull(e); // 将时间单位转换为纳秒 long nanos = unit.toNanos(timeout); // 加锁 final ReentrantLock lock = this.lock; // 允许线程中断并排除异常的加锁方式 lock.lockInterruptibly(); try { // 为什么是while(虚假唤醒) // 如果元素个数和数组长度一致,队列慢了 while (count == items.length) { // 判断等待的时间是否还充裕 if (nanos <= 0) // 不充裕,直接添加失败 return false; // 挂起等待,会同时释放锁资源(对标sync的wait方法) // awaitNanos会挂起线程,并且返回剩余的阻塞时间 // 恢复执行时,需要重新获取锁资源 nanos = notFull.awaitNanos(nanos); } // 说明队列有空间了,enqueue将数据扔到阻塞队列中 enqueue(e); return true; } finally { // 释放锁资源 lock.unlock(); } } 2.2.5 put方法如果队列是满的, 就一直挂起,直到被唤醒,或者被中断
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // await方法一直阻塞,直到被唤醒或者中断标记位 notFull.await(); enqueue(e); } finally { lock.unlock(); } } 2.3 消费者方法实现原理 2.3.1 remove方法 // remove方法就是调用了poll public E remove() { E x = poll(); // 如果有数据,直接返回 if (x != null) return x; // 没数据抛出异常 else throw new NoSuchElementException(); } 2.4.2 poll方法 // 拉取数据 public E poll() { // 加锁操作 final ReentrantLock lock = this.lock; lock.lock(); try { // 如果没有数据,直接返回null,如果有数据,执行dequeue,取出数据并返回 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //========================================================== // 取出数据 private E dequeue() { // 将成员变量引用到局部变量 final Object[] items = this.items; // 直接获取指定索引位置的数据 E x = (E) items[takeIndex]; // 将数组上指定索引位置设置为null items[takeIndex] = null; // 设置下次取数据时的索引位置 if (++takeIndex == items.length) takeIndex = 0; // 对count进行--操作 count--; // 迭代器内容,先跳过 if (itrs != null) itrs.elementDequeued(); // signal方法,会唤醒当前Condition中排队的一个Node。 // signalAll方法,会将Condition中所有的Node,全都唤醒 notFull.signal(); // 返回数据。 return x; } 2.4.3 poll(time,unit)方法 public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 转换时间单位 long nanos = unit.toNanos(timeout); // 竞争锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果没有数据 while (count == 0) { if (nanos <= 0) // 没数据,也无法阻塞了,返回null return null; // 没数据,挂起消费者线程 nanos = notEmpty.awaitNanos(nanos); } // 取数据 return dequeue(); } finally { lock.unlock(); } } 2.4.4 take方法 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 虚假唤醒 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } 2.4.5 虚假唤醒阻塞队列中,如果需要线程挂起操作,判断有无数据的位置采用的是while循环 ,为什么不能换成if
肯定是不能换成if逻辑判断
线程A,线程B,线程E,线程C。 其中ABE生产者,C属于消费者
假如线程的队列是满的
// E,拿到锁资源,还没有走while判断 while (count == items.length) // A醒了 // B挂起 notFull.await(); enqueue(e);C此时消费一条数据,执行notFull.signal()唤醒一个线程,A线程被唤醒
E走判断,发现有空余位置,可以添加数据到队列,E添加数据,走enqueue
如果判断是if,A在E释放锁资源后,拿到锁资源,直接走enqueue方法。
此时A线程就是在putIndex的位置,覆盖掉之前的数据,造成数据安全问题
【Java】多线程和高并发编程(四):阻塞队列(上)基础概念、ArrayBlockingQueue由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【Java】多线程和高并发编程(四):阻塞队列(上)基础概念、ArrayBlockingQueue”