Java Concurrency: A Concise Tutorial

Java Concurrency - BlockingQueue Interface

The java.util.concurrent.BlockingQueue interface is a subinterface of the Queue interface. It additionally supports operations that wait for the queue to become non-empty before retrieving an element, and that wait for space to become available in the queue before storing an element.
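The waiting behavior described above can be illustrated with a minimal sketch. It assumes ArrayBlockingQueue as the implementation; the class name BlockingDemo and the helper handOff are hypothetical names chosen for this illustration only. A consumer thread calls take() on an empty queue and stays blocked until another thread supplies an element with put().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingDemo {

    // Hypothetical helper: starts a consumer that blocks in take(),
    // then hands it one value and returns what the consumer received.
    static int handOff(int value) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        int[] received = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                // The queue is empty here, so take() waits for an element.
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Give the consumer time to reach take() before supplying a value.
            Thread.sleep(200);
            queue.put(value);   // wakes the waiting consumer
            consumer.join();    // join() makes the consumer's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during hand-off", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("Received: " + handOff(42));
    }
}
```

The short sleep only makes it likely that the consumer is already waiting; the hand-off is correct either way, because take() returns immediately if the element is already present.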

BlockingQueue Methods

Sr.No.  Method & Description

1       boolean add(E e): Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.

2       boolean contains(Object o): Returns true if this queue contains the specified element.

3       int drainTo(Collection<? super E> c): Removes all available elements from this queue and adds them to the given collection.

4       int drainTo(Collection<? super E> c, int maxElements): Removes at most the given number of available elements from this queue and adds them to the given collection.

5       boolean offer(E e): Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available.

6       boolean offer(E e, long timeout, TimeUnit unit): Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.

7       E poll(long timeout, TimeUnit unit): Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.

8       void put(E e): Inserts the specified element into this queue, waiting if necessary for space to become available.

9       int remainingCapacity(): Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.

10      boolean remove(Object o): Removes a single instance of the specified element from this queue, if it is present.

11      E take(): Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
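The methods listed above differ mainly in what they do when the queue is full or empty. The following sketch exercises several of them on a queue of capacity 2 and records what each call returns. It assumes ArrayBlockingQueue as the implementation; the class name QueueMethodsDemo and the helper run are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsDemo {

    // Hypothetical helper: runs a few calls against a queue of
    // capacity 2 and records the outcome of each one.
    static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> log = new ArrayList<>();
        try {
            log.add("add(a) -> " + queue.add("a"));
            log.add("offer(b) -> " + queue.offer("b"));   // queue is now full
            log.add("remainingCapacity -> " + queue.remainingCapacity());

            // On a full queue, offer returns false instead of waiting.
            log.add("offer(c) -> " + queue.offer("c"));
            // The timed offer waits up to 100 ms, then gives up.
            log.add("timed offer(c) -> "
                    + queue.offer("c", 100, TimeUnit.MILLISECONDS));
            // On a full queue, add throws instead of returning false.
            try {
                queue.add("c");
                log.add("add(c) -> true");
            } catch (IllegalStateException e) {
                log.add("add(c) -> IllegalStateException");
            }

            log.add("contains(b) -> " + queue.contains("b"));
            log.add("remove(b) -> " + queue.remove("b"));

            // drainTo moves everything that is left into another collection.
            List<String> drained = new ArrayList<>();
            int moved = queue.drainTo(drained);
            log.add("drainTo -> " + moved + " " + drained);

            // On an empty queue, the timed poll returns null after the timeout.
            log.add("timed poll -> " + queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Which variant to use depends on how the caller wants to handle a full or empty queue: add fails with an exception, offer and the timed poll report the outcome through their return value, and put and take wait indefinitely.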

Example

The following TestThread program shows the usage of the BlockingQueue interface in a thread-based environment.

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      // Bounded queue shared by the producer and the consumer (capacity 10)
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }


   static class Producer implements Runnable {
      private final BlockingQueue<Integer> queue;

      // Typed parameter avoids the unchecked raw-type assignment
      public Producer(BlockingQueue<Integer> queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
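         } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
         }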
      }
   }

   static class Consumer implements Runnable {
      private final BlockingQueue<Integer> queue;

      // Typed parameter avoids the unchecked raw-type assignment
      public Consumer(BlockingQueue<Integer> queue) {
         this.queue = queue;
      }

      @Override
      public void run() {

         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
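         } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
         }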
      }
   }
}

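This will produce a result similar to the following. The values are random, so they will differ between runs. Because each "Added" line is printed only after put() returns, a "Removed" line can occasionally appear before its matching "Added" line.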

Output

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27