Java Concurrency 简明教程

Java Concurrency - Quick Guide

Java Concurrency - Overview

Java 是一种多线程编程语言,这意味着我们可以使用 Java 开发多线程程序。多线程程序包含两个或更多可以同时运行的部分,每个部分都可以在同一时间处理不同的任务,从而优化利用可用资源,特别是在您的计算机有多个 CPU 时。

Java is a multi-threaded programming language which means we can develop multi-threaded program using Java. A multi-threaded program contains two or more parts that can run concurrently and each part can handle a different task at the same time making optimal use of the available resources specially when your computer has multiple CPUs.

根据定义,多任务处理是多个进程共享公共处理资源(如 CPU)时发生的情况。多线程将多任务处理的概念扩展到应用程序中,您可以在其中将单个应用程序内的特定操作细分为各个线程。每个线程都可以并行运行。操作系统不仅在不同应用程序之间划分处理时间,还在应用程序内的每个线程之间划分处理时间。

By definition, multitasking is when multiple processes share common processing resources such as a CPU. Multi-threading extends the idea of multitasking into applications where you can subdivide specific operations within a single application into individual threads. Each of the threads can run in parallel. The OS divides processing time not only among different applications, but also among each thread within an application.

通过使用多线程,您可以以在相同程序中可以并行执行多项活动的方式来编写代码。

Multi-threading enables you to write in a way where multiple activities can proceed concurrently in the same program.

Life Cycle of a Thread

线程在其生命周期中经过不同的阶段。例如,一个线程诞生、启动、运行,然后死亡。下图显示了线程的完整生命周期。

A thread goes through various stages in its life cycle. For example, a thread is born, started, runs, and then dies. The following diagram shows the complete life cycle of a thread.

Thread Life Cycle

以下是生命周期的阶段 -

Following are the stages of the life cycle −

  1. New − A new thread begins its life cycle in the new state. It remains in this state until the program starts the thread. It is also referred to as a born thread.

  2. Runnable − After a newly born thread is started, the thread becomes runnable. A thread in this state is considered to be executing its task.

  3. Waiting − Sometimes, a thread transitions to the waiting state while the thread waits for another thread to perform a task. A thread transitions back to the runnable state only when another thread signals the waiting thread to continue executing.

  4. Timed Waiting − A runnable thread can enter the timed waiting state for a specified interval of time. A thread in this state transitions back to the runnable state when that time interval expires or when the event it is waiting for occurs.

  5. Terminated (Dead) − A runnable thread enters the terminated state when it completes its task or otherwise terminates.

Thread Priorities

每个 Java 线程都有一个优先级,这有助于操作系统确定线程调度的顺序。

Every Java thread has a priority that helps the operating system determine the order in which threads are scheduled.

Java 线程优先级介于 MIN_PRIORITY(常量为 1)和 MAX_PRIORITY(常量为 10)之间。默认情况下,每个线程都获得 NORM_PRIORITY(常量为 5)优先级。

Java thread priorities are in the range between MIN_PRIORITY (a constant of 1) and MAX_PRIORITY (a constant of 10). By default, every thread is given priority NORM_PRIORITY (a constant of 5).

具有较高优先级的线程对程序而言更为重要,并且应该在较低优先级的线程之前分配处理器时间。但是,线程优先级不能保证线程执行的顺序,并且很大程度上取决于平台。

Threads with higher priority are more important to a program and should be allocated processor time before lower-priority threads. However, thread priorities cannot guarantee the order in which threads execute and are very much platform dependent.

Create a Thread by Implementing a Runnable Interface

如果准备将您的类作为线程执行,那么可以通过实现 Runnable 接口实现此目的。您需要遵循三个基本步骤 −

If your class is intended to be executed as a thread then you can achieve this by implementing a Runnable interface. You will need to follow three basic steps −

Step 1

第一步,您需要实现 Runnable 接口提供的 run() 方法。此方法为线程提供了一个入口点,您需要将完整业务逻辑放入此方法内。以下是 run() 方法的一个简单语法:

As a first step, you need to implement a run() method provided by a Runnable interface. This method provides an entry point for the thread and you will put your complete business logic inside this method. Following is a simple syntax of the run() method −

public void run( )

Step 2

作为第二步,您将使用以下构造函数实例化一个 Thread 对象 −

As a second step, you will instantiate a Thread object using the following constructor −

Thread(Runnable threadObj, String threadName);

其中,threadObj 是实现 Runnable 接口的类的实例,threadName 是赋予新线程的名称。

Where, threadObj is an instance of a class that implements the Runnable interface and threadName is the name given to the new thread.

Step 3

一旦创建了 Thread 对象,就可以通过调用 start() 方法来启动它,它执行对 run( ) 方法的调用。以下是 start() 方法的简单语法 -

Once a Thread object is created, you can start it by calling start() method, which executes a call to run( ) method. Following is a simple syntax of start() method −

void start();

Example

下面是创建一个新线程并开始运行它的示例:

Here is an example that creates a new thread and starts running it −

class RunnableDemo implements Runnable {
   private Thread t;
   private String threadName;

   RunnableDemo(String name) {
      threadName = name;
      System.out.println("Creating " +  threadName );
   }

   public void run() {
      System.out.println("Running " +  threadName );

      try {

         for(int i = 4; i > 0; i--) {
            System.out.println("Thread: " + threadName + ", " + i);

            // Let the thread sleep for a while.
            Thread.sleep(50);
         }
      } catch (InterruptedException e) {
         System.out.println("Thread " +  threadName + " interrupted.");
      }
      System.out.println("Thread " +  threadName + " exiting.");
   }

   public void start () {
      System.out.println("Starting " +  threadName );

      if (t == null) {
         t = new Thread (this, threadName);
         t.start ();
      }
   }
}

public class TestThread {

   public static void main(String args[]) {
      RunnableDemo R1 = new RunnableDemo("Thread-1");
      R1.start();

      RunnableDemo R2 = new RunnableDemo("Thread-2");
      R2.start();
   }
}

这会产生以下结果 −

This will produce the following result −

Output

Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.

Create a Thread by Extending a Thread Class

创建线程的第二种方法是创建一个扩展 Thread 类的类,使用以下两个简单步骤。这种方法在使用 Thread 类中可用方法创建多个线程时提供了更多的灵活性。

The second way to create a thread is to create a new class that extends Thread class using the following two simple steps. This approach provides more flexibility in handling multiple threads created using available methods in Thread class.

Step 1

您需要覆盖 Thread 类中可用的 run( ) 方法。此方法为线程提供一个入口点,您将把您的完整业务逻辑放在这个方法中。以下是 run() 方法的简单语法:

You will need to override run( ) method available in Thread class. This method provides an entry point for the thread and you will put your complete business logic inside this method. Following is a simple syntax of run() method −

public void run( )

Step 2

一旦创建 Thread 对象,便可以通过调用 start() 方法来启动它,该调用会执行一个 run( ) 方法调用。以下是 start() 方法的一个简单语法 −

Once Thread object is created, you can start it by calling start() method, which executes a call to run( ) method. Following is a simple syntax of start() method −

void start( );

Example

以下是按照 Thread 扩展重写的先前程序:

Here is the preceding program rewritten to extend the Thread −

class ThreadDemo extends Thread {
   private Thread t;
   private String threadName;

   ThreadDemo(String name) {
      threadName = name;
      System.out.println("Creating " +  threadName );
   }

   public void run() {
      System.out.println("Running " +  threadName );

      try {

         for(int i = 4; i > 0; i--) {
            System.out.println("Thread: " + threadName + ", " + i);

            // Let the thread sleep for a while.
            Thread.sleep(50);
         }
      } catch (InterruptedException e) {
         System.out.println("Thread " +  threadName + " interrupted.");
      }
      System.out.println("Thread " +  threadName + " exiting.");
   }

   public void start () {
      System.out.println("Starting " +  threadName );

      if (t == null) {
         t = new Thread (this, threadName);
         t.start ();
      }
   }
}

public class TestThread {

   public static void main(String args[]) {
      ThreadDemo T1 = new ThreadDemo("Thread-1");
      T1.start();

      ThreadDemo T2 = new ThreadDemo("Thread-2");
      T2.start();
   }
}

这会产生以下结果 −

This will produce the following result −

Output

Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.

Java Concurrency - Environment Setup

在本章,我们将讨论为 Java 设置宜人环境的事物的不同方面。

In this chapter, we will discuss on the different aspects of setting up a congenial environment for Java.

Local Environment Setup

如果您仍然希望为 Java 编程语言设置您的环境,本文将指导您如何在您的机器上下载和设置 Java。以下是设置环境的步骤。

If you are still willing to set up your environment for Java programming language, then this section guides you on how to download and set up Java on your machine. Following are the steps to set up the environment.

Java SE 可以通过链接 Download Java 免费获得。您可以根据您的操作系统下载一个版本。

Java SE is freely available from the link Download Java. You can download a version based on your operating system.

按照说明下载 Java 并运行 .exe 以在您的机器上安装 Java。一旦您在您的机器上安装了 Java,您将需要设置环境变量来指向正确的安装目录 −

Follow the instructions to download Java and run the .exe to install Java on your machine. Once you installed Java on your machine, you will need to set environment variables to point to correct installation directories −

Setting Up the Path for Windows

假设你已将 Java 安装在 c:\Program Files\java\jdk 目录中 −

Assuming you have installed Java in c:\Program Files\java\jdk directory −

  1. Right-click on 'My Computer' and select 'Properties'.

  2. Click the 'Environment variables' button under the 'Advanced' tab.

  3. Now, alter the 'Path' variable so that it also contains the path to the Java executable. Example, if the path is currently set to 'C:\WINDOWS\SYSTEM32', then change your path to read 'C:\WINDOWS\SYSTEM32;c:\Program Files\java\jdk\bin'.

Setting Up the Path for Linux, UNIX, Solaris, FreeBSD

应该设置环境变量 PATH 以指向已安装 Java 二进制文件的位置。如果您遇到困难,请参阅您的 shell 文档。

Environment variable PATH should be set to point to where the Java binaries have been installed. Refer to your shell documentation, if you have trouble doing this.

例如,如果您使用 bash 作为您的 shell,那么您将向 '.bashrc 的尾部添加以下行: export PATH = /path/to/java:$PATH'

Example, if you use bash as your shell, then you would add the following line to the end of your '.bashrc: export PATH = /path/to/java:$PATH'

要编写 Java 程序,您将需要一个文本编辑器。市场上有更复杂的 IDE。但现在,您可以考虑以下内容之一 −

To write your Java programs, you will need a text editor. There are even more sophisticated IDEs available in the market. But for now, you can consider one of the following −

  1. Notepad − On Windows machine, you can use any simple text editor like Notepad (Recommended for this tutorial), TextPad.

  2. Netbeans − A Java IDE that is open-source and free which can be downloaded from https://netbeans.org/index.html.

  3. Eclipse − A Java IDE developed by the eclipse open-source community and can be downloaded from https://www.eclipse.org/.

Java Concurrency - Major Operations

核心 Java 对多线程程序提供了完全控制。您可以开发多线程程序,根据您的要求完全暂停、恢复或停止。有各种静态方法,您可以在线程对象上使用它们来控制它们的 behavior。下表列出了这些方法 −

Core Java provides complete control over multithreaded program. You can develop a multithreaded program which can be suspended, resumed, or stopped completely based on your requirements. There are various static methods which you can use on thread objects to control their behavior. Following table lists down those methods −

Sr.No.

Method & Description

1

public void suspend() This method puts a thread in the suspended state and can be resumed using resume() method.

