Java Concurrency 简明教程
Java Concurrency - Overview
Java 是一种多线程编程语言,这意味着我们可以使用 Java 开发多线程程序。多线程程序包含两个或更多可以同时运行的部分,每个部分都可以在同一时间处理不同的任务,从而优化利用可用资源,特别是在您的计算机有多个 CPU 时。
根据定义,多任务处理是多个进程共享公共处理资源(如 CPU)时发生的情况。多线程将多任务处理的概念扩展到应用程序中,您可以在其中将单个应用程序内的特定操作细分为各个线程。每个线程都可以并行运行。操作系统不仅在不同应用程序之间划分处理时间,还在应用程序内的每个线程之间划分处理时间。
通过使用多线程,您可以以在相同程序中可以并行执行多项活动的方式来编写代码。
Life Cycle of a Thread
线程在其生命周期中经过不同的阶段。例如,一个线程诞生、启动、运行,然后死亡。下图显示了线程的完整生命周期。
以下是生命周期的阶段 -
-
New - 新线程在其生命周期中处于新的状态。它一直保持此状态,直到程序启动线程。它也被称为 born thread。
-
Runnable - 新生的线程启动后,该线程变得可运行。处于此状态的线程被认为正在执行其任务。
-
Waiting - 有时,线程在等待另一个线程执行任务时会转变为等待状态。仅当其他线程发信号指示等待线程继续执行时,线程才会转换回可运行状态。
-
Timed Waiting - 可运行线程可以进入特定时间间隔的时间等待状态。当该时间间隔到期或它正在等待的事件发生时,处于此状态的线程会转换回可运行状态。
-
Terminated (Dead) - 当可运行线程完成其任务或以其他方式终止时,它会进入终止状态。
Thread Priorities
每个 Java 线程都有一个优先级,这有助于操作系统确定线程调度的顺序。
Java 线程优先级介于 MIN_PRIORITY(常量为 1)和 MAX_PRIORITY(常量为 10)之间。默认情况下,每个线程都获得 NORM_PRIORITY(常量为 5)优先级。
具有较高优先级的线程对程序而言更为重要,并且应该在较低优先级的线程之前分配处理器时间。但是,线程优先级不能保证线程执行的顺序,并且很大程度上取决于平台。
Create a Thread by Implementing a Runnable Interface
如果准备将您的类作为线程执行,那么可以通过实现 Runnable 接口实现此目的。您需要遵循三个基本步骤 −
Step 1
第一步,您需要实现 Runnable 接口提供的 run() 方法。此方法为线程提供了一个入口点,您需要将完整业务逻辑放入此方法内。以下是 run() 方法的一个简单语法:
public void run( )
Step 2
作为第二步,您将使用以下构造函数实例化一个 Thread 对象 −
Thread(Runnable threadObj, String threadName);
其中,threadObj 是实现 Runnable 接口的类的实例,threadName 是赋予新线程的名称。
Step 3
一旦创建了 Thread 对象,就可以通过调用 start() 方法来启动它,它执行对 run( ) 方法的调用。以下是 start() 方法的简单语法 -
void start();
Example
下面是创建一个新线程并开始运行它的示例:
class RunnableDemo implements Runnable {
private Thread t;
private String threadName;
RunnableDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 4; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(50);
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo R1 = new RunnableDemo("Thread-1");
R1.start();
RunnableDemo R2 = new RunnableDemo("Thread-2");
R2.start();
}
}
这会产生以下结果 −
Output
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Create a Thread by Extending a Thread Class
创建线程的第二种方法是创建一个扩展 Thread 类的类,使用以下两个简单步骤。这种方法在使用 Thread 类中可用方法创建多个线程时提供了更多的灵活性。
Step 1
您需要覆盖 Thread 类中可用的 run( ) 方法。此方法为线程提供一个入口点,您将把您的完整业务逻辑放在这个方法中。以下是 run() 方法的简单语法:
public void run( )
Step 2
一旦创建 Thread 对象,便可以通过调用 start() 方法来启动它,该调用会执行一个 run( ) 方法调用。以下是 start() 方法的一个简单语法 −
void start( );
Example
以下是按照 Thread 扩展重写的先前程序:
class ThreadDemo extends Thread {
private Thread t;
private String threadName;
ThreadDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 4; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(50);
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
ThreadDemo T1 = new ThreadDemo("Thread-1");
T1.start();
ThreadDemo T2 = new ThreadDemo("Thread-2");
T2.start();
}
}
这会产生以下结果 −
Output
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 4
Running Thread-2
Thread: Thread-2, 4
Thread: Thread-1, 3
Thread: Thread-2, 3
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Java Concurrency - Environment Setup
在本章,我们将讨论为 Java 设置宜人环境的事物的不同方面。
Local Environment Setup
如果您仍然希望为 Java 编程语言设置您的环境,本文将指导您如何在您的机器上下载和设置 Java。以下是设置环境的步骤。
Java SE 可以通过链接 Download Java 免费获得。您可以根据您的操作系统下载一个版本。
按照说明下载 Java 并运行 .exe 以在您的机器上安装 Java。一旦您在您的机器上安装了 Java,您将需要设置环境变量来指向正确的安装目录 −
Popular Java Editors
要编写 Java 程序,您将需要一个文本编辑器。市场上有更复杂的 IDE。但现在,您可以考虑以下内容之一 −
-
Notepad − 在 Windows 机器上,您可以使用任何简单的文本编辑器,如记事本(推荐用于本教程)、TextPad。
-
Netbeans − 一个开源且免费的 Java IDE,可从 https://netbeans.org/index.html 下载。
-
Eclipse − 一个由 Eclipse 开源社区开发的 Java IDE,可从 https://www.eclipse.org/ 下载。
Java Concurrency - Major Operations
核心 Java 对多线程程序提供了完全控制。您可以开发多线程程序,根据您的要求完全暂停、恢复或停止。有各种静态方法,您可以在线程对象上使用它们来控制它们的 behavior。下表列出了这些方法 −
Sr.No. |
Method & Description |
1 |
public void suspend() 此方法会使一个线程进入挂起状态,可以使用 resume() 方法恢复。 |
2 |
public void stop() 此方法会完全停止一个线程。 |
3 |
public void resume() 此方法会恢复一个线程,该线程使用 suspend() 方法挂起。 |
4 |
public void wait() 导致当前线程等待,直至另一个线程调用 notify()。 |
5 |
public void notify() 唤醒正在此对象的监视器上等待的单个线程。 |
注意,Java 的最新版本已声明 suspend( )、resume( ) 和 stop( ) 方法已弃用,因此你需要使用可用的替代方法。
Example
class RunnableDemo implements Runnable {
public Thread t;
private String threadName;
boolean suspended = false;
RunnableDemo(String name) {
threadName = name;
System.out.println("Creating " + threadName );
}
public void run() {
System.out.println("Running " + threadName );
try {
for(int i = 10; i > 0; i--) {
System.out.println("Thread: " + threadName + ", " + i);
// Let the thread sleep for a while.
Thread.sleep(300);
synchronized(this) {
while(suspended) {
wait();
}
}
}
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
void suspend() {
suspended = true;
}
synchronized void resume() {
suspended = false;
notify();
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo R1 = new RunnableDemo("Thread-1");
R1.start();
RunnableDemo R2 = new RunnableDemo("Thread-2");
R2.start();
try {
Thread.sleep(1000);
R1.suspend();
System.out.println("Suspending First Thread");
Thread.sleep(1000);
R1.resume();
System.out.println("Resuming First Thread");
R2.suspend();
System.out.println("Suspending thread Two");
Thread.sleep(1000);
R2.resume();
System.out.println("Resuming thread Two");
} catch (InterruptedException e) {
System.out.println("Main thread Interrupted");
} try {
System.out.println("Waiting for threads to finish.");
R1.t.join();
R2.t.join();
} catch (InterruptedException e) {
System.out.println("Main thread Interrupted");
}
System.out.println("Main thread exiting.");
}
}
以上程序产生以下输出 −
Output
Creating Thread-1
Starting Thread-1
Creating Thread-2
Starting Thread-2
Running Thread-1
Thread: Thread-1, 10
Running Thread-2
Thread: Thread-2, 10
Thread: Thread-1, 9
Thread: Thread-2, 9
Thread: Thread-1, 8
Thread: Thread-2, 8
Thread: Thread-1, 7
Thread: Thread-2, 7
Suspending First Thread
Thread: Thread-2, 6
Thread: Thread-2, 5
Thread: Thread-2, 4
Resuming First Thread
Suspending thread Two
Thread: Thread-1, 6
Thread: Thread-1, 5
Thread: Thread-1, 4
Thread: Thread-1, 3
Resuming thread Two
Thread: Thread-2, 3
Waiting for threads to finish.
Thread: Thread-1, 2
Thread: Thread-2, 2
Thread: Thread-1, 1
Thread: Thread-2, 1
Thread Thread-1 exiting.
Thread Thread-2 exiting.
Main thread exiting.
Interthread Communication
如果您了解进程间通信,那么您将很容易理解线程间通信。当您开发两个或更多线程交换某些信息的应用程序时,线程间通信非常重要。
有三个简单的方法和一个使线程通信成为可能的小技巧。所有三个方法如下所列 −
Sr.No. |
Method & Description |
1 |
public void wait() 导致当前线程等待,直至另一个线程调用 notify()。 |
2 |
public void notify() 唤醒正在此对象的监视器上等待的单个线程。 |
3 |
public void notifyAll() 唤醒所有在相同的对象上调用 wait( ) 的线程。 |
这些方法已作为 final 中的对象方法实现,因此在所有类中均可用。所有三个方法只能在 synchronized 上下文中调用。
Example
此示例显示了两个线程如何使用 wait() 和 notify() 方法进行通信。您可以使用相同概念创建复杂系统。
class Chat {
boolean flag = false;
public synchronized void Question(String msg) {
if (flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(msg);
flag = true;
notify();
}
public synchronized void Answer(String msg) {
if (!flag) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(msg);
flag = false;
notify();
}
}
class T1 implements Runnable {
Chat m;
String[] s1 = { "Hi", "How are you ?", "I am also doing fine!" };
public T1(Chat m1) {
this.m = m1;
new Thread(this, "Question").start();
}
public void run() {
for (int i = 0; i < s1.length; i++) {
m.Question(s1[i]);
}
}
}
class T2 implements Runnable {
Chat m;
String[] s2 = { "Hi", "I am good, what about you?", "Great!" };
public T2(Chat m2) {
this.m = m2;
new Thread(this, "Answer").start();
}
public void run() {
for (int i = 0; i < s2.length; i++) {
m.Answer(s2[i]);
}
}
}
public class TestThread {
public static void main(String[] args) {
Chat m = new Chat();
new T1(m);
new T2(m);
}
}
当上面的程序被编译并执行时,它会产生以下结果 -
Output
Hi
Hi
How are you ?
I am good, what about you?
I am also doing fine!
Great!
本文中的示例已从 [[role="bare"] [role="bare"]https://stackoverflow.com/questions/2170520/inter-thread-communication-in-java ] 中提取并修改。
Java Concurrency - Synchronization
Multithreading Example with Synchronization
这里有一个在序列中打印 counter 值的相同示例,我们每次运行它时,它都会产生相同的结果。
Example
class PrintDemo {
public void printCount() {
try {
for(int i = 5; i > 0; i--) {
System.out.println("Counter --- " + i );
}
} catch (Exception e) {
System.out.println("Thread interrupted.");
}
}
}
class ThreadDemo extends Thread {
private Thread t;
private String threadName;
PrintDemo PD;
ThreadDemo(String name, PrintDemo pd) {
threadName = name;
PD = pd;
}
public void run() {
synchronized(PD) {
PD.printCount();
}
System.out.println("Thread " + threadName + " exiting.");
}
public void start () {
System.out.println("Starting " + threadName );
if (t == null) {
t = new Thread (this, threadName);
t.start ();
}
}
}
public class TestThread {
public static void main(String args[]) {
PrintDemo PD = new PrintDemo();
ThreadDemo T1 = new ThreadDemo("Thread - 1 ", PD);
ThreadDemo T2 = new ThreadDemo("Thread - 2 ", PD);
T1.start();
T2.start();
// wait for threads to end
try {
T1.join();
T2.join();
} catch (Exception e) {
System.out.println("Interrupted");
}
}
}
每次运行此程序都会产生相同的结果 −
Java Concurrency - Deadlock
死锁描述了两个或多个线程被永久阻塞的情况,等待彼此。当多个线程需要相同锁但按不同顺序获取它们时,会发生死锁。Java 多线程程序可能会受到死锁条件的影响,因为 synchronized 关键字会导致执行线程在等待与指定对象关联的锁或监视器的同时阻塞。这里有一个例子。
Example
public class TestThread {
public static Object Lock1 = new Object();
public static Object Lock2 = new Object();
public static void main(String args[]) {
ThreadDemo1 T1 = new ThreadDemo1();
ThreadDemo2 T2 = new ThreadDemo2();
T1.start();
T2.start();
}
private static class ThreadDemo1 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 1: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
}
}
private static class ThreadDemo2 extends Thread {
public void run() {
synchronized (Lock2) {
System.out.println("Thread 2: Holding lock 2...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 2: Waiting for lock 1...");
synchronized (Lock1) {
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
}
}
}
当您编译并执行上述程序时,您会发现死锁情况,以下是程序产生的输出 −
Output
Thread 1: Holding lock 1...
Thread 2: Holding lock 2...
Thread 1: Waiting for lock 2...
Thread 2: Waiting for lock 1...
上述程序将永久挂起,因为该位置的线程既无法进行,又等待彼此释放锁,所以您可以通过按 CTRL+C 退出程序。
Deadlock Solution Example
让我们更改锁的顺序并运行相同的程序,看看这两个线程是否仍在互相等待 −
Example
public class TestThread {
public static Object Lock1 = new Object();
public static Object Lock2 = new Object();
public static void main(String args[]) {
ThreadDemo1 T1 = new ThreadDemo1();
ThreadDemo2 T2 = new ThreadDemo2();
T1.start();
T2.start();
}
private static class ThreadDemo1 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 1: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
}
}
private static class ThreadDemo2 extends Thread {
public void run() {
synchronized (Lock1) {
System.out.println("Thread 2: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
System.out.println("Thread 2: Waiting for lock 2...");
synchronized (Lock2) {
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
}
}
}
因此,仅仅更改锁的顺序就可以防止程序进入死锁情况,并完成以下结果 −
Java Concurrency - ThreadLocal Class
ThreadLocal 类用于创建线程局部变量,该变量只能被同一线程读写。例如,如果两个线程正在访问对同一 threadLocal 变量有引用的代码,那么每个线程将看不到其他线程对 threadLocal 变量所做的任何修改。
ThreadLocal Methods
以下是 ThreadLocal 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public T get() 返回此线程局部变量当前线程副本中的值。 |
2 |
protected T initialValue() 返回此线程局部变量当前线程的“初始值”。 |
3 |
public void remove() 删除此线程局部变量的当前线程值。 |
4 |
public void set(T value) 将此线程本地变量的当前线程副本设置为指定的值。 |
Example
以下 TestThread 程序展示了 ThreadLocal 类的其中一些方法。此处我们使用了两个计数器变量,一个是普通变量,另一个是 ThreadLocal。
class RunnableDemo implements Runnable {
int counter;
ThreadLocal<Integer> threadLocalCounter = new ThreadLocal<Integer>();
public void run() {
counter++;
if(threadLocalCounter.get() != null) {
threadLocalCounter.set(threadLocalCounter.get().intValue() + 1);
} else {
threadLocalCounter.set(0);
}
System.out.println("Counter: " + counter);
System.out.println("threadLocalCounter: " + threadLocalCounter.get());
}
}
public class TestThread {
public static void main(String args[]) {
RunnableDemo commonInstance = new RunnableDemo();
Thread t1 = new Thread(commonInstance);
Thread t2 = new Thread(commonInstance);
Thread t3 = new Thread(commonInstance);
Thread t4 = new Thread(commonInstance);
t1.start();
t2.start();
t3.start();
t4.start();
// wait for threads to end
try {
t1.join();
t2.join();
t3.join();
t4.join();
} catch (Exception e) {
System.out.println("Interrupted");
}
}
}
这将产生以下结果。
ThreadLocalRandom Class
java.util.concurrent.ThreadLocalRandom 是从 jdk 1.7 引入的一个实用程序类,当多个线程或 ForkJoinTasks 需要生成随机数时非常有用。它能提升性能,争用比 Math.random() 方法少。
ThreadLocalRandom Methods
以下是 ThreadLocalRandom 类中重要的可用方法的列表。
Sr.No. |
Method & Description |
1 |
public static ThreadLocalRandom current() 返回当前线程的 ThreadLocalRandom。 |
2 |
protected int next(int bits) 生成下一个伪随机数。 |
3 |
public double nextDouble(double n) 返回介于 0(含)和指定值(不含)之间的均匀分布的伪随机 double 值。 |
4 |
public double nextDouble(double least, double bound) 返回介于给定的最小值(含)和边界(不含)之间的均匀分布的伪随机值。 |
5 |
public int nextInt(int least, int bound) 返回介于给定的最小值(含)和边界(不含)之间的均匀分布的伪随机值。 |
6 |
public long nextLong(long n) 返回介于 0(含)和指定值(不含)之间的均匀分布的伪随机值。 |
7 |
public long nextLong(long least, long bound) 返回介于给定的最小值(含)和边界(不含)之间的均匀分布的伪随机值。 |
8 |
public void setSeed(long seed) 抛出 UnsupportedOperationException。 |
Example
以下 TestThread 程序展示了 Lock 接口的其中一些方法。此处我们使用了 lock() 获取锁,使用了 unlock() 释放锁。
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ThreadLocalRandom;
public class TestThread {
public static void main(final String[] arguments) {
System.out.println("Random Integer: " + new Random().nextInt());
System.out.println("Seeded Random Integer: " + new Random(15).nextInt());
System.out.println(
"Thread Local Random Integer: " + ThreadLocalRandom.current().nextInt());
final ThreadLocalRandom random = ThreadLocalRandom.current();
random.setSeed(15); //exception will come as seeding is not allowed in ThreadLocalRandom.
System.out.println("Seeded Thread Local Random Integer: " + random.nextInt());
}
}
这将产生以下结果。
Output
Random Integer: 1566889198
Seeded Random Integer: -1159716814
Thread Local Random Integer: 358693993
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.concurrent.ThreadLocalRandom.setSeed(Unknown Source)
at TestThread.main(TestThread.java:21)
此处我们使用了 ThreadLocalRandom 和 Random 类获取随机数。
Java Concurrency - Lock Interface
java.util.concurrent.locks.Lock 接口用作与同步块类似的线程同步机制。新的锁定机制更加灵活,并提供比同步块更多的选项。Lock 与同步块之间的主要区别如下 −
-
Guarantee of sequence − 同步块不保证等待线程获得访问权限的顺序。Lock 接口对它进行处理。
-
No timeout − 如果未授予锁,同步块没有超时选项。Lock 接口提供此类选项。
-
Single method − 同步块必须完全包含在一个方法中,而锁接口的方法 lock() 和 unlock() 可以分别位于不同方法中。
Lock Methods
以下是 Lock 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public void lock() 获得锁。 |
2 |
public void lockInterruptibly() 获取锁,除非当前线程被中断。 |
3 |
public Condition newCondition() 返回一个新的 Condition 实例,该实例绑定到此 Lock 实例。 |
4 |
public boolean tryLock() 仅在调用时锁空闲时才获取锁。 |
5 |
public boolean tryLock(long time, TimeUnit unit) 如果锁在给定的等待时间内是空闲的且当前线程没有被中断,则获取锁。 |
6 |
public void unlock() 释放锁。 |
Example
以下 TestThread 程序展示了 Lock 接口的其中一些方法。此处我们使用了 lock() 获取锁,使用了 unlock() 释放锁。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class PrintDemo {
private final Lock queueLock = new ReentrantLock();
public void print() {
queueLock.lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration / 1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.printf(
"%s printed the document successfully.\n", Thread.currentThread().getName());
queueLock.unlock();
}
}
}
class ThreadDemo extends Thread {
PrintDemo printDemo;
ThreadDemo(String name, PrintDemo printDemo) {
super(name);
this.printDemo = printDemo;
}
@Override
public void run() {
System.out.printf(
"%s starts printing a document\n", Thread.currentThread().getName());
printDemo.print();
}
}
public class TestThread {
public static void main(String args[]) {
PrintDemo PD = new PrintDemo();
ThreadDemo t1 = new ThreadDemo("Thread - 1 ", PD);
ThreadDemo t2 = new ThreadDemo("Thread - 2 ", PD);
ThreadDemo t3 = new ThreadDemo("Thread - 3 ", PD);
ThreadDemo t4 = new ThreadDemo("Thread - 4 ", PD);
t1.start();
t2.start();
t3.start();
t4.start();
}
}
这将产生以下结果。
Output
Thread - 1 starts printing a document
Thread - 4 starts printing a document
Thread - 3 starts printing a document
Thread - 2 starts printing a document
Thread - 1 Time Taken 4 seconds.
Thread - 1 printed the document successfully.
Thread - 4 Time Taken 3 seconds.
Thread - 4 printed the document successfully.
Thread - 3 Time Taken 5 seconds.
Thread - 3 printed the document successfully.
Thread - 2 Time Taken 4 seconds.
Thread - 2 printed the document successfully.
这里我们使用 ReentrantLock 类作为 Lock 接口的实现。ReentrantLock 类允许一个线程锁定方法,即使它已经在另一个方法上拥有锁。
Java Concurrency - ReadWriteLock Interface
一个 java.util.concurrent.locks.ReadWriteLock 接口允许多个线程一次读取,但一次只能有一个线程写入。
-
Read Lock − 如果没有线程对 ReadWriteLock 进行写锁定,则多个线程可以访问读锁定。
-
Write Lock − 如果没有线程正在读或写,则一个线程可以访问写锁定。
Lock Methods
以下是 Lock 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public Lock readLock() 返回用于读取的锁。 |
2 |
public Lock writeLock() 返回用于写入的锁。 |
Example
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestThread {
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private static String message = "a";
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new WriterA());
t1.setName("Writer A");
Thread t2 = new Thread(new WriterB());
t2.setName("Writer B");
Thread t3 = new Thread(new Reader());
t3.setName("Reader");
t1.start();
t2.start();
t3.start();
t1.join();
t2.join();
t3.join();
}
static class Reader implements Runnable {
public void run() {
if(lock.isWriteLocked()) {
System.out.println("Write Lock Present.");
}
lock.readLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration / 1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() +": "+ message );
lock.readLock().unlock();
}
}
}
static class WriterA implements Runnable {
public void run() {
lock.writeLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration / 1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
message = message.concat("a");
lock.writeLock().unlock();
}
}
}
static class WriterB implements Runnable {
public void run() {
lock.writeLock().lock();
try {
Long duration = (long) (Math.random() * 10000);
System.out.println(Thread.currentThread().getName()
+ " Time Taken " + (duration / 1000) + " seconds.");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
message = message.concat("b");
lock.writeLock().unlock();
}
}
}
}
这将产生以下结果。
Java Concurrency - Condition Interface
java.util.concurrent.locks.Condition 接口为线程提供暂停其执行的能力,直到给定的条件为真。一个 Condition 对象必然绑定到一个 Lock,并且通过 newCondition() 方法获得。
Condition Methods
以下是 Condition 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public void await() 导致当前线程等待,直到它被信号或中断。 |
2 |
public boolean await(long time, TimeUnit unit) 导致当前线程等待,直到它被信号或中断,或指定的等待时间经过。 |
3 |
public long awaitNanos(long nanosTimeout) 导致当前线程等待,直到它被信号或中断,或指定的等待时间经过。 |
4 |
public long awaitUninterruptibly() 导致当前线程等待,直到它被信号。 |
5 |
public long awaitUntil() 让当前线程等待,直到它被发出信号或被中断,或者指定的截止时间经过。 |
6 |
public void signal() 唤醒一个等待中的线程。 |
7 |
public void signalAll() 唤醒所有等待中的线程。 |
Example
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestThread {
public static void main(String[] args) throws InterruptedException {
ItemQueue itemQueue = new ItemQueue(10);
//Create a producer and a consumer.
Thread producer = new Producer(itemQueue);
Thread consumer = new Consumer(itemQueue);
//Start both threads.
producer.start();
consumer.start();
//Wait for both threads to terminate.
producer.join();
consumer.join();
}
static class ItemQueue {
private Object[] items = null;
private int current = 0;
private int placeIndex = 0;
private int removeIndex = 0;
private final Lock lock;
private final Condition isEmpty;
private final Condition isFull;
public ItemQueue(int capacity) {
this.items = new Object[capacity];
lock = new ReentrantLock();
isEmpty = lock.newCondition();
isFull = lock.newCondition();
}
public void add(Object item) throws InterruptedException {
lock.lock();
while(current >= items.length)
isFull.await();
items[placeIndex] = item;
placeIndex = (placeIndex + 1) % items.length;
++current;
//Notify the consumer that there is data available.
isEmpty.signal();
lock.unlock();
}
public Object remove() throws InterruptedException {
Object item = null;
lock.lock();
while(current <= 0) {
isEmpty.await();
}
item = items[removeIndex];
removeIndex = (removeIndex + 1) % items.length;
--current;
//Notify the producer that there is space available.
isFull.signal();
lock.unlock();
return item;
}
public boolean isEmpty() {
return (items.length == 0);
}
}
static class Producer extends Thread {
private final ItemQueue queue;
public Producer(ItemQueue queue) {
this.queue = queue;
}
@Override
public void run() {
String[] numbers =
{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"};
try {
for(String number: numbers) {
System.out.println("[Producer]: " + number);
}
queue.add(null);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
static class Consumer extends Thread {
private final ItemQueue queue;
public Consumer(ItemQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
do {
Object number = queue.remove();
System.out.println("[Consumer]: " + number);
if(number == null) {
return;
}
} while(!queue.isEmpty());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
这将产生以下结果。
Java Concurrency - AtomicInteger Class
java.util.concurrent.atomic.AtomicInteger 类提供对底层 int 值的操作,该值可以原子地读写,还包含高级原子操作。AtomicInteger 支持对底层 int 变量的原子操作。它有 get 和 set 方法,就像对 volatile 变量进行读写一样。也就是说,set 与同一变量上的任何后续 get 都有 happens-before 关系。atomic compareAndSet 方法也有这些内存一致性特性。
AtomicInteger Methods
以下是 AtomicInteger 类中可用的重要方法的列表。
Sr.No. |
Method & Description |
1 |
public int addAndGet(int delta) 以原子方式将给定值添加到当前值。 |
2 |
public boolean compareAndSet(int expect, int update) 如果当前值与预期值相同,则以原子方式将值设置为给定的更新值。 |
3 |
public int decrementAndGet() 以原子方式将当前值减一。 |
4 |
public double doubleValue() 将指定数字的值作为双精度浮点数返回。 |
5 |
public float floatValue() 将指定数字的值作为浮点数返回。 |
6 |
public int get() 获取当前值。 |
7 |
public int getAndAdd(int delta) 以原子方式将给定值添加到当前值。 |
8 |
public int getAndDecrement() 以原子方式将当前值减 1。 |
9 |
public int getAndIncrement() 以原子方式将当前值加 1。 |
10 |
public int getAndSet(int newValue) 以原子方式设置为给定值并返回旧值。 |
11 |
public int incrementAndGet() 以原子方式将当前值加 1。 |
12 |
public int intValue() 将指定数字的值作为 int 返回。 |
13 |
public void lazySet(int newValue) 最终设置为给定值。 |
14 |
public long longValue() 将指定数字的值作为长整型返回。 |
15 |
public void set(int newValue) 设置为给定值。 |
16 |
public String toString() 返回当前值的 String 表示形式。 |
17 |
public boolean weakCompareAndSet(int expect, int update) 如果当前值与预期值相同,则以原子方式将该值设置为给定更新值。 |
Example
以下 TestThread 程序显示了在线程环境中计数器的非安全实现。
public class TestThread {
static class Counter {
private int c = 0;
public void increment() {
c++;
}
public int value() {
return c;
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
这可能会根据计算机速度和线程交错产生以下结果。
Example
import java.util.concurrent.atomic.AtomicInteger;
public class TestThread {
static class Counter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.getAndIncrement();
}
public int value() {
return c.get();
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
这将产生以下结果。
Java Concurrency - AtomicLong Class
java.util.concurrent.atomic.AtomicLong 类在可以读取和写入原子的底层长值上提供操作,还包含高级原子操作。AtomicLong 支持对底层长变量的原子操作。它具有像读写 volatile 变量那样的 get 和 set 方法。也就是说,一个 set 与对同一变量的任何后续 get 都具有 happens-before 关系。atomic compareAndSet 方法还具有这些内存一致性特性。
AtomicLong Methods
以下是 AtomicLong 类中提供的最重要方法列表。
Sr.No. |
Method & Description |
1 |
public long addAndGet(long delta) 将给定值原子地加到当前值上。 |
2 |
public boolean compareAndSet(long expect, long update) 如果当前值与预期值相同时,原子地将值设置为给定的更新值。 |
3 |
public long decrementAndGet() 原子地将当前值减 1。 |
4 |
public double doubleValue() 将指定数字的值作为双精度浮点数返回。 |
5 |
public float floatValue() 将指定数字的值作为浮点数返回。 |
6 |
public long get() 获得当前值。 |
7 |
public long getAndAdd(long delta) 将给定值原子地加到当前值上。 |
8 |
public long getAndDecrement() 原子地将当前值减 1。 |
9 |
public long getAndIncrement() 原子地将当前值加 1。 |
10 |
public long getAndSet(long newValue) 原子地设置为给定的值,并返回旧值。 |
11 |
public long incrementAndGet() 原子地将当前值加 1。 |
12 |
public int intValue() 将指定数字的值作为 int 返回。 |
13 |
public void lazySet(long newValue) 最终设置为给定值。 |
14 |
public long longValue() 将指定数字的值作为长整型返回。 |
15 |
public void set(long newValue) 设置为给定值。 |
16 |
public String toString() 返回当前值的 String 表示形式。 |
17 |
public boolean weakCompareAndSet(long expect, long update) 如果当前值与预期值相同时,原子地将值设置为给定的更新值。 |
Example
TestThread 程序展示了在基于线程的环境中使用 AtomicLong 来安全实现计数器的过程。
import java.util.concurrent.atomic.AtomicLong;
public class TestThread {
static class Counter {
private AtomicLong c = new AtomicLong(0);
public void increment() {
c.getAndIncrement();
}
public long value() {
return c.get();
}
}
public static void main(final String[] arguments) throws InterruptedException {
final Counter counter = new Counter();
//1000 threads
for(int i = 0; i < 1000 ; i++) {
new Thread(new Runnable() {
public void run() {
counter.increment();
}
}).start();
}
Thread.sleep(6000);
System.out.println("Final number (should be 1000): " + counter.value());
}
}
这将产生以下结果。
Java Concurrency - AtomicBoolean Class
java.util.concurrent.atomic.AtomicBoolean 类在可以读取和写入原子的底层布尔值上提供操作,还包含高级原子操作。AtomicBoolean 支持对底层布尔变量的原子操作。它具有像读写 volatile 变量那样的 get 和 set 方法。也就是说,一个 set 与对同一变量的任何后续 get 都具有 happens-before 关系。atomic compareAndSet 方法还具有这些内存一致性特性。
AtomicBoolean Methods
以下是 AtomicBoolean 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public boolean compareAndSet(boolean expect, boolean update) 如果当前值 == 预期值,则以原子方式将值设置给定的更新后值。 |
2 |
public boolean get() 返回当前值。 |
3 |
public boolean getAndSet(boolean newValue) 以原子方式设置为给定的值并返回前一个值。 |
4 |
public void lazySet(boolean newValue) 最终设置为给定的值。 |
5 |
public void set(boolean newValue) 无条件设置为给定的值。 |
6 |
public String toString() 返回当前值的 String 表示形式。 |
7 |
public boolean weakCompareAndSet(boolean expect, boolean update) 如果当前值 == 预期值,则以原子方式将值设置给定的更新后值。 |
Example
以下 TestThread 程序显示了在基于线程的环境中使用 AtomicBoolean 变量。
import java.util.concurrent.atomic.AtomicBoolean;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
new Thread("Thread 1") {
public void run() {
while(true) {
System.out.println(Thread.currentThread().getName()
+" Waiting for Thread 2 to set Atomic variable to true. Current value is "
+ atomicBoolean.get());
if(atomicBoolean.compareAndSet(true, false)) {
System.out.println("Done!");
break;
}
}
};
}.start();
new Thread("Thread 2") {
public void run() {
System.out.println(Thread.currentThread().getName() +
", Atomic Variable: " +atomicBoolean.get());
System.out.println(Thread.currentThread().getName() +
" is setting the variable to true ");
atomicBoolean.set(true);
System.out.println(Thread.currentThread().getName() +
", Atomic Variable: " +atomicBoolean.get());
};
}.start();
}
}
这将产生以下结果。
Output
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2, Atomic Variable: false
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Thread 2 is setting the variable to true
Thread 2, Atomic Variable: true
Thread 1 Waiting for Thread 2 to set Atomic variable to true. Current value is false
Done!
Java Concurrency - AtomicReference Class
java.util.concurrent.atomic.AtomicReference 类提供对基础对象引用的操作,该引用可被原子性地读写,并且还包含高级原子操作。AtomicReference 支持对基础对象引用变量的原子操作。它有类似对 volatile 变量的读写操作的 get 和 set 方法。也就是说,set 操作与任何对相同变量的后续 get 操作之间存在先行发生关系。原子 compareAndSet 方法也有这些内存一致性特征。
AtomicReference Methods
以下是 AtomicReference 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public boolean compareAndSet(V expect, V update) 如果当前值 == 预期值,则以原子方式将值设置给定的更新后值。 |
2 |
public boolean get() 返回当前值。 |
3 |
public boolean getAndSet(V newValue) 以原子方式设置为给定的值并返回前一个值。 |
4 |
public void lazySet(V newValue) 最终设置为给定的值。 |
5 |
public void set(V newValue) 无条件设置为给定的值。 |
6 |
public String toString() 返回当前值的 String 表示形式。 |
7 |
public boolean weakCompareAndSet(V expect, V update) 如果当前值 == 预期值,则以原子方式将值设置给定的更新后值。 |
Example
以下 TestThread 程序显示了在基于线程的环境中使用 AtomicReference 变量。
import java.util.concurrent.atomic.AtomicReference;
public class TestThread {
private static String message = "hello";
private static AtomicReference<String> atomicReference;
public static void main(final String[] arguments) throws InterruptedException {
atomicReference = new AtomicReference<String>(message);
new Thread("Thread 1") {
public void run() {
atomicReference.compareAndSet(message, "Thread 1");
message = message.concat("-Thread 1!");
};
}.start();
System.out.println("Message is: " + message);
System.out.println("Atomic Reference of Message is: " + atomicReference.get());
}
}
这将产生以下结果。
Java Concurrency - AtomicIntegerArray Class
java.util.concurrent.atomic.AtomicIntegerArray 类提供对基础 int 数组的操作,该数组可被原子性地读写,并且还包含高级原子操作。AtomicIntegerArray 支持对基础 int 数组变量的原子操作。它有类似对 volatile 变量的读写操作的 get 和 set 方法。也就是说,set 操作与任何对相同变量的后续 get 操作之间存在先行发生关系。原子 compareAndSet 方法也有这些内存一致性特征。
AtomicIntegerArray Methods
以下是 AtomicIntegerArray 类中可用的重要方法列表。
Sr.No. |
Method & Description |
1 |
public int addAndGet(int i, int delta) 以原子方式将给定的值添加到索引 i 处的元素。 |
2 |
public boolean compareAndSet(int i, int expect, int update) 如果当前值==期望值,则按原子方式将位置 i 处的元素设置为给定的更新值。 |
3 |
public int decrementAndGet(int i) 按原子方式将索引 i 处的元素递减 1。 |
4 |
public int get(int i) 按原子方式获取位置 i 处的当前值。 |
5 |
public int getAndAdd(int i, int delta) 按原子方式将给定值添加到索引 i 处的元素。 |
6 |
public int getAndDecrement(int i) 按原子方式将索引 i 处的元素递减 1。 |
7 |
public int getAndIncrement(int i) 按原子方式将索引 i 处的元素递增 1。 |
8 |
public int getAndSet(int i, int newValue) 按原子方式将位置 i 处的元素设置为给定值并返回旧值。 |
9 |
public int incrementAndGet(int i) 按原子方式将索引 i 处的元素递增 1。 |
10 |
public void lazySet(int i, int newValue) 最终将位置 i 处的元素设置为给定值。 |
11 |
public int length() 返回数组的长度。 |
12 |
public void set(int i, int newValue) 将位置 i 处的元素设置为给定的值。 |
13 |
public String toString() 返回数组当前值的字符串表示。 |
14 |
public boolean weakCompareAndSet(int i, int expect, int update) 如果当前值==期望值,则按原子方式将位置 i 处的元素设置为给定的更新值。 |
Example
以下 TestThread 程序演示了在基于线程的环境中使用 AtomicIntegerArray 变量。
import java.util.concurrent.atomic.AtomicIntegerArray;
public class TestThread {
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, 1);
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Values: ");
for (int i = 0; i<atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicIntegerArray.length(); i++) {
int add = atomicIntegerArray.incrementAndGet(i);
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: 3");
}
}
}
}
}
这将产生以下结果。
Output
Thread 10, index 0, value: 2
Thread 10, index 1, value: 2
Thread 10, index 2, value: 2
Thread 11, index 0, value: 3
Thread 10, index 3, value: 2
Thread 11, index 1, value: 3
Thread 11, index 2, value: 3
Thread 10, index 4, value: 2
Thread 11, index 3, value: 3
Thread 10, index 5, value: 2
Thread 10, index 6, value: 2
Thread 11, index 4, value: 3
Thread 10, index 7, value: 2
Thread 11, index 5, value: 3
Thread 10, index 8, value: 2
Thread 11, index 6, value: 3
Thread 10, index 9, value: 2
Thread 11, index 7, value: 3
Thread 11, index 8, value: 3
Thread 11, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3
Java Concurrency - AtomicLongArray Class
java.util.concurrent.atomic.AtomicLongArray 类提供了可在底层长数组进行原子读取和写入的操作,还包含一些高级原子操作。AtomicLongArray 支持对底层 long 数组变量的原子操作。它有类似于对 volatile 变量进行读写操作的 get 和 set 方法。这意味着 set 与对同一变量的任何后续 get 具有先发生后发生关系。atomic compareAndSet 方法也有这些内存一致性特性。
AtomicLongArray Methods
以下是 AtomicLongArray 类中提供的一些重要方法。
Sr.No. |
Method & Description |
1 |
public long addAndGet(int i, long delta) 按原子方式将给定值添加到索引 i 处的元素。 |
2 |
public boolean compareAndSet(int i, long expect, long update) 如果当前值==期望值,则按原子方式将位置 i 处的元素设置为给定的更新值。 |
3 |
public long decrementAndGet(int i) 按原子方式将索引 i 处的元素递减 1。 |
4 |
public long get(int i) 获得位置 i 的当前值。 |
5 |
public long getAndAdd(int i, long delta) 原子地将给定值添加到索引 i 处的元素中。 |
6 |
public long getAndDecrement(int i) 原子地使索引 i 处的元素递减 1。 |
7 |
public long getAndIncrement(int i) 原子地使索引 i 处的元素递增 1。 |
8 |
public long getAndSet(int i, long newValue) 原子地将位置 i 处的元素设置为给定值,并返回旧值。 |
9 |
public long incrementAndGet(int i) 原子地使索引 i 处的元素递减 1。 |
10 |
public void lazySet(int i, long newValue) 最终将位置 i 处的元素设置为给定值。 |
11 |
public int length() 返回数组的长度。 |
12 |
public void set(int i, long newValue) 将位置 i 处的元素设置为给定值。 |
13 |
public String toString() 返回数组当前值的字符串表示。 |
14 |
public boolean weakCompareAndSet(int i, long expect, long update) 如果当前值 == 预期值,则原子地将位置 i 处的元素设置为给定的更新值。 |
Example
以下 TestThread 程序演示了在基于线程的环境中使用 AtomicIntegerArray 变量。
import java.util.concurrent.atomic.AtomicLongArray;
public class TestThread {
private static AtomicLongArray atomicLongArray = new AtomicLongArray(10);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicLongArray.length(); i++) {
atomicLongArray.set(i, 1);
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Values: ");
for (int i = 0; i<atomicLongArray.length(); i++) {
System.out.print(atomicLongArray.get(i) + " ");
}
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicLongArray.length(); i++) {
long add = atomicLongArray.incrementAndGet(i);
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicLongArray.length(); i++) {
boolean swapped = atomicLongArray.compareAndSet(i, 2, 3);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: 3");
}
}
}
}
}
这将产生以下结果。
Output
Thread 9, index 0, value: 2
Thread 10, index 0, value: 3
Thread 9, index 1, value: 2
Thread 9, index 2, value: 2
Thread 9, index 3, value: 2
Thread 9, index 4, value: 2
Thread 10, index 1, value: 3
Thread 9, index 5, value: 2
Thread 10, index 2, value: 3
Thread 9, index 6, value: 2
Thread 10, index 3, value: 3
Thread 9, index 7, value: 2
Thread 10, index 4, value: 3
Thread 9, index 8, value: 2
Thread 9, index 9, value: 2
Thread 10, index 5, value: 3
Thread 10, index 6, value: 3
Thread 10, index 7, value: 3
Thread 10, index 8, value: 3
Thread 10, index 9, value: 3
Values:
3 3 3 3 3 3 3 3 3 3
AtomicReferenceArray Class
java.util.concurrent.atomic.AtomicReferenceArray 类在基础引用数组上提供操作,这些操作可以原子地进行读和写,还包含高级原子操作。AtomicReferenceArray 支持对基础引用数组变量进行原子操作。它有 get 和 set 方法,这些方法的工作方式类似于对 volatile 变量进行读取和写入。也就是说,set 与对同一变量的任何后续 get 具有存在先行关系。原子 compareAndSet 方法也具有这些内存一致性特性。
AtomicReferenceArray Methods
以下是 AtomicReferenceArray 类中可用一些重要方法的列表。
Sr.No. |
Method & Description |
1 |
public boolean compareAndSet(int i, E expect, E update) 如果当前值 == 预期值,则原子地将位置 i 处的元素设置为给定的更新值。 |
2 |
public E get(int i) 获得位置 i 的当前值。 |
3 |
public E getAndSet(int i, E newValue) 原子地将位置 i 处的元素设置为给定值,并返回旧值。 |
4 |
public void lazySet(int i, E newValue) 最终将位置 i 处的元素设置为给定值。 |
5 |
public int length() 返回数组的长度。 |
6 |
public void set(int i, E newValue) 将位置 i 处的元素设置为给定值。 |
7 |
public String toString() 返回数组当前值的字符串表示。 |
8 |
public boolean weakCompareAndSet(int i, E expect, E update) 如果当前值 == 预期值,则原子地将位置 i 处的元素设置为给定的更新值。 |
Example
以下 TestThread 程序展示了在基于线程的环境中,AtomicReferenceArray 变量的用法。
import java.util.concurrent.atomic.AtomicReferenceArray;
public class TestThread {
private static String[] source = new String[10];
private static AtomicReferenceArray<String> atomicReferenceArray
= new AtomicReferenceArray<String>(source);
public static void main(final String[] arguments) throws InterruptedException {
for (int i = 0; i<atomicReferenceArray.length(); i++) {
atomicReferenceArray.set(i, "item-2");
}
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
}
static class Increment implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
String add = atomicReferenceArray.getAndSet(i,"item-"+ (i+1));
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ add);
}
}
}
static class Compare implements Runnable {
public void run() {
for(int i = 0; i<atomicReferenceArray.length(); i++) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", value: "+ atomicReferenceArray.get(i));
boolean swapped = atomicReferenceArray.compareAndSet(i, "item-2", "updated-item-2");
System.out.println("Item swapped: " + swapped);
if(swapped) {
System.out.println("Thread " + Thread.currentThread().getId()
+ ", index " +i + ", updated-item-2");
}
}
}
}
}
这将产生以下结果。
Output
Thread 9, index 0, value: item-2
Thread 10, index 0, value: item-1
Item swapped: false
Thread 10, index 1, value: item-2
Item swapped: true
Thread 9, index 1, value: updated-item-2
Thread 10, index 1, updated-item-2
Thread 10, index 2, value: item-3
Item swapped: false
Thread 10, index 3, value: item-2
Item swapped: true
Thread 10, index 3, updated-item-2
Thread 10, index 4, value: item-2
Item swapped: true
Thread 10, index 4, updated-item-2
Thread 10, index 5, value: item-2
Item swapped: true
Thread 10, index 5, updated-item-2
Thread 10, index 6, value: item-2
Thread 9, index 2, value: item-2
Item swapped: true
Thread 9, index 3, value: updated-item-2
Thread 10, index 6, updated-item-2
Thread 10, index 7, value: item-2
Thread 9, index 4, value: updated-item-2
Item swapped: true
Thread 9, index 5, value: updated-item-2
Thread 10, index 7, updated-item-2
Thread 9, index 6, value: updated-item-2
Thread 10, index 8, value: item-2
Thread 9, index 7, value: updated-item-2
Item swapped: true
Thread 9, index 8, value: updated-item-2
Thread 10, index 8, updated-item-2
Thread 9, index 9, value: item-2
Thread 10, index 9, value: item-10
Item swapped: false
Java Concurrency - Executor Interface
java.util.concurrent.Executor 接口是一个支持启动新任务的简单接口。
ExecutorService Methods
Sr.No. |
Method & Description |
1 |
void execute(Runnable command) 在将来的某个时间执行给定的命令。 |
Example
以下 TestThread 程序展示了在基于线程的环境中使用 Executor 接口。
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
Executor executor = Executors.newCachedThreadPool();
executor.execute(new Task());
ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
pool.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
ExecutorService Interface
java.util.concurrent.ExecutorService 接口是 Executor 接口的子接口,它增加了在单个任务和执行器本身上管理生命周期所需的功能。
ExecutorService Methods
Sr.No. |
Method & Description |
1 |
boolean awaitTermination(long timeout, TimeUnit unit) 在关闭请求之后或超时发生或当前线程中断后,才会阻塞,以先发生者为准。 |
2 |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 执行已给定的任务,返回在所有任务完成后包含其状态和结果的 Future 列表。 |
3 |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 执行已给定的任务,返回在所有任务完成后或超时到期后包含其状态和结果的 Future 列表,以先发生者为准。 |
4 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks) 执行已给定的任务,返回已成功完成的任务的结果(即未抛出异常),如果有的。 |
5 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 执行已给定的任务,返回已在给定超时时间内完成的任务的结果(即未抛出异常),如果有的。 |
6 |
boolean isShutdown() 如果此执行器已关闭,则返回 true。 |
7 |
boolean isTerminated() 如果有任务在关闭后完成,则返回 true。 |
8 |
void shutdown() 启动有序关闭,其中已提交的任务执行,但不会接纳新任务。 |
9 |
List<Runnable> shutdownNow() 尝试停止所有正在积极执行的任务,停止处理等待的任务,并返回等待执行的任务的列表。 |
10 |
<T> Future<T> submit(Callable<T> task) 提交一个返回值的的任务以执行,并返回一个表示任务的待处理结果的 Future。 |
11 |
Future<?> submit(Runnable task) 提交一个可运行的任务以执行,并返回一个表示该任务的 Future。 |
12 |
<T> Future<T> submit(Runnable task, T result) 提交一个可运行的任务以执行,并返回一个表示该任务的 Future。 |
Example
以下 TestThread 程序展示了在基于线程的环境中使用 ExecutorService 接口。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
} finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Output
Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
ScheduledExecutorService Interface
java.util.concurrent.ScheduledExecutorService 接口是 ExecutorService 接口的子接口,并支持任务的未来和/或定期执行。
ScheduledExecutorService Methods
Sr.No. |
Method & Description |
1 |
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 创建和执行一个 ScheduledFuture,该 ScheduledFuture 在给定的延迟后才启用。 |
2 |
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 创建和执行一个一次性操作,该操作在给定的延迟后才启用。 |
3 |
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 创建和执行一个周期性操作,该操作在给定的初始延迟后才首次启用,随后在给定的周期内启用;即执行将在 initialDelay、initialDelay+period、initialDelay + 2 * period 等时间点后开始。 |
4 |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 在给定的初始延迟后首次启用,在一次执行终止与下一次执行开始之间有给定延迟的周期性操作。 |
Example
以下 TestThread 程序展示了在基于线程的环境中使用 ScheduledExecutorService 接口。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
newFixedThreadPool Method
可以通过调用 Executors 类的静态 newFixedThreadPool() 方法获得一个固定线程池。
Syntax
ExecutorService fixedPool = Executors.newFixedThreadPool(2);
其中
-
最多有 2 个线程激活以处理任务。
-
如果提交的线程超过 2 个,则将它们保存在队列中,直到线程变为可用。
-
如果线程因为执行期间的故障而终止,则会创建新线程以取代它(尚未调用执行器的关闭)。
-
任何线程在池关闭之前都会存在。
Example
以下 TestThread 程序展示了在基于线程的环境中使用 newFixedThreadPool 方法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Cast the object to its class type
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
//Stats before tasks execution
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + pool.getCorePoolSize());
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Output
Largest executions: 0
Maximum allowed threads: 2
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 2
Largest executions: 2
Maximum allowed threads: 2
Current threads in pool: 2
Currently executing threads: 1
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1
newCachedThreadPool Method
可以通过调用 Executors 类的静态 newCachedThreadPool() 方法获得一个缓存线程池。
Syntax
ExecutorService executor = Executors.newCachedThreadPool();
其中
-
newCachedThreadPool 方法创建拥有可扩展线程池的执行器。
-
这种执行器适合于启动许多短生存期任务的应用程序。
Example
以下 TestThread 程序展示了在基于线程的环境中使用 newCachedThreadPool 方法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
// Cast the object to its class type
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
//Stats before tasks execution
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + pool.getCorePoolSize());
System.out.println("Largest executions: "
+ pool.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ pool.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ pool.getPoolSize());
System.out.println("Currently executing threads: "
+ pool.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ pool.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Output
Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-1
Running Task! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-2
Task Completed! Thread Name: pool-1-thread-1
newScheduledThreadPool Method
可以通过调用 Executors 类的静态 newScheduledThreadPool() 方法获得预定线程池。
Example
以下 TestThread 程序显示了在线程环境中 newScheduledThreadPool 方法的用法。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
newSingleThreadExecutor Method
可以通过调用 Executors 类的静态 newSingleThreadExecutor() 方法获得一个单线程池。
Syntax
ExecutorService executor = Executors.newSingleThreadExecutor();
其中 newSingleThreadExecutor 方法创建一个一次执行一个任务的执行器。
Example
以下 TestThread 程序展示了在基于线程的环境中使用 newSingleThreadExecutor 方法。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
executor.submit(new Task());
System.out.println("Shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
} finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 20);
System.out.println("Running Task!");
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Output
Shutdown executor
Running Task!
shutdown finished
cancel non-finished tasks
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:302)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:328)
at TestThread$Task.run(TestThread.java:39)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
ThreadPoolExecutor Class
java.util.concurrent.ThreadPoolExecutor 是 ExecutorService,使用多个池线程中一个线程(通常由 Executors 工厂方法配置)执行每个提交的任务。它还提供各种实用方法来检查当前线程统计信息并控制它们。
ThreadPoolExecutor Methods
Sr.No. |
Method & Description |
1 |
protected void afterExecute(Runnable r, Throwable t) 给定的Runnable执行完毕后调用的方法。 |
2 |
void allowCoreThreadTimeOut(boolean value) 设置控制核心线程是否在没有任务在保留时间内到达的情况下超时并终止,并在需要时在有新任务到达时将其替换的策略。 |
3 |
boolean allowsCoreThreadTimeOut() 如果此线程池允许核心线程在没有任务在保留时间内到达的情况下超时并终止,并且并在需要时在有新任务到达时将其替换,则返回true。 |
4 |
boolean awaitTermination(long timeout, TimeUnit unit) 在关闭请求之后或超时发生或当前线程中断后,才会阻塞,以先发生者为准。 |
5 |
protected void beforeExecute(Thread t, Runnable r) 在给定线程中执行给定Runnable之前调用的方法。 |
6 |
void execute(Runnable command) 稍后执行给定的任务。 |
7 |
protected void finalize() 如果此执行器不再被引用并且没有线程时,调用关机。 |
8 |
int getActiveCount() 返回正在主动执行任务的线程的大概数量。 |
9 |
long getCompletedTaskCount() 返回已完成执行任务的大概总数。 |
10 |
int getCorePoolSize() 返回核心线程数。 |
11 |
long getKeepAliveTime(TimeUnit unit) 返回线程保留活动时间,即超出核心线程池大小的线程在被终止前可以保持空闲状态的时间量。 |
12 |
int getLargestPoolSize() 返回池中同时存在过的最大线程数。 |
13 |
int getMaximumPoolSize() 返回允许的最大线程数。 |
14 |
int getPoolSize() 返回池中的当前线程数。 |
15 |
BlockingQueue getQueue() 返回此执行器使用的任务队列。 |
15 |
RejectedExecutionHandler getRejectedExecutionHandler() 返回当前无法执行的任务的处理程序。 |
16 |
long getTaskCount() 返回已计划执行的任务的大概总数。 |
17 |
ThreadFactory getThreadFactory() 返回用于创建新线程的线程工厂。 |
18 |
boolean isShutdown() 如果此执行器已关闭,则返回 true。 |
19 |
boolean isTerminated() 如果有任务在关闭后完成,则返回 true。 |
20 |
boolean isTerminating() 如果此执行器正在shutdown()或shutdownNow()后终止进程,但尚未完全终止,则返回true。 |
21 |
int prestartAllCoreThreads() 启动所有核心线程,使它们空闲地等待工作。 |
22 |
boolean prestartCoreThread() 启动一个核心线程,使其空闲等待工作。 |
23 |
void purge() 尝试从工作队列中移除所有已取消的 Future 任务。 |
24 |
boolean remove(Runnable task) 如果存在,则从执行器的内部队列中移除该任务,因此如果任务尚未启动,则不会运行该任务。 |
25 |
void setCorePoolSize(int corePoolSize) 设置线程的核心数。 |
26 |
void setKeepAliveTime(long time, TimeUnit unit) 设置线程在终止前可以保持空闲的时间限制。 |
27 |
void setMaximumPoolSize(int maximumPoolSize) 设置允许的最大线程数。 |
28 |
void setRejectedExecutionHandler(RejectedExecutionHandler handler) 为不可执行的任务设置新的处理程序。 |
29 |
void setThreadFactory(ThreadFactory threadFactory) 设置用于创建新线程的线程工厂。 |
30 |
void shutdown() 启动有序关闭,其中已提交的任务执行,但不会接纳新任务。 |
31 |
List<Runnable> shutdownNow() 尝试停止所有正在积极执行的任务,停止处理等待的任务,并返回等待执行的任务的列表。 |
32 |
protected void terminated() 当 Executor 终止时调用的方法。 |
33 |
String toString() 返回标识此线程池的字符串,以及它的状态,包括运行状态和估计的工作程序和任务数的指示。 |
Example
以下 TestThread 程序展示了在基于线程的环境中使用 ThreadPoolExecutor 接口。
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
//Stats before tasks execution
System.out.println("Largest executions: "
+ executor.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ executor.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ executor.getPoolSize());
System.out.println("Currently executing threads: "
+ executor.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ executor.getTaskCount());
executor.submit(new Task());
executor.submit(new Task());
//Stats after tasks execution
System.out.println("Core threads: " + executor.getCorePoolSize());
System.out.println("Largest executions: "
+ executor.getLargestPoolSize());
System.out.println("Maximum allowed threads: "
+ executor.getMaximumPoolSize());
System.out.println("Current threads in pool: "
+ executor.getPoolSize());
System.out.println("Currently executing threads: "
+ executor.getActiveCount());
System.out.println("Total number of threads(ever scheduled): "
+ executor.getTaskCount());
executor.shutdown();
}
static class Task implements Runnable {
public void run() {
try {
Long duration = (long) (Math.random() * 5);
System.out.println("Running Task! Thread Name: " +
Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(duration);
System.out.println("Task Completed! Thread Name: " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Output
Largest executions: 0
Maximum allowed threads: 2147483647
Current threads in pool: 0
Currently executing threads: 0
Total number of threads(ever scheduled): 0
Core threads: 0
Largest executions: 2
Maximum allowed threads: 2147483647
Current threads in pool: 2
Currently executing threads: 2
Total number of threads(ever scheduled): 2
Running Task! Thread Name: pool-1-thread-2
Running Task! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-1
Task Completed! Thread Name: pool-1-thread-2
ScheduledThreadPoolExecutor Class
java.util.concurrent.ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,它还可以计划命令在给定的延迟后运行,或者定期执行。
ScheduledThreadPoolExecutor Methods
Sr.No. |
Method & Description |
1 |
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) 修改或替换用于执行可调用的任务。 |
2 |
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) 修改或替换用于执行可运行的任务。 |
3 |
void execute(Runnable command) 以零所需延迟执行命令。 |
4 |
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() 获取即使此执行器已关闭也继续执行现有周期性任务的策略。 |
5 |
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() 获取即使此执行器已关闭也执行现有延迟任务的策略。 |
6 |
BlockingQueue<Runnable> getQueue() 返回值执行器使用的任务队列。 |
7 |
boolean getRemoveOnCancelPolicy() 获取取消的任务是否应该在取消时立即从工作队列中移除的策略。 |
8 |
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 创建和执行一个 ScheduledFuture,该 ScheduledFuture 在给定的延迟后才启用。 |
9 |
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 创建和执行一个一次性操作,该操作在给定的延迟后才启用。 |
10 |
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 创建和执行一个周期性操作,该操作在给定的初始延迟后才首次启用,随后在给定的周期内启用;即执行将在 initialDelay、initialDelay+period、initialDelay + 2 * period 等时间点后开始。 |
11 |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 在给定的初始延迟后首次启用,在一次执行终止与下一次执行开始之间有给定延迟的周期性操作。 |
12 |
void setContinueExistingPeriodicTasksAfterShutdownPolicy (boolean value) 设置即使此执行器已关闭也继续执行现有周期性任务的策略。 |
13 |
void setExecuteExistingDelayedTasksAfterShutdownPolicy (boolean value) 设置即使此执行器已关闭也执行现有延迟任务的策略。 |
14 |
void setRemoveOnCancelPolicy(boolean value) 设置取消的任务是否应该在取消时立即从工作队列中移除的策略。 |
15 |
void shutdown() 启动有序关闭,其中已提交的任务执行,但不会接纳新任务。 |
16 |
List<Runnable> shutdownNow() 尝试停止所有正在积极执行的任务,停止处理等待的任务,并返回等待执行的任务的列表。 |
17 |
<T> Future<T> submit(Callable<T> task) 提交一个返回值的的任务以执行,并返回一个表示任务的待处理结果的 Future。 |
18 |
Future<?> submit(Runnable task) 提交一个可运行的任务以执行,并返回一个表示该任务的 Future。 |
19 |
<T> Future<T> submit(Runnable task, T result) 提交一个可运行的任务以执行,并返回一个表示该任务的 Future。 |
Example
以下 TestThread 程序展示了在基于线程的环境中使用 ScheduledThreadPoolExecutor 接口。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler =
(ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);
final ScheduledFuture<?> beepHandler =
scheduler.scheduleAtFixedRate(new BeepTask(), 2, 2, TimeUnit.SECONDS);
scheduler.schedule(new Runnable() {
@Override
public void run() {
beepHandler.cancel(true);
scheduler.shutdown();
}
}, 10, TimeUnit.SECONDS);
}
static class BeepTask implements Runnable {
public void run() {
System.out.println("beep");
}
}
}
这将产生以下结果。
Java Concurrency - Futures and Callables
java.util.concurrent.Callable 对象可以返回由线程完成的计算结果,而 runnable 接口只能运行该线程。 Callable 对象返回 Future 对象,该对象提供用于监视线程执行的任务进度的的方法。 Future 对象可用于检查 Callable 的状态,然后在线程完成后从 Callable 中检索结果。它还提供了超时功能。
Syntax
//submit the callable using ThreadExecutor
//and get the result as a Future object
Future<Long> result10 = executor.submit(new FactorialService(10));
//get the result using get method of the Future object
//get method waits till the thread execution and then return the result of the execution.
Long factorial10 = result10.get();
Example
以下 TestThread 程序展示了在基于线程的环境中使用 Future 和 Callable。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
System.out.println("Factorial Service called for 10!");
Future<Long> result10 = executor.submit(new FactorialService(10));
System.out.println("Factorial Service called for 20!");
Future<Long> result20 = executor.submit(new FactorialService(20));
Long factorial10 = result10.get();
System.out.println("10! = " + factorial10);
Long factorial20 = result20.get();
System.out.println("20! = " + factorial20);
executor.shutdown();
}
static class FactorialService implements Callable<Long> {
private int number;
public FactorialService(int number) {
this.number = number;
}
@Override
public Long call() throws Exception {
return factorial();
}
private Long factorial() throws InterruptedException {
long result = 1;
while (number != 0) {
result = number * result;
number--;
Thread.sleep(100);
}
return result;
}
}
}
这将产生以下结果。
Java Concurrency - Fork-Join framework
fork-join 框架允许在多个工作进程上中断某个任务,然后等待结果以合并它们。它很大程度上利用了多处理器的计算机容量。以下是 fork-join 框架中使用的核心概念和对象。
RecursiveTask
RecursiveTask 表示返回某个值的某个任务。
Syntax
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
Example
以下 TestThread 程序演示了线程基础环境中 Fork-Join 框架的用法。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i = low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low) / 2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
这将产生以下结果。
Java Concurrency - BlockingQueue Interface
java.util.concurrent.BlockingQueue 接口是 Queue 接口的子接口,并且另外支持涉及在检索某个元素之前等待队列变为非空,并在存储某个元素之前等待队列中可用的空间等操作。
BlockingQueue Methods
Sr.No. |
Method & Description |
1 |
boolean add(E e) 如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,则返回 true,如果当前没有可用空间,则返回 true 并抛出 IllegalStateException。 |
2 |
boolean contains(Object o) 如果此队列包含指定的元素,则返回 true。 |
3 |
int drainTo(Collection<? super E> c) 从此队列中删除所有可用的元素,并将它们添加到给定的集合中。 |
4 |
int drainTo(Collection<? super E> c, int maxElements) 从此队列中最多删除给定数量的可用的元素,并将它们添加到给定的集合中。 |
5 |
boolean offer(E e) 如果可以在不违反容量限制的情况下立即将指定的元素插入到此队列中,则返回 true,如果当前没有可用空间,则返回 false。 |
6 |
boolean offer(E e, long timeout, TimeUnit unit) 插入指定的元素到此队列中,如果需要,等待相应的时间直到有可用空间为止。 |
7 |
E poll(long timeout, TimeUnit unit) 检索并删除此队列的头部,如果需要,等待相应的时间直到有元素可用为止。 |
8 |
void put(E e) 插入指定的元素到此队列中,如果需要,等待直到有可用空间为止。 |
9 |
int remainingCapacity() 返回此队列在理想条件下(不存在内存或资源限制)可以接受的元素数量,如果没有固有限制,则返回 Integer.MAX_VALUE。 |
10 |
boolean remove(Object o) 从此队列中删除指定的元素的一个实例(如果存在的话)。 |
11 |
E take() 检索并删除此队列的头部,如果需要,等待直到有元素可用为止。 |
Example
以下 TestThread 程序演示了线程基础环境中 BlockingQueue 接口的用法。
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
static class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
try {
int result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
result = random.nextInt(100);
Thread.sleep(1000);
queue.put(result);
System.out.println("Added: " + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("Removed: " + queue.take());
System.out.println("Removed: " + queue.take());
System.out.println("Removed: " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这将产生以下结果。
Java Concurrency - ConcurrentMap Interface
java.util.concurrent.ConcurrentMap 接口是 Map 接口的子接口,支持对基础映射变量执行原子操作。它具有类似于易失变量上的读写操作的 get 和 set 方法。也就是说,某项设置与对同一变量执行的任何后续 get 操作之间存在发生在前关系。此接口确保线程安全和原子性保证。
ConcurrentMap Methods
Sr.No. |
Method & Description |
1 |
default V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) 尝试计算指定键及其当前映射值的映射(如果没有当前映射,则为 null)。 |
2 |
default V computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction) 如果指定的键尚未与某个值关联(或已映射至 null),则尝试使用给定的映射函数计算它的值,并且如果该值为非 null,则将它输入到此映射。 |
3 |
default V computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) 如果针对指定键的值已存在且为非 null,则尝试在提供键及其当前映射值的情况下计算一个新映射。 |
4 |
default void forEach(BiConsumer<? super K,? super V> action) 对该映射中的每个条目执行给定的操作,直到处理完所有条目或该操作引发异常。 |
5 |
default V getOrDefault(Object key, V defaultValue) 返回指定键映射到的值,如果此映射不包含用于该键的映射,则返回 defaultvalue。 |
6 |
default V merge(K key, V value, BiFunction<? super V,? super V,? extends V> remappingFunction) 如果指定的键尚未与某个值关联或已与 null 关联,则使用指定的非 null 值与该键进行关联。 |
7 |
V putIfAbsent(K key, V value) 如果指定的键尚未与某个值关联,则使用给定的值与该键进行关联。 |
8 |
boolean remove(Object key, Object value) 仅在当前映射到给定值的情况下,才移除针对某个键的条目。 |
9 |
V replace(K key, V value) 仅在当前映射到某个值的情况下,才替换针对某个键的条目。 |
10 |
boolean replace(K key, V oldValue, V newValue) 仅在当前映射到给定值的情况下,才替换针对某个键的条目。 |
11 |
default void replaceAll(BiFunction<? super K,? super V,? extends V> function) 用在每个条目上调用给定函数的结果替换每个条目的值,直到处理完所有条目或该函数引发异常。 |
Example
以下 TestThread 程序展示了 ConcurrentMap 与 HasMap 的用法。
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TestThread {
public static void main(final String[] arguments) {
Map<String,String> map = new ConcurrentHashMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial ConcurrentHashMap: " + map);
Iterator<String> iterator = map.keySet().iterator();
try {
while(iterator.hasNext()) {
String key = iterator.next();
if(key.equals("3")) {
map.put("4", "Four");
}
}
} catch(ConcurrentModificationException cme) {
cme.printStackTrace();
}
System.out.println("ConcurrentHashMap after modification: " + map);
map = new HashMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial HashMap: " + map);
iterator = map.keySet().iterator();
try {
while(iterator.hasNext()) {
String key = iterator.next();
if(key.equals("3")) {
map.put("4", "Four");
}
}
System.out.println("HashMap after modification: " + map);
} catch(ConcurrentModificationException cme) {
cme.printStackTrace();
}
}
}
这将产生以下结果。
Output
Initial ConcurrentHashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
ConcurrentHashMap after modification: {1 = One, 2 = Two, 3 = Three, 4 = Four, 5 = Five, 6 = Six}
Initial HashMap: {1 = One, 2 = Two, 3 = Three, 5 = Five, 6 = Six}
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(Unknown Source)
at java.util.HashMap$KeyIterator.next(Unknown Source)
at TestThread.main(TestThread.java:48)
ConcurrentNavigableMap Interface
一个 java.util.concurrent.ConcurrentNavigableMap 接口是 ConcurrentMap 接口的子接口,并且支持 NavigableMap 操作,以及对其可导航子映射和近似匹配进行递归操作。
ConcurrentMap Methods
Sr.No. |
Method & Description |
1 |
NavigableSet<K> descendingKeySet() 返回此映射中包含的键的反向顺序 NavigableSet 视图。 |
2 |
ConcurrentNavigableMap<K,V> descendingMap() 返回此映射中包含的映射的反向顺序视图。 |
3 |
ConcurrentNavigableMap<K,V> headMap(K toKey) 返回此映射的键严格小于 toKey 的部分视图。 |
4 |
ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive) 返回此映射的键小于(或等于,如果 inclusive 为 true)toKey 的部分视图。 |
5 |
NavigableSet<K> keySet() 返回此映射中包含的键的 NavigableSet 视图。 |
6 |
NavigableSet<K> navigableKeySet() 返回此映射中包含的键的 NavigableSet 视图。 |
7 |
ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) 返回此映射的键范围从 fromKey 到 toKey 的部分视图。 |
8 |
ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey) 返回此地图一部分的视图,其键值从 fromKey(包括)到 toKey(不包括)。 |
9 |
ConcurrentNavigableMap<K,V> tailMap(K fromKey) 返回此地图一部分的视图,其键值大于或等于 fromKey。 |
10 |
ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive) 返回此地图一部分的视图,其键值大于(或等于,如果 inclusive 为真)fromKey。 |
Example
以下 TestThread 程序演示了 ConcurrentNavigableMap 的用法。
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class TestThread {
public static void main(final String[] arguments) {
ConcurrentNavigableMap<String,String> map =
new ConcurrentSkipListMap<String, String>();
map.put("1", "One");
map.put("2", "Two");
map.put("3", "Three");
map.put("5", "Five");
map.put("6", "Six");
System.out.println("Initial ConcurrentHashMap: "+map);
System.out.println("HeadMap(\"2\") of ConcurrentHashMap: "+map.headMap("2"));
System.out.println("TailMap(\"2\") of ConcurrentHashMap: "+map.tailMap("2"));
System.out.println(
"SubMap(\"2\", \"4\") of ConcurrentHashMap: "+map.subMap("2","4"));
}
}
这将产生以下结果。