Ruby 简明教程
Ruby - Multithreading
传统程序有一个单一执行线程,直到程序终止之前,组成程序的语句或指令将按顺序执行。
多线程程序有多个执行线程。在每个线程中,语句按顺序执行,但是这些线程本身可以并行执行,例如在多核 CPU 上。通常在单 CPU 计算机中,多个线程实际上并不会并行执行,但是可以通过交错执行线程来模拟并行性。
利用 Thread 类,Ruby 非常便于编写多线程程序。Ruby 线程是一种轻量且高效的方式,在您的代码中实现并发。
Creating Ruby Threads
若要启动一个新线程,只需将一个块与对 Thread.new 的调用关联即可。将创建一个新线程来执行块中代码,原始线程会立即从 Thread.new 返回,并使用下一条语句恢复执行 −
# Thread #1 is running here
Thread.new {
# Thread #2 runs this code
}
# Thread #1 runs this code
Example
这是一个示例,演示如何使用多线程 Ruby 程序。
#!/usr/bin/ruby
def func1
i = 0
while i<=2
puts "func1 at: #{Time.now}"
sleep(2)
i = i+1
end
end
def func2
j = 0
while j<=2
puts "func2 at: #{Time.now}"
sleep(1)
j = j+1
end
end
puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"
这将生成以下结果:
Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008
Thread Lifecycle
使用 Thread.new 创建新的线程。您还可以使用同义词 Thread.start 和 Thread.fork。
在创建线程后无需启动该线程,当 CPU 资源可用时,它将自动开始运行。
Thread 类定义了一些方法来查询和操作正在运行的线程。线程会运行与 Thread.new 的调用关联的块中的代码,然后停止运行。
该块中最后一个表达式的值为线程值,可以通过调用 Thread 对象的 value 方法获取该值。如果线程已经运行完成,则 value 会立即返回线程值。否则,value 方法会阻塞,在线程完成之前不会返回。
类方法 Thread.current 返回表示当前线程的 Thread 对象。这样,线程可以对自己进行操作。类方法 Thread.main 返回表示主线程的 Thread 对象。这是在启动 Ruby 程序时开始运行的初始执行线程。
您可以通过调用线程的 Thread.join 方法来等待特定线程完成。调用线程将阻塞,直到给定线程完成。
Threads and Exceptions
如果在主线程中引发了一个异常,并且在任何地方都没有处理该异常,则 Ruby 解释器将打印一条消息并退出。在主线程之外的线程中,未处理的异常会导致该线程停止运行。
如果一个线程由于未处理的异常而 t 退出,而另一个线程 s 调用 t.join 或 t.value,那么 t 中发生的异常将在线程 s 中引发。
如果 Thread.abort_on_exception 为 false,即默认条件,一个未处理的异常会简单地终止当前线程,而其他所有线程继续运行。
如果您希望任何线程中的任何未处理异常导致解释器退出,请将类方法 Thread.abort_on_exception 设置为 true。
t = Thread.new { ... }
t.abort_on_exception = true
Thread Variables
当创建一个线程时,该线程通常可以访问范围内的任何变量。一个线程块中的局部变量属于该线程,并且它们不受共享。
Thread 类提供一个特殊功能,它允许按名称创建和访问局部变量。您可以简单地将线程对象视为散列表,使用 []= 来写入元素,并使用 [] 来读取元素。
在此示例中,每个线程都会使用键 mycount 在一个局部变量中记录变量 count 的当前值。
#!/usr/bin/ruby
count = 0
arr = []
10.times do |i|
arr[i] = Thread.new {
sleep(rand(0)/10.0)
Thread.current["mycount"] = count
count += 1
}
end
arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"
这会产生以下结果:
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
主线程等待子线程执行完成,然后打印出每个线程捕获的 count 值。
Thread Priorities
影响线程调度的第一个因素是线程优先级:高优先级的线程会先于低优先级的线程被调度。更确切地说,当没有更高优先级的线程等待运行时,线程才能获得 CPU 时间。
可以使用 priority = 和 priority 设置和查询 Ruby Thread 对象的优先级。新创建的线程的优先级与创建它的线程的优先级相同。主线程的初始优先级为 0。
无法在开始运行之前设置线程的优先级。然而,线程可以在其执行的第一个操作中提升或降低其自身的优先级。
Thread Exclusion
如果两个线程共享对相同数据的访问权,并且其中至少有一个线程修改了该数据,则你必须特别注意以确保没有线程可能会看到不一致状态下的数据。这称为线程互斥。
Mutex 是一个类,它实现了针对某个共享资源进行互斥访问的简单信号量锁。也就是说,在给定的时间,只有一个线程可能持有该锁。其他线程可以选择在队列中等待锁可用,或者仅仅选择立即获取一个指示锁不可用的错误。
通过将对共享数据的全部访问权限置于互斥锁的控制之下,我们可以确保一致性和原子操作。我们尝试两个示例,第一个是没有互斥锁的,第二个是有互斥锁的 −
Example without Mutax
#!/usr/bin/ruby
require 'thread'
count1 = count2 = 0
difference = 0
counter = Thread.new do
loop do
count1 += 1
count2 += 1
end
end
spy = Thread.new do
loop do
difference += (count1 - count2).abs
end
end
sleep 1
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"
这会产生以下结果 −
count1 : 1583766
count2 : 1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
count1 = count2 = 0
difference = 0
counter = Thread.new do
loop do
mutex.synchronize do
count1 += 1
count2 += 1
end
end
end
spy = Thread.new do
loop do
mutex.synchronize do
difference += (count1 - count2).abs
end
end
end
sleep 1
mutex.lock
puts "count1 : #{count1}"
puts "count2 : #{count2}"
puts "difference : #{difference}"
这会产生以下结果 −
count1 : 696591
count2 : 696591
difference : 0
Handling Deadlock
当我们开始使用 Mutex 对象进行线程互斥时,我们必须小心避免死锁。当所有线程都在等待获得另一个线程持有的资源时,就会发生死锁。由于所有线程都处于阻塞状态,因此它们无法释放所持有的锁。并且由于它们无法释放这些锁,因此没有其他线程可以获得这些锁。
这是条件变量发挥作用的地方。条件变量只是一个与资源关联并用于保护特定互斥锁的信号量。当你需要不可用的资源时,你会等待条件变量。该操作将释放对相应互斥锁的锁。当其他一些线程发出资源可用信号时,原始线程会退出等待并在临界区域同时重新获得锁。
Example
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
a = Thread.new {
mutex.synchronize {
puts "A: I have critical section, but will wait for cv"
cv.wait(mutex)
puts "A: I have critical section again! I rule!"
}
}
puts "(Later, back at the ranch...)"
b = Thread.new {
mutex.synchronize {
puts "B: Now I am critical, but am done with cv"
cv.signal
puts "B: I am still critical, finishing up"
}
}
a.join
b.join
这会产生以下结果 −
A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!
Thread States
有 5 个可能的返回值与下表所示的 5 种可能的状态相对应。status 方法返回线程的状态。
Thread state |
Return value |
Runnable |
run |
Sleeping |
Sleeping |
Aborting |
aborting |
Terminated normally |
false |
Terminated with exception |
nil |