2

public void stop() This method stops a thread completely.

3

public void resume() This method resumes a thread, which was suspended using suspend() method.

4

public void wait() Causes the current thread to wait until another thread invokes the notify().

5

public void notify() Wakes up a single thread that is waiting on this object’s monitor.

注意,Java 的最新版本已声明 suspend( )、resume( ) 和 stop( ) 方法已弃用,因此你需要使用可用的替代方法。

Be aware that the latest versions of Java has deprecated the usage of suspend( ), resume( ), and stop( ) methods and so you need to use available alternatives.

Example

class RunnableDemo implements Runnable {
   public Thread t;
   private String threadName;
   boolean suspended = false;

   RunnableDemo(String name) {
      threadName = name;
      System.out.println("Creating " +  threadName );
   }

   public void run() {
      System.out.println("Running " +  threadName );

      try {

         for(int i = 10; i > 0; i--) {
            System.out.println("Thread: " + threadName + ", " + i);

            // Let the thread sleep for a while.
            Thread.sleep(300);

            synchronized(this) {

               while(suspended) {
                  wait();
               }
            }
         }
      } catch (InterruptedException e) {
         System.out.println("Thread " +  threadName + " interrupted.");
      }
      System.out.println("Thread " +  threadName + " exiting.");
   }

   public void start () {
      System.out.println("Starting " +  threadName );

      if (t == null) {
         t = new Thread (this, threadName);
         t.start ();
      }
   }

   void suspend() {
      suspended = true;
   }

   synchronized void resume() {
      suspended = false;
      notify();
   }
}

public class TestThread {

   public static void main(String args[]) {
      RunnableDemo R1 = new RunnableDemo("Thread-1");
      R1.start();

      RunnableDemo R2 = new RunnableDemo("Thread-2");
      R2.start();

      try {
         Thread.sleep(1000);
         R1.suspend();
         System.out.println("Suspending First Thread");
         Thread.sleep(1000);
         R1.resume();
         System.out.println("Resuming First Thread");

         R2.suspend();
         System.out.println("Suspending thread Two");
         Thread.sleep(1000);
         R2.resume();
         System.out.println("Resuming thread Two");
      } catch (InterruptedException e) {
         System.out.println("Main thread Interrupted");
      } try {
         System.out.println("Waiting for threads to finish.");
         R1.t.join();
         R2.t.join();
      } catch (InterruptedException e) {
         System.out.println("Main thread Interrupted");
      }
      System.out.println("Main thread exiting.");
   }
}

以上程序产生以下输出 −

The above program produces the following output −

Output

Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 10
Running Thread-2
Thread: Thread-2, 10
Thread: Thread-1, 9
Thread: Thread-2, 9
Thread: Thread-1, 8
Thread: Thread-2, 8
Thread: Thread-1, 7
Thread: Thread-2, 7
Suspending First Thread
Thread: Thread-2, 6
Thread: Thread-2, 5
Thread: Thread-2, 4
Resuming First Thread
Suspending thread Two
Thread: Thread-1, 6
Thread: Thread-1, 5
Thread: Thread-1, 4
Thread: Thread-1, 3
Resuming thread Two
Thread: Thread-2, 3
Waiting for threads to finish.
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Main thread exiting.

Interthread Communication

如果您了解进程间通信,那么您将很容易理解线程间通信。当您开发两个或更多线程交换某些信息的应用程序时,线程间通信非常重要。

If you are aware of interprocess communication then it will be easy for you to understand interthread communication. Interthread communication is important when you develop an application where two or more threads exchange some information.

有三个简单的方法和一个使线程通信成为可能的小技巧。所有三个方法如下所列 −

There are three simple methods and a little trick which makes thread communication possible. All the three methods are listed below −

Sr.No.

Method & Description

1

public void wait() Causes the current thread to wait until another thread invokes the notify().

2

public void notify() Wakes up a single thread that is waiting on this object’s monitor.

3

public void notifyAll() Wakes up all the threads that called wait( ) on the same object.

这些方法已作为 final 中的对象方法实现,因此在所有类中均可用。所有三个方法只能在 synchronized 上下文中调用。

These methods have been implemented as final methods in Object, so they are available in all the classes. All three methods can be called only from within a synchronized context.

Example

此示例显示了两个线程如何使用 wait()notify() 方法进行通信。您可以使用相同概念创建复杂系统。

This examples shows how two threads can communicate using wait() and notify() method. You can create a complex system using the same concept.

class Chat {
   boolean flag = false;

