Java Concurrency 简明教程

Java Concurrency - BlockingQueue Interface

java.util.concurrent.BlockingQueue 接口是 Queue 接口的子接口,并且另外支持涉及在检索某个元素之前等待队列变为非空,并在存储某个元素之前等待队列中可用的空间等操作。

BlockingQueue Methods

Sr.No.

Method & Description

1

boolean add(E e) 如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,则返回 true,如果当前没有可用空间,则返回 true 并抛出 IllegalStateException。

2

boolean contains(Object o) 如果此队列包含指定的元素,则返回 true。

3

int drainTo(Collection<? super E> c) 从此队列中删除所有可用的元素,并将它们添加到给定的集合中。

4

int drainTo(Collection<? super E> c, int maxElements) 从此队列中最多删除给定数量的可用的元素,并将它们添加到给定的集合中。

5

boolean offer(E e) 如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,则返回 true,如果当前没有可用空间,则返回 false。

6

boolean offer(E e, long timeout, TimeUnit unit) 插入指定的元素到此队列中,如果需要,等待相应的时间直到有可用空间为止。

7

E poll(long timeout, TimeUnit unit) 检索并删除此队列的头部,如果需要,等待相应的时间直到有元素可用为止。

8

void put(E e) 插入指定的元素到此队列中,如果需要,等待直到有可用空间为止。

9

int remainingCapacity() 返回此队列在理想条件下(不存在内存或资源限制)可以接受的元素数量,如果没有固有限制,则返回 Integer.MAX_VALUE。

10

boolean remove(Object o) 从此队列中删除指定的元素的一个实例(如果存在的话)。

11

E take() 检索并删除此队列的头部,如果需要,等待直到有元素可用为止。

Example

以下 TestThread 程序演示了线程基础环境中 BlockingQueue 接口的用法。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {

         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

Output

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27