之前討論的並行,都是線程級別的,即CUDA開啟多個線程,並行執行核函數內的代碼。GPU最多就上千個核心,同一時間只能並行執行上千個任務。當我們處理千萬級別的數據,整個大任務無法被GPU一次執行,所有的計算任務需要放在一個隊列中,排隊順序執行。CUDA將放入隊列順序執行的一系列操作稱為流(Stream)。
由於異構計算的硬件特性,CUDA中以下操作是相互獨立的,通過編程,是可以操作他們並發地執行的:
針對這種互相獨立的硬件架構,CUDA使用多流作為一種高並發的方案:
實際上,流水線作業的思想被廣泛應用於CPU和GPU等計算機芯片設計上,以加速程序。
以向量加法為例,上圖中第一行的Stream 0部分是我們之前的邏輯,沒有使用多流技術,程序的三大步驟是順序執行的:
當數據量很大時,每個步驟的耗時很長,後面的步驟必須等前面執行完畢才能繼續,整體的耗時相當長。
以2000萬維的向量加法為例,向量大約有幾十M大小,將整個向量在主機和設備間拷貝將占用占用上百毫秒的時間,有可能遠比核函數計算的時間多得多。將程序改為多流後,每次只計算一小部分,流水線並發執行,會得到非常大的性能提升。
默認情況下,CUDA使用0號流,又稱默認流。不使用多流時,所有任務都在默認流中順序執行,效率較低。在使用多流之前,必須先了解多流的一些規則:
參照上圖,可將這三個規則解釋為:
某個流內的操作是順序的,非默認流之間是異步的,默認流有阻塞作用。
如果想使用多流時,必須先定義流:
stream = numba.cuda.stream()
CUDA的數據拷貝以及核函數都有專門的stream
參數來接收流,以告知該操作放入哪個流中執行:
numba.cuda.to_device(obj, stream=0, copy=True, to=None)
numba.cuda.copy_to_host(self, ary=None, stream=0)
核函數調用的地方除了要寫清執行配置,還要加一項stream
參數:
kernel[blocks_per_grid, threads_per_block, stream=0]
根據這些函數定義也可以知道,不指定stream
參數時,這些函數都使用默認的0號流。
對於程序員來說,需要將數據和計算做拆分,分別放入不同的流裡,構成一個流水線操作。
將之前的向量加法的例子改為多流處理,完整的代碼為:
from numba import cuda
import numpy as np
import math
from time import time
@cuda.jit
def vector_add(a, b, result, n):
idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
if idx < n :
result[idx] = a[idx] + b[idx]
def main():
n = 20000000
x = np.random.uniform(10,20,n)
y = np.random.uniform(10,20,n)
# x = np.arange(n).astype(np.int32)
# y = 2 * x
start = time()
# 使用默認流
# Host To Device
x_device = cuda.to_device(x)
y_device = cuda.to_device(y)
z_device = cuda.device_array(n)
z_streams_device = cuda.device_array(n)
threads_per_block = 1024
blocks_per_grid = math.ceil(n / threads_per_block)
# Kernel
vector_add[blocks_per_grid, threads_per_block](x_device, y_device, z_device, n)
# Device To Host
default_stream_result = z_device.copy_to_host()
cuda.synchronize()
print("gpu vector add time " + str(time() - start))
start = time()
# 使用5個流
number_of_streams = 5
# 每個流處理的數據量為原來的 1/5
# 符號//得到一個整數結果
segment_size = n // number_of_streams
# 創建5個cuda stream
stream_list = list()
for i in range (0, number_of_streams):
stream = cuda.stream()
stream_list.append(stream)
threads_per_block = 1024
# 每個stream的處理的數據變為原來的1/5
blocks_per_grid = math.ceil(segment_size / threads_per_block)
streams_result = np.empty(n)
# 啟動多個stream
for i in range(0, number_of_streams):
# 傳入不同的參數,讓函數在不同的流執行
# Host To Device
x_i_device = cuda.to_device(x[i * segment_size : (i + 1) * segment_size], stream=stream_list[i])
y_i_device = cuda.to_device(y[i * segment_size : (i + 1) * segment_size], stream=stream_list[i])
# Kernel
vector_add[blocks_per_grid, threads_per_block, stream_list[i]](
x_i_device,
y_i_device,
z_streams_device[i * segment_size : (i + 1) * segment_size],
segment_size)
# Device To Host
streams_result[i * segment_size : (i + 1) * segment_size] = z_streams_device[i * segment_size : (i + 1) * segment_size].copy_to_host(stream=stream_list[i])
cuda.synchronize()
print("gpu streams vector add time " + str(time() - start))
if (np.array_equal(default_stream_result, streams_result)):
print("result correct")
if __name__ == "__main__":
main()
運行結果:
gpu vector add time 7.996402740478516
gpu streams vector add time 0.3867764472961426
多流不僅需要程序員掌握流水線思想,還需要用戶對數據和計算進行拆分,並編寫更多的代碼,但是收益非常明顯。對於計算密集型的程序,這種技術非常值得認真研究。