   public synchronized void Question(String msg) {

      if (flag) {

         try {
            wait();
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
      System.out.println(msg);
      flag = true;
      notify();
   }

   public synchronized void Answer(String msg) {

      if (!flag) {

         try {
            wait();
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
      System.out.println(msg);
      flag = false;
      notify();
   }
}

class T1 implements Runnable {
   Chat m;
   String[] s1 = { "Hi", "How are you ?", "I am also doing fine!" };

   public T1(Chat m1) {
      this.m = m1;
      new Thread(this, "Question").start();
   }

   public void run() {

      for (int i = 0; i < s1.length; i++) {
         m.Question(s1[i]);
      }
   }
}

class T2 implements Runnable {
   Chat m;
   String[] s2 = { "Hi", "I am good, what about you?", "Great!" };

   public T2(Chat m2) {
      this.m = m2;
      new Thread(this, "Answer").start();
   }

   public void run() {

      for (int i = 0; i < s2.length; i++) {
         m.Answer(s2[i]);
      }
   }
}

public class TestThread {

   public static void main(String[] args) {
      Chat m = new Chat();
      new T1(m);
      new T2(m);
   }
}

当上面的程序被编译并执行时,它会产生以下结果 -

When the above program is complied and executed, it produces the following result −

Output

Hi
Hi
How are you ?
I am good, what about you?
I am also doing fine!
Great!

本文中的示例已从 [[role="bare"] [role="bare"]https://stackoverflow.com/questions/2170520/inter-thread-communication-in-java ] 中提取并修改。

Above example has been taken and then modified from [[role="bare"]https://stackoverflow.com/questions/2170520/inter-thread-communication-in-java]

Java Concurrency - Synchronization

Multithreading Example with Synchronization

这里有一个在序列中打印 counter 值的相同示例,我们每次运行它时,它都会产生相同的结果。

Here is the same example which prints counter value in sequence and every time we run it, it produces the same result.

Example

class PrintDemo {

   public void printCount() {

      try {

         for(int i = 5; i > 0; i--) {
            System.out.println("Counter   ---   "  + i );
         }
      } catch (Exception e) {
         System.out.println("Thread  interrupted.");
      }
   }
}

class ThreadDemo extends Thread {
   private Thread t;
   private String threadName;
   PrintDemo  PD;

   ThreadDemo(String name,  PrintDemo pd) {
      threadName = name;
      PD = pd;
   }

   public void run() {

      synchronized(PD) {
         PD.printCount();
      }
      System.out.println("Thread " +  threadName + " exiting.");
   }

   public void start () {
      System.out.println("Starting " +  threadName );

      if (t == null) {
         t = new Thread (this, threadName);
         t.start ();
      }
   }
}

public class TestThread {

   public static void main(String args[]) {
      PrintDemo PD = new PrintDemo();

      ThreadDemo T1 = new ThreadDemo("Thread - 1 ", PD);
      ThreadDemo T2 = new ThreadDemo("Thread - 2 ", PD);

      T1.start();
      T2.start();

      // wait for threads to end
      try {
         T1.join();
         T2.join();
      } catch (Exception e) {
         System.out.println("Interrupted");
      }
   }
}

每次运行此程序都会产生相同的结果 −

This produces the same result every time you run this program −

Output

Starting Thread - 1
Starting Thread - 2
Counter   ---   5
Counter   ---   4
Counter   ---   3
Counter   ---   2
Counter   ---   1
Thread Thread - 1  exiting.
Counter   ---   5
Counter   ---   4
Counter   ---   3
Counter   ---   2
Counter   ---   1
Thread Thread - 2  exiting.

Java Concurrency - Deadlock

死锁描述了两个或多个线程被永久阻塞的情况,等待彼此。当多个线程需要相同锁但按不同顺序获取它们时,会发生死锁。Java 多线程程序可能会受到死锁条件的影响,因为 synchronized 关键字会导致执行线程在等待与指定对象关联的锁或监视器的同时阻塞。这里有一个例子。

Deadlock describes a situation where two or more threads are blocked forever, waiting for each other. Deadlock occurs when multiple threads need the same locks but obtain them in different order. A Java multithreaded program may suffer from the deadlock condition because the synchronized keyword causes the executing thread to block while waiting for the lock, or monitor, associated with the specified object. Here is an example.

Example

public class TestThread {
   public static Object Lock1 = new Object();
   public static Object Lock2 = new Object();

   public static void main(String args[]) {
      ThreadDemo1 T1 = new ThreadDemo1();
      ThreadDemo2 T2 = new ThreadDemo2();
      T1.start();
      T2.start();
   }

   private static class ThreadDemo1 extends Thread {

      public void run() {

         synchronized (Lock1) {
            System.out.println("Thread 1: Holding lock 1...");

            try {
               Thread.sleep(10);
            } catch (InterruptedException e) {}
            System.out.println("Thread 1: Waiting for lock 2...");

            synchronized (Lock2) {
               System.out.println("Thread 1: Holding lock 1 & 2...");
            }
         }
      }
   }

   private static class ThreadDemo2 extends Thread {

      public void run() {

         synchronized (Lock2) {
            System.out.println("Thread 2: Holding lock 2...");

            try {
               Thread.sleep(10);
            } catch (InterruptedException e) {}
            System.out.println("Thread 2: Waiting for lock 1...");

            synchronized (Lock1) {
               System.out.println("Thread 2: Holding lock 1 & 2...");
            }
         }
      }
   }
}

当您编译并执行上述程序时,您会发现死锁情况,以下是程序产生的输出 −

When you compile and execute the above program, you find a deadlock situation and following is the output produced by the program −

Output

Thread 1: Holding lock 1...
Thread 2: Holding lock 2...
Thread 1: Waiting for lock 2...
Thread 2: Waiting for lock 1...

上述程序将永久挂起,因为该位置的线程既无法进行,又等待彼此释放锁,所以您可以通过按 CTRL+C 退出程序。

The above program will hang forever because neither of the threads in position to proceed and waiting for each other to release the lock, so you can come out of the program by pressing CTRL+C.

Deadlock Solution Example

让我们更改锁的顺序并运行相同的程序,看看这两个线程是否仍在互相等待 −

Let’s change the order of the lock and run of the same program to see if both the threads still wait for each other −

Example

public class TestThread {
   public static Object Lock1 = new Object();
   public static Object Lock2 = new Object();

   public static void main(String args[]) {
      ThreadDemo1 T1 = new ThreadDemo1();
      ThreadDemo2 T2 = new ThreadDemo2();
      T1.start();
      T2.start();
   }

   private static class ThreadDemo1 extends Thread {

      public void run() {

         synchronized (Lock1) {
            System.out.println("Thread 1: Holding lock 1...");

            try {
               Thread.sleep(10);
            } catch (InterruptedException e) {}
            System.out.println("Thread 1: Waiting for lock 2...");

            synchronized (Lock2) {
               System.out.println("Thread 1: Holding lock 1 & 2...");
            }
         }
      }
   }

   private static class ThreadDemo2 extends Thread {

      public void run() {

         synchronized (Lock1) {
            System.out.println("Thread 2: Holding lock 1...");

            try {
               Thread.sleep(10);
            } catch (InterruptedException e) {}
            System.out.println("Thread 2: Waiting for lock 2...");

            synchronized (Lock2) {
               System.out.println("Thread 2: Holding lock 1 & 2...");
            }
         }
      }
   }
}

因此,仅仅更改锁的顺序就可以防止程序进入死锁情况,并完成以下结果 −

So just changing the order of the locks prevent the program in going into a deadlock situation and completes with the following result −

Output

Thread 1: Holding lock 1...
Thread 1: Waiting for lock 2...
Thread 1: Holding lock 1 & 2...
Thread 2: Holding lock 1...
Thread 2: Waiting for lock 2...
Thread 2: Holding lock 1 & 2...

上述示例只是为了说明这个概念,然而,这是一个复杂的概念,在您开发应用程序以处理死锁情况之前,您应该深入了解它。

The above example is to just make the concept clear, however, it is a complex concept and you should deep dive into it before you develop your applications to deal with deadlock situations.

Java Concurrency - ThreadLocal Class

ThreadLocal 类用于创建线程局部变量,该变量只能被同一线程读写。例如,如果两个线程正在访问对同一 threadLocal 变量有引用的代码,那么每个线程将看不到其他线程对 threadLocal 变量所做的任何修改。

The ThreadLocal class is used to create thread local variables which can only be read and written by the same thread. For example, if two threads are accessing code having reference to same threadLocal variable then each thread will not see any modification to threadLocal variable done by other thread.

ThreadLocal Methods

以下是 ThreadLocal 类中可用的重要方法列表。

Following is the list of important methods available in the ThreadLocal class.

Sr.No.

Method & Description

1

public T get() Returns the value in the current thread’s copy of this thread-local variable.

2

protected T initialValue() Returns the current thread’s "initial value" for this thread-local variable.

3

public void remove() Removes the current thread’s value for this thread-local variable.

4

public void set(T value) Sets the current thread’s copy of this thread-local variable to the specified value.

Example

以下 TestThread 程序展示了 ThreadLocal 类的其中一些方法。此处我们使用了两个计数器变量,一个是普通变量,另一个是 ThreadLocal。

The following TestThread program demonstrates some of these methods of the ThreadLocal class. Here we’ve used two counter variable, one is normal variable and another one is ThreadLocal.

class RunnableDemo implements Runnable {
   int counter;
   ThreadLocal<Integer> threadLocalCounter = new ThreadLocal<Integer>();

   public void run() {
      counter++;

      if(threadLocalCounter.get() != null) {
         threadLocalCounter.set(threadLocalCounter.get().intValue() + 1);
      } else {
         threadLocalCounter.set(0);
      }
      System.out.println("Counter: " + counter);
      System.out.println("threadLocalCounter: " + threadLocalCounter.get());
   }
}

public class TestThread {

   public static void main(String args[]) {
      RunnableDemo commonInstance = new RunnableDemo();

      Thread t1 = new Thread(commonInstance);
      Thread t2 = new Thread(commonInstance);
      Thread t3 = new Thread(commonInstance);
      Thread t4 = new Thread(commonInstance);

      t1.start();
      t2.start();
      t3.start();
      t4.start();

      // wait for threads to end
      try {
         t1.join();
         t2.join();
         t3.join();
         t4.join();
      } catch (Exception e) {
         System.out.println("Interrupted");
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Counter: 1
threadLocalCounter: 0
Counter: 2
threadLocalCounter: 0
Counter: 3
threadLocalCounter: 0
Counter: 4
threadLocalCounter: 0

可以看到计数器的值由每个线程增加,而 threadLocalCounter 对于每个线程都保持为 0。

You can see the value of counter is increased by each thread, but threadLocalCounter remains 0 for each thread.

ThreadLocalRandom Class

java.util.concurrent.ThreadLocalRandom 是从 jdk 1.7 引入的一个实用程序类,当多个线程或 ForkJoinTasks 需要生成随机数时非常有用。它能提升性能,争用比 Math.random() 方法少。

A java.util.concurrent.ThreadLocalRandom is a utility class introduced from jdk 1.7 onwards and is useful when multiple threads or ForkJoinTasks are required to generate random numbers. It improves performance and have less contention than Math.random() method.

ThreadLocalRandom Methods

以下是 ThreadLocalRandom 类中重要的可用方法的列表。

Following is the list of important methods available in the ThreadLocalRandom class.

Sr.No.

Method & Description

1

public static ThreadLocalRandom current() Returns the current thread’s ThreadLocalRandom.

2

protected int next(int bits) Generates the next pseudorandom number.

3

public double nextDouble(double n) Returns a pseudorandom, uniformly distributed double value between 0 (inclusive) and the specified value (exclusive).

4

public double nextDouble(double least, double bound) Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) and bound (exclusive).

5

public int nextInt(int least, int bound) Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) and bound (exclusive).

6

public long nextLong(long n) Returns a pseudorandom, uniformly distributed value between 0 (inclusive) and the specified value (exclusive).

7

public long nextLong(long least, long bound) Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) and bound (exclusive).

8

public void setSeed(long seed) Throws UnsupportedOperationException.

Example

以下 TestThread 程序展示了 Lock 接口的其中一些方法。此处我们使用了 lock() 获取锁,使用了 unlock() 释放锁。

The following TestThread program demonstrates some of these methods of the Lock interface. Here we’ve used lock() to acquire the lock and unlock() to release the lock.

import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadLocalRandom;

public class TestThread {

   public static void main(final String[] arguments) {
      System.out.println("Random Integer: " + new Random().nextInt());
      System.out.println("Seeded Random Integer: " + new Random(15).nextInt());
      System.out.println(
         "Thread Local Random Integer: " + ThreadLocalRandom.current().nextInt());

      final ThreadLocalRandom random = ThreadLocalRandom.current();
      random.setSeed(15); //exception will come as seeding is not allowed in ThreadLocalRandom.
      System.out.println("Seeded Thread Local Random Integer: " + random.nextInt());
   }
}

这将产生以下结果。

This will produce the following result.

Output

Random Integer: 1566889198
Seeded Random Integer: -1159716814
Thread Local Random Integer: 358693993
Exception in thread "main" java.lang.UnsupportedOperationException
        at java.util.concurrent.ThreadLocalRandom.setSeed(Unknown Source)
        at TestThread.main(TestThread.java:21)

此处我们使用了 ThreadLocalRandom 和 Random 类获取随机数。

Here we’ve used ThreadLocalRandom and Random classes to get random numbers.

Java Concurrency - Lock Interface

java.util.concurrent.locks.Lock 接口用作与同步块类似的线程同步机制。新的锁定机制更加灵活,并提供比同步块更多的选项。Lock 与同步块之间的主要区别如下 −

A java.util.concurrent.locks.Lock interface is used to as a thread synchronization mechanism similar to synchronized blocks. New Locking mechanism is more flexible and provides more options than a synchronized block. Main differences between a Lock and a synchronized block are following −

  1. Guarantee of sequence − Synchronized block does not provide any guarantee of sequence in which waiting thread will be given access. Lock interface handles it.

  2. No timeout − Synchronized block has no option of timeout if lock is not granted. Lock interface provides such option.

  3. Single method − Synchronized block must be fully contained within a single method whereas a lock interface’s methods lock() and unlock() can be called in different methods.

Lock Methods

以下是 Lock 类中可用的重要方法列表。

Following is the list of important methods available in the Lock class.

Sr.No.

Method & Description

1

public void lock() Acquires the lock.

2

public void lockInterruptibly() Acquires the lock unless the current thread is interrupted.

3

public Condition newCondition() Returns a new Condition instance that is bound to this Lock instance.

4

public boolean tryLock() Acquires the lock only if it is free at the time of invocation.

5

public boolean tryLock(long time, TimeUnit unit) Acquires the lock if it is free within the given waiting time and the current thread has not been interrupted.

6

public void unlock() Releases the lock.

Example

以下 TestThread 程序展示了 Lock 接口的其中一些方法。此处我们使用了 lock() 获取锁,使用了 unlock() 释放锁。

The following TestThread program demonstrates some of these methods of the Lock interface. Here we’ve used lock() to acquire the lock and unlock() to release the lock.

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PrintDemo {
   private final Lock queueLock = new ReentrantLock();

   public void print() {
      queueLock.lock();

      try {
         Long duration = (long) (Math.random() * 10000);
         System.out.println(Thread.currentThread().getName()
            + "  Time Taken " + (duration / 1000) + " seconds.");
         Thread.sleep(duration);
      } catch (InterruptedException e) {
         e.printStackTrace();
      } finally {
         System.out.printf(
            "%s printed the document successfully.\n", Thread.currentThread().getName());
         queueLock.unlock();
      }
   }
}

class ThreadDemo extends Thread {
   PrintDemo  printDemo;

   ThreadDemo(String name,  PrintDemo printDemo) {
      super(name);
      this.printDemo = printDemo;
   }

   @Override
   public void run() {
      System.out.printf(
         "%s starts printing a document\n", Thread.currentThread().getName());
      printDemo.print();
   }
}

public class TestThread {

   public static void main(String args[]) {
      PrintDemo PD = new PrintDemo();

      ThreadDemo t1 = new ThreadDemo("Thread - 1 ", PD);
      ThreadDemo t2 = new ThreadDemo("Thread - 2 ", PD);
      ThreadDemo t3 = new ThreadDemo("Thread - 3 ", PD);
      ThreadDemo t4 = new ThreadDemo("Thread - 4 ", PD);

      t1.start();
      t2.start();
      t3.start();
      t4.start();
   }
}

这将产生以下结果。

This will produce the following result.

Output

Thread - 1  starts printing a document
Thread - 4  starts printing a document
Thread - 3  starts printing a document
Thread - 2  starts printing a document
Thread - 1   Time Taken 4 seconds.
Thread - 1  printed the document successfully.
Thread - 4   Time Taken 3 seconds.
Thread - 4  printed the document successfully.
Thread - 3   Time Taken 5 seconds.
Thread - 3  printed the document successfully.
Thread - 2   Time Taken 4 seconds.
Thread - 2  printed the document successfully.

这里我们使用 ReentrantLock 类作为 Lock 接口的实现。ReentrantLock 类允许一个线程锁定方法,即使它已经在另一个方法上拥有锁。

We’ve use ReentrantLock class as an implementation of Lock interface here. ReentrantLock class allows a thread to lock a method even if it already have the lock on other method.

Java Concurrency - ReadWriteLock Interface

一个 java.util.concurrent.locks.ReadWriteLock 接口允许多个线程一次读取,但一次只能有一个线程写入。

A java.util.concurrent.locks.ReadWriteLock interface allows multiple threads to read at a time but only one thread can write at a time.

  1. Read Lock − If no thread has locked the ReadWriteLock for writing then multiple thread can access the read lock.

  2. Write Lock − If no thread is reading or writing, then one thread can access the write lock.

Lock Methods

以下是 Lock 类中可用的重要方法列表。

Following is the list of important methods available in the Lock class.

Sr.No.

Method & Description

1

public Lock readLock() Returns the lock used for reading.

2

public Lock writeLock() Returns the lock used for writing.

Example

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TestThread {
   private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
   private static String message = "a";

   public static void main(String[] args) throws InterruptedException {
      Thread t1 = new Thread(new WriterA());
      t1.setName("Writer A");

      Thread t2 = new Thread(new WriterB());
      t2.setName("Writer B");

      Thread t3 = new Thread(new Reader());
      t3.setName("Reader");
      t1.start();
      t2.start();
      t3.start();
      t1.join();
      t2.join();
      t3.join();
   }

   static class Reader implements Runnable {

      public void run() {

         if(lock.isWriteLocked()) {
            System.out.println("Write Lock Present.");
         }
         lock.readLock().lock();

         try {
            Long duration = (long) (Math.random() * 10000);
            System.out.println(Thread.currentThread().getName()
               + "  Time Taken " + (duration / 1000) + " seconds.");
            Thread.sleep(duration);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            System.out.println(Thread.currentThread().getName() +": "+ message );
            lock.readLock().unlock();
         }
      }
   }

   static class WriterA implements Runnable {

      public void run() {
         lock.writeLock().lock();

         try {
            Long duration = (long) (Math.random() * 10000);
            System.out.println(Thread.currentThread().getName()
               + "  Time Taken " + (duration / 1000) + " seconds.");
            Thread.sleep(duration);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            message = message.concat("a");
            lock.writeLock().unlock();
         }
      }
   }

   static class WriterB implements Runnable {

      public void run() {
         lock.writeLock().lock();

         try {
            Long duration = (long) (Math.random() * 10000);
            System.out.println(Thread.currentThread().getName()
               + "  Time Taken " + (duration / 1000) + " seconds.");
            Thread.sleep(duration);
         } catch (InterruptedException e) {
            e.printStackTrace();
         } finally {
            message = message.concat("b");
            lock.writeLock().unlock();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Writer A  Time Taken 6 seconds.
Write Lock Present.
Writer B  Time Taken 2 seconds.
Reader  Time Taken 0 seconds.
Reader: aab

Java Concurrency - Condition Interface

java.util.concurrent.locks.Condition 接口为线程提供暂停其执行的能力,直到给定的条件为真。一个 Condition 对象必然绑定到一个 Lock,并且通过 newCondition() 方法获得。

A java.util.concurrent.locks.Condition interface provides a thread ability to suspend its execution, until the given condition is true. A Condition object is necessarily bound to a Lock and to be obtained using the newCondition() method.

Condition Methods

以下是 Condition 类中可用的重要方法列表。

Following is the list of important methods available in the Condition class.

Sr.No.

Method & Description

1

public void await() Causes the current thread to wait until it is signalled or interrupted.

2

public boolean await(long time, TimeUnit unit) Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.

3

public long awaitNanos(long nanosTimeout) Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses.

4

public long awaitUninterruptibly() Causes the current thread to wait until it is signalled.

5

public long awaitUntil() Causes the current thread to wait until it is signalled or interrupted, or the specified deadline elapses.

6

public void signal() Wakes up one waiting thread.

7

public void signalAll() Wakes up all waiting threads.

Example

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestThread {

   public static void main(String[] args) throws InterruptedException {
      ItemQueue itemQueue = new ItemQueue(10);

      //Create a producer and a consumer.
      Thread producer = new Producer(itemQueue);
      Thread consumer = new Consumer(itemQueue);

      //Start both threads.
      producer.start();
      consumer.start();

      //Wait for both threads to terminate.
      producer.join();
      consumer.join();
   }

   static class ItemQueue {
      private Object[] items = null;
      private int current = 0;
      private int placeIndex = 0;
      private int removeIndex = 0;

      private final Lock lock;
      private final Condition isEmpty;
      private final Condition isFull;

      public ItemQueue(int capacity) {
         this.items = new Object[capacity];
         lock = new ReentrantLock();
         isEmpty = lock.newCondition();
         isFull = lock.newCondition();
      }

      public void add(Object item) throws InterruptedException {
         lock.lock();

         while(current >= items.length)
            isFull.await();

         items[placeIndex] = item;
         placeIndex = (placeIndex + 1) % items.length;
         ++current;

         //Notify the consumer that there is data available.
         isEmpty.signal();
         lock.unlock();
      }

      public Object remove() throws InterruptedException {
         Object item = null;

         lock.lock();

         while(current <= 0) {
            isEmpty.await();
         }
         item = items[removeIndex];
         removeIndex = (removeIndex + 1) % items.length;
         --current;

         //Notify the producer that there is space available.
         isFull.signal();
         lock.unlock();

         return item;
      }

      public boolean isEmpty() {
         return (items.length == 0);
      }
   }

   static class Producer extends Thread {
      private final ItemQueue queue;

      public Producer(ItemQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         String[] numbers =
            {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"};

         try {

            for(String number: numbers) {
               System.out.println("[Producer]: " + number);
            }
            queue.add(null);
         } catch (InterruptedException ex) {
            ex.printStackTrace();
         }
      }
   }

   static class Consumer extends Thread {
      private final ItemQueue queue;

      public Consumer(ItemQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {

         try {

            do {
               Object number = queue.remove();
               System.out.println("[Consumer]: " + number);

               if(number == null) {
                  return;
               }
            } while(!queue.isEmpty());
         } catch (InterruptedException ex) {
            ex.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

[Producer]: 1
[Producer]: 2
[Producer]: 3
[Producer]: 4
[Producer]: 5
[Producer]: 6
[Producer]: 7
[Producer]: 8
[Producer]: 9
[Producer]: 10
[Producer]: 11
[Producer]: 12
[Consumer]: null

Java Concurrency - AtomicInteger Class

java.util.concurrent.atomic.AtomicInteger 类提供对底层 int 值的操作,该值可以原子地读写,还包含高级原子操作。AtomicInteger 支持对底层 int 变量的原子操作。它有 get 和 set 方法,就像对 volatile 变量进行读写一样。也就是说,set 与同一变量上的任何后续 get 都有 happens-before 关系。atomic compareAndSet 方法也有这些内存一致性特性。

A java.util.concurrent.atomic.AtomicInteger class provides operations on underlying int value that can be read and written atomically, and also contains advanced atomic operations. AtomicInteger supports atomic operations on underlying int variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicInteger Methods

以下是 AtomicInteger 类中可用的重要方法的列表。

Following is the list of important methods available in the AtomicInteger class.

Sr.No.

Method & Description

1

public int addAndGet(int delta) Atomically adds the given value to the current value.

2

public boolean compareAndSet(int expect, int update) Atomically sets the value to the given updated value if the current value is same as the expected value.

3

public int decrementAndGet() Atomically decrements by one the current value.

4

public double doubleValue() Returns the value of the specified number as a double.

5

public float floatValue() Returns the value of the specified number as a float.

6

public int get() Gets the current value.

7

public int getAndAdd(int delta) Atomiclly adds the given value to the current value.

8

public int getAndDecrement() Atomically decrements by one the current value.

9

public int getAndIncrement() Atomically increments by one the current value.

10

public int getAndSet(int newValue) Atomically sets to the given value and returns the old value.

11

public int incrementAndGet() Atomically increments by one the current value.

12

public int intValue() Returns the value of the specified number as an int.

13

public void lazySet(int newValue) Eventually sets to the given value.

14

public long longValue() Returns the value of the specified number as a long.

15

public void set(int newValue) Sets to the given value.

16

public String toString() Returns the String representation of the current value.

17

public boolean weakCompareAndSet(int expect, int update) Atomically sets the value to the given updated value if the current value is same as the expected value.

Example

以下 TestThread 程序显示了在线程环境中计数器的非安全实现。

The following TestThread program shows a unsafe implementation of counter in thread based environment.

public class TestThread {

   static class Counter {
      private int c = 0;

      public void increment() {
         c++;
      }

      public int value() {
         return c;
      }
   }

   public static void main(final String[] arguments) throws InterruptedException {
      final Counter counter = new Counter();

      //1000 threads
      for(int i = 0; i < 1000 ; i++) {

         new Thread(new Runnable() {

            public void run() {
               counter.increment();
            }
         }).start();
      }
      Thread.sleep(6000);
      System.out.println("Final number (should be 1000): " + counter.value());
   }
}

这可能会根据计算机速度和线程交错产生以下结果。

This may produce the following result depending upon computer’s speed and thread interleaving.

Output

Final number (should be 1000): 1000

Example

import java.util.concurrent.atomic.AtomicInteger;

public class TestThread {

   static class Counter {
      private AtomicInteger c = new AtomicInteger(0);

      public void increment() {
         c.getAndIncrement();
      }

      public int value() {
         return c.get();
      }
   }

   public static void main(final String[] arguments) throws InterruptedException {
      final Counter counter = new Counter();

      //1000 threads
      for(int i = 0; i < 1000 ; i++) {

         new Thread(new Runnable() {
            public void run() {
               counter.increment();
            }
         }).start();
      }
      Thread.sleep(6000);
      System.out.println("Final number (should be 1000): " + counter.value());
   }
}

这将产生以下结果。

This will produce the following result.

Output

Final number (should be 1000): 1000

Java Concurrency - AtomicLong Class

java.util.concurrent.atomic.AtomicLong 类在可以读取和写入原子的底层长值上提供操作,还包含高级原子操作。AtomicLong 支持对底层长变量的原子操作。它具有像读写 volatile 变量那样的 get 和 set 方法。也就是说,一个 set 与对同一变量的任何后续 get 都具有 happens-before 关系。atomic compareAndSet 方法还具有这些内存一致性特性。

A java.util.concurrent.atomic.AtomicLong class provides operations on underlying long value that can be read and written atomically, and also contains advanced atomic operations. AtomicLong supports atomic operations on underlying long variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicLong Methods

以下是 AtomicLong 类中提供的最重要方法列表。

Following is the list of important methods available in the AtomicLong class.

Sr.No.

Method & Description

1

public long addAndGet(long delta) Atomically adds the given value to the current value.

2

public boolean compareAndSet(long expect, long update) Atomically sets the value to the given updated value if the current value is same as the expected value.

3

public long decrementAndGet() Atomically decrements by one the current value.

4

public double doubleValue() Returns the value of the specified number as a double.

5

public float floatValue() Returns the value of the specified number as a float.

6

public long get() Gets the current value.

7

public long getAndAdd(long delta) Atomiclly adds the given value to the current value.

8

public long getAndDecrement() Atomically decrements by one the current value.

9

public long getAndIncrement() Atomically increments by one the current value.

10

public long getAndSet(long newValue) Atomically sets to the given value and returns the old value.

11

public long incrementAndGet() Atomically increments by one the current value.

12

public int intValue() Returns the value of the specified number as an int.

13

public void lazySet(long newValue) Eventually sets to the given value.

14

public long longValue() Returns the value of the specified number as a long.

15

public void set(long newValue) Sets to the given value.

16

public String toString() Returns the String representation of the current value.

17

public boolean weakCompareAndSet(long expect, long update) Atomically sets the value to the given updated value if the current value is same as the expected value.

Example

TestThread 程序展示了在基于线程的环境中使用 AtomicLong 来安全实现计数器的过程。

The following TestThread program shows a safe implementation of counter using AtomicLong in thread based environment.

import java.util.concurrent.atomic.AtomicLong;

public class TestThread {

   static class Counter {
      private AtomicLong c = new AtomicLong(0);

      public void increment() {
         c.getAndIncrement();
      }

      public long value() {
         return c.get();
      }
   }

   public static void main(final String[] arguments) throws InterruptedException {
      final Counter counter = new Counter();

      //1000 threads
      for(int i = 0; i < 1000 ; i++) {

         new Thread(new Runnable() {

            public void run() {
               counter.increment();
            }
         }).start();
      }
      Thread.sleep(6000);
      System.out.println("Final number (should be 1000): " + counter.value());
   }
}

这将产生以下结果。

This will produce the following result.

Output

Final number (should be 1000): 1000

Java Concurrency - AtomicBoolean Class

java.util.concurrent.atomic.AtomicBoolean 类在可以读取和写入原子的底层布尔值上提供操作,还包含高级原子操作。AtomicBoolean 支持对底层布尔变量的原子操作。它具有像读写 volatile 变量那样的 get 和 set 方法。也就是说,一个 set 与对同一变量的任何后续 get 都具有 happens-before 关系。atomic compareAndSet 方法还具有这些内存一致性特性。

A java.util.concurrent.atomic.AtomicBoolean class provides operations on underlying boolean value that can be read and written atomically, and also contains advanced atomic operations. AtomicBoolean supports atomic operations on underlying boolean variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicBoolean Methods

以下是 AtomicBoolean 类中可用的重要方法列表。

Following is the list of important methods available in the AtomicBoolean class.

Sr.No.

Method & Description

1

public boolean compareAndSet(boolean expect, boolean update) Atomically sets the value to the given updated value if the current value == the expected value.

2

public boolean get() Returns the current value.

3

public boolean getAndSet(boolean newValue) Atomically sets to the given value and returns the previous value.

4

public void lazySet(boolean newValue) Eventually sets to the given value.

5

public void set(boolean newValue) Unconditionally sets to the given value.

6

public String toString() Returns the String representation of the current value.

7

public boolean weakCompareAndSet(boolean expect, boolean update) Atomically sets the value to the given updated value if the current value == the expected value.

Example

以下 TestThread 程序显示了在基于线程的环境中使用 AtomicBoolean 变量。

The following TestThread program shows usage of AtomicBoolean variable in thread based environment.

import java.util.concurrent.atomic.AtomicBoolean;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      final AtomicBoolean atomicBoolean = new AtomicBoolean(false);

      new Thread("Thread 1") {

         public void run() {

            while(true) {
               System.out.println(Thread.currentThread().getName()
                  +" Waiting for Thread 2 to set Atomic variable to true. Current value is "
                  + atomicBoolean.get());

               if(atomicBoolean.compareAndSet(true, false)) {
                  System.out.println("Done!");
                  break;
               }
            }
         };
      }.start();

      new Thread("Thread 2") {

         public void run() {
            System.out.println(Thread.currentThread().getName() +
               ", Atomic Variable: " +atomicBoolean.get());
            System.out.println(Thread.currentThread().getName() +
               " is setting the variable to true ");
            atomicBoolean.set(true);
            System.out.println(Thread.currentThread().getName() +
               ", Atomic Variable: " +atomicBoolean.get());
         };
      }.start();
   }
}

这将产生以下结果。

This will produce the following result.

Output

Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2, Atomic Variable: false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2 is setting the variable to true
Thread 2, Atomic Variable: true
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Done!

Java Concurrency - AtomicReference Class

java.util.concurrent.atomic.AtomicReference 类提供对基础对象引用的操作,该引用可被原子性地读写,并且还包含高级原子操作。AtomicReference 支持对基础对象引用变量的原子操作。它有类似对 volatile 变量的读写操作的 get 和 set 方法。也就是说,set 操作与任何对相同变量的后续 get 操作之间存在先行发生关系。原子 compareAndSet 方法也有这些内存一致性特征。

A java.util.concurrent.atomic.AtomicReference class provides operations on underlying object reference that can be read and written atomically, and also contains advanced atomic operations. AtomicReference supports atomic operations on underlying object reference variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicReference Methods

以下是 AtomicReference 类中可用的重要方法列表。

Following is the list of important methods available in the AtomicReference class.

Sr.No.

Method & Description

1

public boolean compareAndSet(V expect, V update) Atomically sets the value to the given updated value if the current value == the expected value.

2

public boolean get() Returns the current value.

3

public boolean getAndSet(V newValue) Atomically sets to the given value and returns the previous value.

4

public void lazySet(V newValue) Eventually sets to the given value.

5

public void set(V newValue) Unconditionally sets to the given value.

6

public String toString() Returns the String representation of the current value.

7

public boolean weakCompareAndSet(V expect, V update) Atomically sets the value to the given updated value if the current value == the expected value.

Example

以下 TestThread 程序显示了在基于线程的环境中使用 AtomicReference 变量。

The following TestThread program shows usage of AtomicReference variable in thread based environment.

import java.util.concurrent.atomic.AtomicReference;

public class TestThread {
   private static String message = "hello";
   private static AtomicReference<String> atomicReference;

   public static void main(final String[] arguments) throws InterruptedException {
      atomicReference = new AtomicReference<String>(message);

      new Thread("Thread 1") {

         public void run() {
            atomicReference.compareAndSet(message, "Thread 1");
            message = message.concat("-Thread 1!");
         };
      }.start();

      System.out.println("Message is: " + message);
      System.out.println("Atomic Reference of Message is: " + atomicReference.get());
   }
}

这将产生以下结果。

This will produce the following result.

Output

Message is: hello
Atomic Reference of Message is: Thread 1

Java Concurrency - AtomicIntegerArray Class

java.util.concurrent.atomic.AtomicIntegerArray 类提供对基础 int 数组的操作,该数组可被原子性地读写,并且还包含高级原子操作。AtomicIntegerArray 支持对基础 int 数组变量的原子操作。它有类似对 volatile 变量的读写操作的 get 和 set 方法。也就是说,set 操作与任何对相同变量的后续 get 操作之间存在先行发生关系。原子 compareAndSet 方法也有这些内存一致性特征。

A java.util.concurrent.atomic.AtomicIntegerArray class provides operations on underlying int array that can be read and written atomically, and also contains advanced atomic operations. AtomicIntegerArray supports atomic operations on underlying int array variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicIntegerArray Methods

以下是 AtomicIntegerArray 类中可用的重要方法列表。

Following is the list of important methods available in the AtomicIntegerArray class.

Sr.No.

Method & Description

1

public int addAndGet(int i, int delta) Atomically adds the given value to the element at index i.

2

public boolean compareAndSet(int i, int expect, int update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

3

public int decrementAndGet(int i) Atomically decrements by one the element at index i.

4

public int get(int i) Gets the current value at position i.

5

public int getAndAdd(int i, int delta) Atomically adds the given value to the element at index i.

6

public int getAndDecrement(int i) Atomically decrements by one the element at index i.

7

public int getAndIncrement(int i) Atomically increments by one the element at index i.

8

public int getAndSet(int i, int newValue) Atomically sets the element at position i to the given value and returns the old value.

9

public int incrementAndGet(int i) Atomically increments by one the element at index i.

10

public void lazySet(int i, int newValue) Eventually sets the element at position i to the given value.

11

public int length() Returns the length of the array.

12

public void set(int i, int newValue) Sets the element at position i to the given value.

13

public String toString() Returns the String representation of the current values of array.

14

public boolean weakCompareAndSet(int i, int expect, int update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

Example

以下 TestThread 程序演示了在基于线程的环境中使用 AtomicIntegerArray 变量。

The following TestThread program shows usage of AtomicIntegerArray variable in thread based environment.

import java.util.concurrent.atomic.AtomicIntegerArray;

public class TestThread {
   private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

   public static void main(final String[] arguments) throws InterruptedException {

      for (int i = 0; i<atomicIntegerArray.length(); i++) {
         atomicIntegerArray.set(i, 1);
      }

      Thread t1 = new Thread(new Increment());
      Thread t2 = new Thread(new Compare());
      t1.start();
      t2.start();

      t1.join();
      t2.join();

      System.out.println("Values: ");

      for (int i = 0; i<atomicIntegerArray.length(); i++) {
         System.out.print(atomicIntegerArray.get(i) + " ");
      }
   }

   static class Increment implements Runnable {

      public void run() {

         for(int i = 0; i<atomicIntegerArray.length(); i++) {
            int add = atomicIntegerArray.incrementAndGet(i);
            System.out.println("Thread " + Thread.currentThread().getId()
               + ", index " +i + ", value: "+ add);
         }
      }
   }

   static class Compare implements Runnable {

      public void run() {

         for(int i = 0; i<atomicIntegerArray.length(); i++) {
            boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);

            if(swapped) {
               System.out.println("Thread " + Thread.currentThread().getId()
                  + ", index " +i + ", value: 3");
            }
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Thread 10, index 0, value: 2
Thread 10, index 1, value: 2
Thread 10, index 2, value: 2
Thread 11, index 0, value: 3
Thread 10, index 3, value: 2
Thread 11, index 1, value: 3
Thread 11, index 2, value: 3
Thread 10, index 4, value: 2
Thread 11, index 3, value: 3
Thread 10, index 5, value: 2
Thread 10, index 6, value: 2
Thread 11, index 4, value: 3
Thread 10, index 7, value: 2
Thread 11, index 5, value: 3
Thread 10, index 8, value: 2
Thread 11, index 6, value: 3
Thread 10, index 9, value: 2
Thread 11, index 7, value: 3
Thread 11, index 8, value: 3
Thread 11, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3

Java Concurrency - AtomicLongArray Class

java.util.concurrent.atomic.AtomicLongArray 类提供了可在底层长数组进行原子读取和写入的操作,还包含一些高级原子操作。AtomicLongArray 支持对底层 long 数组变量的原子操作。它有类似于对 volatile 变量进行读写操作的 get 和 set 方法。这意味着 set 与对同一变量的任何后续 get 具有先发生后发生关系。atomic compareAndSet 方法也有这些内存一致性特性。

A java.util.concurrent.atomic.AtomicLongArray class provides operations on underlying long array that can be read and written atomically, and also contains advanced atomic operations. AtomicLongArray supports atomic operations on underlying long array variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicLongArray Methods

以下是 AtomicLongArray 类中提供的一些重要方法。

Following is the list of important methods available in the AtomicLongArray class.

Sr.No.

Method & Description

1

public long addAndGet(int i, long delta) Atomically adds the given value to the element at index i.

2

public boolean compareAndSet(int i, long expect, long update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

3

public long decrementAndGet(int i) Atomically decrements by one the element at index i.

4

public long get(int i) Gets the current value at position i.

5

public long getAndAdd(int i, long delta) Atomically adds the given value to the element at index i.

6

public long getAndDecrement(int i) Atomically decrements by one the element at index i.

7

public long getAndIncrement(int i) Atomically increments by one the element at index i.

8

public long getAndSet(int i, long newValue) Atomically sets the element at position i to the given value and returns the old value.

9

public long incrementAndGet(int i) Atomically increments by one the element at index i.

10

public void lazySet(int i, long newValue) Eventually sets the element at position i to the given value.

11

public int length() Returns the length of the array.

12

public void set(int i, long newValue) Sets the element at position i to the given value.

13

public String toString() Returns the String representation of the current values of array.

14

public boolean weakCompareAndSet(int i, long expect, long update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

Example

以下 TestThread 程序演示了在基于线程的环境中使用 AtomicIntegerArray 变量。

The following TestThread program shows usage of AtomicIntegerArray variable in thread based environment.

import java.util.concurrent.atomic.AtomicLongArray;

public class TestThread {
   private static AtomicLongArray atomicLongArray = new AtomicLongArray(10);

   public static void main(final String[] arguments) throws InterruptedException {

      for (int i = 0; i<atomicLongArray.length(); i++) {
         atomicLongArray.set(i, 1);
      }

      Thread t1 = new Thread(new Increment());
      Thread t2 = new Thread(new Compare());
      t1.start();
      t2.start();

      t1.join();
      t2.join();

      System.out.println("Values: ");

      for (int i = 0; i<atomicLongArray.length(); i++) {
         System.out.print(atomicLongArray.get(i) + " ");
      }
   }

   static class Increment implements Runnable {

      public void run() {

         for(int i = 0; i<atomicLongArray.length(); i++) {
            long add = atomicLongArray.incrementAndGet(i);
            System.out.println("Thread " + Thread.currentThread().getId()
               + ", index " +i + ", value: "+ add);
         }
      }
   }

   static class Compare implements Runnable {

      public void run() {

         for(int i = 0; i<atomicLongArray.length(); i++) {
            boolean swapped = atomicLongArray.compareAndSet(i, 2, 3);

            if(swapped) {
               System.out.println("Thread " + Thread.currentThread().getId()
                  + ", index " +i + ", value: 3");
            }
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Thread 9, index 0, value: 2
Thread 10, index 0, value: 3
Thread 9, index 1, value: 2
Thread 9, index 2, value: 2
Thread 9, index 3, value: 2
Thread 9, index 4, value: 2
Thread 10, index 1, value: 3
Thread 9, index 5, value: 2
Thread 10, index 2, value: 3
Thread 9, index 6, value: 2
Thread 10, index 3, value: 3
Thread 9, index 7, value: 2
Thread 10, index 4, value: 3
Thread 9, index 8, value: 2
Thread 9, index 9, value: 2
Thread 10, index 5, value: 3
Thread 10, index 6, value: 3
Thread 10, index 7, value: 3
Thread 10, index 8, value: 3
Thread 10, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3

AtomicReferenceArray Class

java.util.concurrent.atomic.AtomicReferenceArray 类在基础引用数组上提供操作,这些操作可以原子地进行读和写,还包含高级原子操作。AtomicReferenceArray 支持对基础引用数组变量进行原子操作。它有 get 和 set 方法,这些方法的工作方式类似于对 volatile 变量进行读取和写入。也就是说,set 与对同一变量的任何后续 get 具有存在先行关系。原子 compareAndSet 方法也具有这些内存一致性特性。

A java.util.concurrent.atomic.AtomicReferenceArray class provides operations on underlying reference array that can be read and written atomically, and also contains advanced atomic operations. AtomicReferenceArray supports atomic operations on underlying reference array variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.

AtomicReferenceArray Methods

以下是 AtomicReferenceArray 类中可用一些重要方法的列表。

Following is the list of important methods available in the AtomicReferenceArray class.

Sr.No.

Method & Description

1

public boolean compareAndSet(int i, E expect, E update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

2

public E get(int i) Gets the current value at position i.

3

public E getAndSet(int i, E newValue) Atomically sets the element at position i to the given value and returns the old value.

4

public void lazySet(int i, E newValue) Eventually sets the element at position i to the given value.

5

public int length() Returns the length of the array.

6

public void set(int i, E newValue) Sets the element at position i to the given value.

7

public String toString() Returns the String representation of the current values of array.

8

public boolean weakCompareAndSet(int i, E expect, E update) Atomically sets the element at position i to the given updated value if the current value == the expected value.

Example

以下 TestThread 程序展示了在基于线程的环境中,AtomicReferenceArray 变量的用法。

The following TestThread program shows usage of AtomicReferenceArray variable in thread based environment.

import java.util.concurrent.atomic.AtomicReferenceArray;

public class TestThread {
   private static String[] source = new String[10];
   private static AtomicReferenceArray<String> atomicReferenceArray
      = new AtomicReferenceArray<String>(source);

   public static void main(final String[] arguments) throws InterruptedException {

      for (int i = 0; i<atomicReferenceArray.length(); i++) {
         atomicReferenceArray.set(i, "item-2");
      }

      Thread t1 = new Thread(new Increment());
      Thread t2 = new Thread(new Compare());
      t1.start();
      t2.start();

      t1.join();
      t2.join();
   }

   static class Increment implements Runnable {

      public void run() {

         for(int i = 0; i<atomicReferenceArray.length(); i++) {
            String add = atomicReferenceArray.getAndSet(i,"item-"+ (i+1));
            System.out.println("Thread " + Thread.currentThread().getId()
               + ", index " +i + ", value: "+ add);
         }
      }
   }

   static class Compare implements Runnable {

      public void run() {

         for(int i = 0; i<atomicReferenceArray.length(); i++) {
            System.out.println("Thread " + Thread.currentThread().getId()
               + ", index " +i + ", value: "+ atomicReferenceArray.get(i));
            boolean swapped = atomicReferenceArray.compareAndSet(i, "item-2", "updated-item-2");
            System.out.println("Item swapped: " + swapped);

            if(swapped) {
               System.out.println("Thread " + Thread.currentThread().getId()
                  + ", index " +i + ", updated-item-2");
            }
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Thread 9, index 0, value: item-2
Thread 10, index 0, value: item-1
Item swapped: false
Thread 10, index 1, value: item-2
Item swapped: true
Thread 9, index 1, value: updated-item-2
Thread 10, index 1, updated-item-2
Thread 10, index 2, value: item-3
Item swapped: false
Thread 10, index 3, value: item-2
Item swapped: true
Thread 10, index 3, updated-item-2
Thread 10, index 4, value: item-2
Item swapped: true
Thread 10, index 4, updated-item-2
Thread 10, index 5, value: item-2
Item swapped: true
Thread 10, index 5, updated-item-2
Thread 10, index 6, value: item-2
Thread 9, index 2, value: item-2
Item swapped: true
Thread 9, index 3, value: updated-item-2
Thread 10, index 6, updated-item-2
Thread 10, index 7, value: item-2
Thread 9, index 4, value: updated-item-2
Item swapped: true
Thread 9, index 5, value: updated-item-2
Thread 10, index 7, updated-item-2
Thread 9, index 6, value: updated-item-2
Thread 10, index 8, value: item-2
Thread 9, index 7, value: updated-item-2
Item swapped: true
Thread 9, index 8, value: updated-item-2
Thread 10, index 8, updated-item-2
Thread 9, index 9, value: item-2
Thread 10, index 9, value: item-10
Item swapped: false

Java Concurrency - Executor Interface

java.util.concurrent.Executor 接口是一个支持启动新任务的简单接口。

A java.util.concurrent.Executor interface is a simple interface to support launching new tasks.

ExecutorService Methods

Sr.No.

Method & Description

1

void execute(Runnable command) Executes the given command at some time in the future.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 Executor 接口。

The following TestThread program shows usage of Executor interface in thread based environment.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      Executor executor = Executors.newCachedThreadPool();
      executor.execute(new Task());
      ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
      pool.shutdown();
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 5);
            System.out.println("Running Task!");
            TimeUnit.SECONDS.sleep(duration);
            System.out.println("Task Completed");
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Running Task!
Task Completed

ExecutorService Interface

java.util.concurrent.ExecutorService 接口是 Executor 接口的子接口,它增加了在单个任务和执行器本身上管理生命周期所需的功能。

A java.util.concurrent.ExecutorService interface is a subinterface of Executor interface, and adds features to manage the lifecycle, both of the individual tasks and of the executor itself.

ExecutorService Methods

Sr.No.

Method & Description

1

boolean awaitTermination(long timeout, TimeUnit unit) Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

2

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) Executes the given tasks, returning a list of Futures holding their status and results when all complete.

3

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires, whichever happens first.

4

<T> T invokeAny(Collection<? extends Callable<T>> tasks) Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do.

5

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) Executes the given tasks, returning the result of one that has completed successfully (i.e., without throwing an exception), if any do before the given timeout elapses.

6

boolean isShutdown() Returns true if this executor has been shut down.

7

boolean isTerminated() Returns true if all tasks have completed following shut down.

8

void shutdown() Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

9

List<Runnable> shutdownNow() Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

10

<T> Future<T> submit(Callable<T> task) Submits a value-returning task for execution and returns a Future representing the pending results of the task.

11

Future<?> submit(Runnable task) Submits a Runnable task for execution and returns a Future representing that task.

12

<T> Future<T> submit(Runnable task, T result) Submits a Runnable task for execution and returns a Future representing that task.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 ExecutorService 接口。

The following TestThread program shows usage of ExecutorService interface in thread based environment.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      ExecutorService executor = Executors.newSingleThreadExecutor();

      try {
         executor.submit(new Task());
         System.out.println("Shutdown executor");
         executor.shutdown();
         executor.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         System.err.println("tasks interrupted");
      } finally {

         if (!executor.isTerminated()) {
            System.err.println("cancel non-finished tasks");
         }
         executor.shutdownNow();
         System.out.println("shutdown finished");
      }
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 20);
            System.out.println("Running Task!");
            TimeUnit.SECONDS.sleep(duration);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:302)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
	at TestThread$Task.run(TestThread.java:39)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)

ScheduledExecutorService Interface

java.util.concurrent.ScheduledExecutorService 接口是 ExecutorService 接口的子接口,并支持任务的未来和/或定期执行。

A java.util.concurrent.ScheduledExecutorService interface is a subinterface of ExecutorService interface, and supports future and/or periodic execution of tasks.

ScheduledExecutorService Methods

Sr.No.

Method & Description

1

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) Creates and executes a ScheduledFuture that becomes enabled after the given delay.

2

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) Creates and executes a one-shot action that becomes enabled after the given delay.

3

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.

4

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 ScheduledExecutorService 接口。

The following TestThread program shows usage of ScheduledExecutorService interface in thread based environment.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

      final ScheduledFuture<?> beepHandler =
         scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);

      scheduler.schedule(new Runnable() {

         @Override
         public void run() {
            beepHandler.cancel(true);
            scheduler.shutdown();
         }
      }, 10, TimeUnit.SECONDS);
   }

   static class BeepTask implements Runnable {

      public void run() {
         System.out.println("beep");
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

beep
beep
beep
beep

newFixedThreadPool Method

可以通过调用 Executors 类的静态 newFixedThreadPool() 方法获得一个固定线程池。

A fixed thread pool can be obtainted by calling the static newFixedThreadPool() method of Executors class.

Syntax

ExecutorService fixedPool = Executors.newFixedThreadPool(2);

其中

where

  1. Maximum 2 threads will be active to process tasks.

  2. If more than 2 threads are submitted then they are held in a queue until threads become available.

  3. A new thread is created to take its place if a thread terminates due to failure during execution shutdown on executor is not yet called.

  4. Any thread exists till the pool is shutdown.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 newFixedThreadPool 方法。

The following TestThread program shows usage of newFixedThreadPool method in thread based environment.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(2);

      // Cast the object to its class type
      ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;

      //Stats before tasks execution
      System.out.println("Largest executions: "
         + pool.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + pool.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + pool.getPoolSize());
      System.out.println("Currently executing threads: "
         + pool.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + pool.getTaskCount());

      executor.submit(new Task());
      executor.submit(new Task());

      //Stats after tasks execution
      System.out.println("Core threads: " + pool.getCorePoolSize());
      System.out.println("Largest executions: "
         + pool.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + pool.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + pool.getPoolSize());
      System.out.println("Currently executing threads: "
         + pool.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + pool.getTaskCount());

      executor.shutdown();
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 5);
            System.out.println("Running Task! Thread Name: " +
               Thread.currentThread().getName());
               TimeUnit.SECONDS.sleep(duration);

            System.out.println("Task Completed! Thread Name: " +
               Thread.currentThread().getName());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Largest executions: 0
Maximum allowed threads: 2
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 2
Largest executions: 2
Maximum allowed threads: 2
Current threads in pool: 2
Currently executing threads: 1
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1

newCachedThreadPool Method

可以通过调用 Executors 类的静态 newCachedThreadPool() 方法获得一个缓存线程池。

A cached thread pool can be obtainted by calling the static newCachedThreadPool() method of Executors class.

Syntax

ExecutorService executor = Executors.newCachedThreadPool();

其中

where

  1. newCachedThreadPool method creates an executor having an expandable thread pool.

  2. Such an executor is suitable for applications that launch many short-lived tasks.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 newCachedThreadPool 方法。

The following TestThread program shows usage of newCachedThreadPool method in thread based environment.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      ExecutorService executor = Executors.newCachedThreadPool();

      // Cast the object to its class type
      ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;

      //Stats before tasks execution
      System.out.println("Largest executions: "
         + pool.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + pool.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + pool.getPoolSize());
      System.out.println("Currently executing threads: "
         + pool.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + pool.getTaskCount());

      executor.submit(new Task());
      executor.submit(new Task());

      //Stats after tasks execution
      System.out.println("Core threads: " + pool.getCorePoolSize());
      System.out.println("Largest executions: "
         + pool.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + pool.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + pool.getPoolSize());
      System.out.println("Currently executing threads: "
         + pool.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + pool.getTaskCount());

      executor.shutdown();
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 5);
            System.out.println("Running Task! Thread Name: " +
               Thread.currentThread().getName());
               TimeUnit.SECONDS.sleep(duration);
            System.out.println("Task Completed! Thread Name: " +
               Thread.currentThread().getName());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1

newScheduledThreadPool Method

可以通过调用 Executors 类的静态 newScheduledThreadPool() 方法获得预定线程池。

A scheduled thread pool can be obtainted by calling the static newScheduledThreadPool() method of Executors class.

Syntax

ExecutorService executor = Executors.newScheduledThreadPool(1);

Example

以下 TestThread 程序显示了在线程环境中 newScheduledThreadPool 方法的用法。

The following TestThread program shows usage of newScheduledThreadPool method in thread based environment.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

      final ScheduledFuture<?> beepHandler =
         scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);

      scheduler.schedule(new Runnable() {

         @Override
         public void run() {
            beepHandler.cancel(true);
            scheduler.shutdown();
         }
      }, 10, TimeUnit.SECONDS);
   }

   static class BeepTask implements Runnable {

      public void run() {
         System.out.println("beep");
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

beep
beep
beep
beep

newSingleThreadExecutor Method

可以通过调用 Executors 类的静态 newSingleThreadExecutor() 方法获得一个单线程池。

A single thread pool can be obtainted by calling the static newSingleThreadExecutor() method of Executors class.

Syntax

ExecutorService executor = Executors.newSingleThreadExecutor();

其中 newSingleThreadExecutor 方法创建一个一次执行一个任务的执行器。

Where newSingleThreadExecutor method creates an executor that executes a single task at a time.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 newSingleThreadExecutor 方法。

The following TestThread program shows usage of newSingleThreadExecutor method in thread based environment.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      ExecutorService executor = Executors.newSingleThreadExecutor();

      try {
         executor.submit(new Task());
         System.out.println("Shutdown executor");
         executor.shutdown();
         executor.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         System.err.println("tasks interrupted");
      } finally {

         if (!executor.isTerminated()) {
            System.err.println("cancel non-finished tasks");
         }
         executor.shutdownNow();
         System.out.println("shutdown finished");
      }
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 20);
            System.out.println("Running Task!");
            TimeUnit.SECONDS.sleep(duration);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at java.lang.Thread.sleep(Thread.java:302)
	at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
	at TestThread$Task.run(TestThread.java:39)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)

ThreadPoolExecutor Class

java.util.concurrent.ThreadPoolExecutor 是 ExecutorService,使用多个池线程中一个线程(通常由 Executors 工厂方法配置)执行每个提交的任务。它还提供各种实用方法来检查当前线程统计信息并控制它们。

java.util.concurrent.ThreadPoolExecutor is an ExecutorService to execute each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. It also provides various utility methods to check current threads statistics and control them.

ThreadPoolExecutor Methods

Sr.No.

Method & Description

1

protected void afterExecute(Runnable r, Throwable t) Method invoked upon completion of execution of the given Runnable.

2

void allowCoreThreadTimeOut(boolean value) Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive.

3

boolean allowsCoreThreadTimeOut() Returns true if this pool allows core threads to time out and terminate if no tasks arrive within the keepAlive time, being replaced if needed when new tasks arrive.

4

boolean awaitTermination(long timeout, TimeUnit unit) Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

5

protected void beforeExecute(Thread t, Runnable r) Method invoked prior to executing the given Runnable in the given thread.

6

void execute(Runnable command) Executes the given task sometime in the future.

7

protected void finalize() Invokes shutdown when this executor is no longer referenced and it has no threads.

8

int getActiveCount() Returns the approximate number of threads that are actively executing tasks.

9

long getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution.

10

int getCorePoolSize() Returns the core number of threads.

11

long getKeepAliveTime(TimeUnit unit) Returns the thread keep-alive time, which is the amount of time that threads in excess of the core pool size may remain idle before being terminated.

12

int getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool.

13

int getMaximumPoolSize() Returns the maximum allowed number of threads.

14

int getPoolSize() Returns the current number of threads in the pool.

15

BlockingQueue getQueue() Returns the task queue used by this executor.

15

RejectedExecutionHandler getRejectedExecutionHandler() Returns the current handler for unexecutable tasks.

16

long getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.

17

ThreadFactory getThreadFactory() Returns the thread factory used to create new threads.

18

boolean isShutdown() Returns true if this executor has been shut down.

19

boolean isTerminated() Returns true if all tasks have completed following shut down.

20

boolean isTerminating() Returns true if this executor is in the process of terminating after shutdown() or shutdownNow() but has not completely terminated.

21

int prestartAllCoreThreads() Starts all core threads, causing them to idly wait for work.

22

boolean prestartCoreThread() Starts a core thread, causing it to idly wait for work.

23

void purge() Tries to remove from the work queue all Future tasks that have been cancelled.

24

boolean remove(Runnable task) Removes this task from the executor’s internal queue if it is present, thus causing it not to be run if it has not already started.

25

void setCorePoolSize(int corePoolSize) Sets the core number of threads.

26

void setKeepAliveTime(long time, TimeUnit unit) Sets the time limit for which threads may remain idle before being terminated.

27

void setMaximumPoolSize(int maximumPoolSize) Sets the maximum allowed number of threads.

28

void setRejectedExecutionHandler(RejectedExecutionHandler handler) Sets a new handler for unexecutable tasks.

29

void setThreadFactory(ThreadFactory threadFactory) Sets the thread factory used to create new threads.

30

void shutdown() Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

31

List<Runnable> shutdownNow() Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

32

protected void terminated() Method invoked when the Executor has terminated.

33

String toString() Returns a string identifying this pool, as well as its state, including indications of run state and estimated worker and task counts.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 ThreadPoolExecutor 接口。

The following TestThread program shows usage of ThreadPoolExecutor interface in thread based environment.

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();

      //Stats before tasks execution
      System.out.println("Largest executions: "
         + executor.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + executor.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + executor.getPoolSize());
      System.out.println("Currently executing threads: "
         + executor.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + executor.getTaskCount());

      executor.submit(new Task());
      executor.submit(new Task());

      //Stats after tasks execution
      System.out.println("Core threads: " + executor.getCorePoolSize());
      System.out.println("Largest executions: "
         + executor.getLargestPoolSize());
      System.out.println("Maximum allowed threads: "
         + executor.getMaximumPoolSize());
      System.out.println("Current threads in pool: "
         + executor.getPoolSize());
      System.out.println("Currently executing threads: "
         + executor.getActiveCount());
      System.out.println("Total number of threads(ever scheduled): "
         + executor.getTaskCount());

      executor.shutdown();
   }

   static class Task implements Runnable {

      public void run() {

         try {
            Long duration = (long) (Math.random() * 5);
            System.out.println("Running Task! Thread Name: " +
               Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(duration);
            System.out.println("Task Completed! Thread Name: " +
               Thread.currentThread().getName());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-2
Running Task! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-2

ScheduledThreadPoolExecutor Class

java.util.concurrent.ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,它还可以计划命令在给定的延迟后运行,或者定期执行。

java.util.concurrent.ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor and can additionally schedule commands to run after a given delay, or to execute periodically.

ScheduledThreadPoolExecutor Methods

Sr.No.

Method & Description

1

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) Modifies or replaces the task used to execute a callable.

2

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) Modifies or replaces the task used to execute a runnable.

3

void execute(Runnable command) Executes command with zero required delay.

4

boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() Gets the policy on whether to continue executing existing periodic tasks even when this executor has been shutdown.

5

boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() Gets the policy on whether to execute existing delayed tasks even when this executor has been shutdown.

6

BlockingQueue<Runnable> getQueue() Returns the task queue used by this executor.

7

boolean getRemoveOnCancelPolicy() Gets the policy on whether cancelled tasks should be immediately removed from the work queue at time of cancellation.

8

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) Creates and executes a ScheduledFuture that becomes enabled after the given delay.

9

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) Creates and executes a one-shot action that becomes enabled after the given delay.

10

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on.

11

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next.

12

void setContinueExistingPeriodicTasksAfterShutdownPolicy (boolean value) Sets the policy on whether to continue executing existing periodic tasks even when this executor has been shutdown.

13

void setExecuteExistingDelayedTasksAfterShutdownPolicy (boolean value) Sets the policy on whether to execute existing delayed tasks even when this executor has been shutdown.

14

void setRemoveOnCancelPolicy(boolean value) Sets the policy on whether cancelled tasks should be immediately removed from the work queue at time of cancellation.

15

void shutdown() Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

16

List<Runnable> shutdownNow() Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

17

<T> Future<T> submit(Callable<T> task) Submits a value-returning task for execution and returns a Future representing the pending results of the task.

18

Future<?> submit(Runnable task) Submits a Runnable task for execution and returns a Future representing that task.

19

<T> Future<T> submit(Runnable task, T result) Submits a Runnable task for execution and returns a Future representing that task.

Example

以下 TestThread 程序展示了在基于线程的环境中使用 ScheduledThreadPoolExecutor 接口。

The following TestThread program shows usage of ScheduledThreadPoolExecutor interface in thread based environment.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      final ScheduledThreadPoolExecutor scheduler =
         (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);

      final ScheduledFuture<?> beepHandler =
         scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);

      scheduler.schedule(new Runnable() {

         @Override
         public void run() {
            beepHandler.cancel(true);
            scheduler.shutdown();
         }
      }, 10, TimeUnit.SECONDS);
   }

   static class BeepTask implements Runnable {

      public void run() {
         System.out.println("beep");
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

beep
beep
beep
beep

Java Concurrency - Futures and Callables

java.util.concurrent.Callable 对象可以返回由线程完成的计算结果,而 runnable 接口只能运行该线程。 Callable 对象返回 Future 对象,该对象提供用于监视线程执行的任务进度的的方法。 Future 对象可用于检查 Callable 的状态,然后在线程完成后从 Callable 中检索结果。它还提供了超时功能。

java.util.concurrent.Callable object can return the computed result done by a thread in contrast to runnable interface which can only run the thread. The Callable object returns Future object which provides methods to monitor the progress of a task being executed by a thread. Future object can be used to check the status of a Callable and then retrieve the result from the Callable once the thread is done. It also provides timeout functionality.

Syntax

//submit the callable using ThreadExecutor
//and get the result as a Future object
Future<Long> result10 = executor.submit(new FactorialService(10));

//get the result using get method of the Future object
//get method waits till the thread execution and then return the result of the execution.
Long factorial10 = result10.get();

Example

以下 TestThread 程序展示了在基于线程的环境中使用 Future 和 Callable。

The following TestThread program shows usage of Futures and Callables in thread based environment.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException,
      ExecutionException {

      ExecutorService executor = Executors.newSingleThreadExecutor();

      System.out.println("Factorial Service called for 10!");
      Future<Long> result10 = executor.submit(new FactorialService(10));

      System.out.println("Factorial Service called for 20!");
      Future<Long> result20 = executor.submit(new FactorialService(20));

      Long factorial10 = result10.get();
      System.out.println("10! = " + factorial10);

      Long factorial20 = result20.get();
      System.out.println("20! = " + factorial20);

      executor.shutdown();
   }

   static class FactorialService implements Callable<Long> {
      private int number;

      public FactorialService(int number) {
         this.number = number;
      }

      @Override
      public Long call() throws Exception {
         return factorial();
      }

      private Long factorial() throws InterruptedException {
         long result = 1;

         while (number != 0) {
            result = number * result;
            number--;
            Thread.sleep(100);
         }
         return result;
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Factorial Service called for 10!
Factorial Service called for 20!
10! = 3628800
20! = 2432902008176640000

Java Concurrency - Fork-Join framework

fork-join 框架允许在多个工作进程上中断某个任务,然后等待结果以合并它们。它很大程度上利用了多处理器的计算机容量。以下是 fork-join 框架中使用的核心概念和对象。

The fork-join framework allows to break a certain task on several workers and then wait for the result to combine them. It leverages multi-processor machine’s capacity to great extent. Following are the core concepts and objects used in fork-join framework.

Fork

Fork 是任务将自身拆分为多个可并发执行的更小的独立子任务的过程。

Fork is a process in which a task splits itself into smaller and independent sub-tasks which can be executed concurrently.

Syntax

Sum left  = new Sum(array, low, mid);
left.fork();

此处 Sum 为 RecursiveTask 的子类,而 left.fork() 将任务拆分为子任务。

Here Sum is a subclass of RecursiveTask and left.fork() spilts the task into sub-tasks.

Join

Join 是任务在子任务完成执行后,合并所有子任务结果的过程,否则它会保持等待状态。

Join is a process in which a task join all the results of sub-tasks once the subtasks have finished executing, otherwise it keeps waiting.

Syntax

left.join();

此处 left 为 Sum 类的对象。

Here left is an object of Sum class.

ForkJoinPool

它是一个特殊线程池,旨在处理 fork 和 join 任务拆分。

it is a special thread pool designed to work with fork-and-join task splitting.

Syntax

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

此助手程序使用并发级别的 4 CPU 创建了新的 ForkJoinPool。

Here a new ForkJoinPool with a parallelism level of 4 CPUs.

RecursiveAction

RecursiveAction 表示不返回任何值的某个任务。

RecursiveAction represents a task which does not return any value.

Syntax

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

RecursiveTask

RecursiveTask 表示返回某个值的某个任务。

RecursiveTask represents a task which returns a value.

Syntax

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}

Example

以下 TestThread 程序演示了线程基础环境中 Fork-Join 框架的用法。

The following TestThread program shows usage of Fork-Join framework in thread based environment.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException,
      ExecutionException {

      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);

      int[] numbers = new int[1000];

      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }

      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }

   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;

      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }

      protected Long compute() {

         if(high - low <= 10) {
            long sum = 0;

            for(int i = low; i < high; ++i)
               sum += array[i];
               return sum;
         } else {
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

32
499500

Java Concurrency - BlockingQueue Interface

java.util.concurrent.BlockingQueue 接口是 Queue 接口的子接口,并且另外支持涉及在检索某个元素之前等待队列变为非空,并在存储某个元素之前等待队列中可用的空间等操作。

A java.util.concurrent.BlockingQueue interface is a subinterface of Queue interface, and additionally supports operations such as waiting for the queue to become non-empty before retrieving an element, and wait for space to become available in the queue before storing an element.

BlockingQueue Methods

Sr.No.

Method & Description

1

boolean add(E e) Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and throwing an IllegalStateException if no space is currently available.

2

boolean contains(Object o) Returns true if this queue contains the specified element.

3

int drainTo(Collection<? super E> c) Removes all available elements from this queue and adds them to the given collection.

4

int drainTo(Collection<? super E> c, int maxElements) Removes at most the given number of available elements from this queue and adds them to the given collection.

5

boolean offer(E e) Inserts the specified element into this queue if it is possible to do so immediately without violating capacity restrictions, returning true upon success and false if no space is currently available.

6

boolean offer(E e, long timeout, TimeUnit unit) Inserts the specified element into this queue, waiting up to the specified wait time if necessary for space to become available.

7

E poll(long timeout, TimeUnit unit) Retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.

8

void put(E e) Inserts the specified element into this queue, waiting if necessary for space to become available.

9

int remainingCapacity() Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking, or Integer.MAX_VALUE if there is no intrinsic limit.

10

boolean remove(Object o) Removes a single instance of the specified element from this queue, if it is present.

11

E take() Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.

Example

以下 TestThread 程序演示了线程基础环境中 BlockingQueue 接口的用法。

The following TestThread program shows usage of BlockingQueue interface in thread based environment.

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);

            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {

         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27

Java Concurrency - ConcurrentMap Interface

java.util.concurrent.ConcurrentMap 接口是 Map 接口的子接口,支持对基础映射变量执行原子操作。它具有类似于易失变量上的读写操作的 get 和 set 方法。也就是说,某项设置与对同一变量执行的任何后续 get 操作之间存在发生在前关系。此接口确保线程安全和原子性保证。

A java.util.concurrent.ConcurrentMap interface is a subinterface of Map interface, supports atomic operations on underlying map variable. It have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. This interface ensures thread safety and atomicity guarantees.

ConcurrentMap Methods

Sr.No.

Method & Description

1

default V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping).

2

default V computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction) If the specified key is not already associated with a value (or is mapped to null), attempts to compute its value using the given mapping function and enters it into this map unless null.

3

default V computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) If the value for the specified key is present and non-null, attempts to compute a new mapping given the key and its current mapped value.

4

default void forEach(BiConsumer<? super K,? super V> action) Performs the given action for each entry in this map until all entries have been processed or the action throws an exception.

5

default V getOrDefault(Object key, V defaultValue) Returns the value to which the specified key is mapped, or defaultValue if this map contains no mapping for the key.

6

default V merge(K key, V value, BiFunction<? super V,? super V,? extends V> remappingFunction) If the specified key is not already associated with a value or is associated with null, associates it with the given non-null value.

7

V putIfAbsent(K key, V value) If the specified key is not already associated with a value, associate it with the given value.

8

boolean remove(Object key, Object value) Removes the entry for a key only if currently mapped to a given value.

9

V replace(K key, V value) Replaces the entry for a key only if currently mapped to some value.

10

boolean replace(K key, V oldValue, V newValue) Replaces the entry for a key only if currently mapped to a given value.

11

default void replaceAll(BiFunction<? super K,? super V,? extends V> function) Replaces each entry’s value with the result of invoking the given function on that entry until all entries have been processed or the function throws an exception.

Example

以下 TestThread 程序展示了 ConcurrentMap 与 HasMap 的用法。

The following TestThread program shows usage of ConcurrentMap vs HashMap.

import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TestThread {

   public static void main(final String[] arguments) {
      Map<String,String> map = new ConcurrentHashMap<String, String>();

      map.put("1", "One");
      map.put("2", "Two");
      map.put("3", "Three");
      map.put("5", "Five");
      map.put("6", "Six");

      System.out.println("Initial ConcurrentHashMap: " + map);
      Iterator<String> iterator = map.keySet().iterator();

      try {

         while(iterator.hasNext()) {
            String key = iterator.next();

            if(key.equals("3")) {
               map.put("4", "Four");
            }
         }
      } catch(ConcurrentModificationException cme) {
         cme.printStackTrace();
      }
      System.out.println("ConcurrentHashMap after modification: " + map);

      map = new HashMap<String, String>();

      map.put("1", "One");
      map.put("2", "Two");
      map.put("3", "Three");
      map.put("5", "Five");
      map.put("6", "Six");

      System.out.println("Initial HashMap: " + map);
      iterator = map.keySet().iterator();

      try {

         while(iterator.hasNext()) {
            String key = iterator.next();

            if(key.equals("3")) {
               map.put("4", "Four");
            }
         }
         System.out.println("HashMap after modification: " + map);
      } catch(ConcurrentModificationException cme) {
         cme.printStackTrace();
      }
   }
}

这将产生以下结果。

This will produce the following result.

Output

Initial ConcurrentHashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
ConcurrentHashMap after modification: {1 = One, 2 = Two, 3 = Three, 4 = Four, 5 = Five, 6 = Six}
Initial HashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(Unknown Source)
	at java.util.HashMap$KeyIterator.next(Unknown Source)
	at TestThread.main(TestThread.java:48)

ConcurrentNavigableMap Interface

一个 java.util.concurrent.ConcurrentNavigableMap 接口是 ConcurrentMap 接口的子接口,并且支持 NavigableMap 操作,以及对其可导航子映射和近似匹配进行递归操作。

A java.util.concurrent.ConcurrentNavigableMap interface is a subinterface of ConcurrentMap interface, and supports NavigableMap operations, and recursively so for its navigable sub-maps, and approximate matches.

ConcurrentMap Methods

Sr.No.

Method & Description

1

NavigableSet<K> descendingKeySet() Returns a reverse order NavigableSet view of the keys contained in this map.

2

ConcurrentNavigableMap<K,V> descendingMap() Returns a reverse order view of the mappings contained in this map.

3

ConcurrentNavigableMap<K,V> headMap(K toKey) Returns a view of the portion of this map whose keys are strictly less than toKey.

4

ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive) Returns a view of the portion of this map whose keys are less than (or equal to, if inclusive is true) toKey.

5

NavigableSet<K> keySet() Returns a NavigableSet view of the keys contained in this map.

6

NavigableSet<K> navigableKeySet() Returns a NavigableSet view of the keys contained in this map.

7

ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) Returns a view of the portion of this map whose keys range from fromKey to toKey.

8

ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey) Returns a view of the portion of this map whose keys range from fromKey, inclusive, to toKey, exclusive.

9

ConcurrentNavigableMap<K,V> tailMap(K fromKey) Returns a view of the portion of this map whose keys are greater than or equal to fromKey.

10

ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive) Returns a view of the portion of this map whose keys are greater than (or equal to, if inclusive is true) fromKey.

Example

以下 TestThread 程序演示了 ConcurrentNavigableMap 的用法。

The following TestThread program shows usage of ConcurrentNavigableMap.

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class TestThread {

   public static void main(final String[] arguments) {
      ConcurrentNavigableMap<String,String> map =
         new ConcurrentSkipListMap<String, String>();

      map.put("1", "One");
      map.put("2", "Two");
      map.put("3", "Three");
      map.put("5", "Five");
      map.put("6", "Six");

      System.out.println("Initial ConcurrentHashMap: "+map);
      System.out.println("HeadMap(\"2\") of ConcurrentHashMap: "+map.headMap("2"));
      System.out.println("TailMap(\"2\") of ConcurrentHashMap: "+map.tailMap("2"));
      System.out.println(
         "SubMap(\"2\", \"4\") of ConcurrentHashMap: "+map.subMap("2","4"));
   }
}

这将产生以下结果。

This will produce the following result.

Output

Initial ConcurrentHashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
HeadMap("2") of ConcurrentHashMap: {1 = One}
TailMap("2") of ConcurrentHashMap: {2 = Two, 3 = Three, 5 = Five, 6 = Six}
SubMap("2", "4") of ConcurrentHashMap: {2 = Two, 3 = Three}