cuda06- 流 并发

📅 2026/7/4 9:26:18 👁️ 阅读次数 📝 编程学习
cuda06- 流  并发

目录

  • 1. CUDA流
    • 1.1 流的定义
    • 1.3 流优先级
    • 1.4 cuda事件Event
    • 1.4 stream同步
      • 1.4.1 阻塞流和非阻塞流
      • 1.4.2 隐式同步
      • 1.4.3 显式同步
      • 1.4.4 配置event
    • 1.5 并发执行
      • 1.5.1 虚假依赖关系
      • 1.5.2 openMP优化并行计算
      • 1.5.3 使用环境变量调整流行为
      • 1.5.4 创建流之间的依赖关系
      • 1.5.5 空流的阻塞行为
    • 1.6 重叠内存拷贝和壳函数执行
      • 1.6.1 使用深度有先的调度方法
      • 1.6.2 使用广度有先的调度方法
    • 重叠cpu和gpu的运行
    • 流回调函数接口

博主公司重组,求推荐大模型部署ai infra, base上海的工作。my phone 15601237103

1. CUDA流

1.1 流的定义

空流,即隐式声明的流。内核启动和数据cp默认使用空流。
非空流, 使用cudaStreamCreate开辟的流。

  • 支持的并发类型:

主机计算 和 设备计算的重叠
主机计算 和 主机设与备之间数据传输
主机设与备之间数据传输 和 设备计算
并发设备计算

  • 异步数据传输:
__host__ ​__device__​cudaError_tcudaMemcpyAsync(void*dst,constvoid*src,size_tcount,cudaMemcpyKind kind,cudaStream_tstream=0)

当使用cudaMemcpyAsync做异步传输时候,必须要使用固定主机内存。可以使用

cudaMallocHost cudaHostAlloc
  • 在非默认流中启动内核,必须要提供流作为型参
kernel_name<<<grid,block,sharedMemSize,stream>>>(args);
  • 非默认流的声明 和 释放
cudaStream stream;cudaStreamCreate(&stream);cudaStreamDestroy(stream);

cudaStreamDestroy调用完后会立即返回,流相关的资源会被自动释放。

  • 检查流中所有操作是否全部完成
cudaStreamSynchronize(stream);cudaStreamQuery(stream);

cudaStreamSynchronize阻塞主机直到操作完成。cudaStreamQuery立即返回。

Fermi架构支持16路stream并发。
Kepler架构支持16路stream并发。

1.3 流优先级

  • 创建指定优先级的流
cudaStreamCreateWithPriority
  • 获取流允许优先级的范围
cudaDeviceGetStreamPriorityRange

1.4 cuda事件Event

Event使用场景:同步流执行。
Event使用场景: 监控设备进展,在stream中插入一个点用于监控流操作是否已经到达指定点。

  • 流创建和销毁
__host__​cudaError_t cudaEventCreate(cudaEvent_t*event)__host__​__device__​cudaError_t cudaEventDestroy(cudaEvent_t event)
  • 流同步
__host__​cudaError_t cudaEventSynchronize(cudaEvent_t event)
  • 两个事件的运行时间
__host__​__device__​cudaError_t cudaEventRecord(cudaEvent_t event, cudaStream_t stream=0)__host__​cudaError_t cudaEventElapsedTime(float* ms, cudaEvent_t start, cudaEvent_t end)

1.4 stream同步

非空流是异步流。
空流/默认流是同步流。
非空流包含 : 阻塞流和非阻塞流。

1.4.1 阻塞流和非阻塞流

__host__​__device__​cudaError_t cudaStreamCreateWithFlags(cudaStream_t* pStream, unsigned int flags)

CudastreamDefault :默认流创建标志。
cudaStreamNonBlocking: 非阻塞流。

1.4.2 隐式同步

调用cudaMemcpy函数,可以隐式同步设备和主机。
包含隐式同步的操作:
锁页主机主机内存分配
设备内存分配
设备内存初始化
同一个设备上两个地址之间的内存复制
一级缓存/共享内存配置的修改

1.4.3 显式同步

__host__​__device__​cudaError_t cudaDeviceSynchronize(void)__host__​cudaError_t cudaStreamSynchronize(cudaStream_t stream)__host__​cudaError_t cudaEventSynchronize(cudaEvent_t event)

stream中等待event

cudaError_tcudaStreamWaitEvent(cudaStream_tstream,cudaEvent_tevent,unsignedintflags);

CUDA 流(stream)等待某个事件(event)完成,当 event 完成后,stream 才会继续执行后续的操作。

1.4.4 配置event

__host__​__device__​cudaError_tcudaEventCreateWithFlags(cudaEvent_t*event,unsignedintflags)Valid flags include:cudaEventDefault:Default.使用该事件来准确地测量 CUDA 操作的执行时间,并且可以在事件完成后进行同步 cudaEventBlockingSync:阻塞式同步,直到事件完成 A host thread that usescudaEventSynchronize()to wait on an event created with this flag will block until the event actually completes.cudaEventDisableTiming:只同步,不记录时间戳,will provide the best performance when used withcudaStreamWaitEvent()andcudaEventQuery().cudaEventInterprocess:可以用于进程间事件。Specifies that the created event may be used as an interprocess event bycudaIpcGetEventHandle().

1.5 并发执行

1.5.1 虚假依赖关系

虚假依赖关系:所有的stream队列要复用到一个硬件队列中,深度优先启动的流就阻塞了后面其他流。

深度优先启动内核函数。排布在一起的K2 K3来自同一个stream,形成虚假依赖关系

for(inti=0;i<n_streams;i++){kernel_1<<<grid,block,0,streams[i]>>>();kernel_2<<<grid,block,0,streams[i]>>>();kernel_3<<<grid,block,0,streams[i]>>>();kernel_4<<<grid,block,0,streams[i]>>>();}


nvvp抓流

广度优先启动内核函数。确保每个kernel函数来自不同的stream,相邻的任务不存在虚假依赖关系。

for(inti=0;i<n_streams;i++)kernel_1<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams;i++)kernel_2<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams;i++)kernel_3<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams;i++)kernel_4<<<grid,block,0,streams[i]>>>();


nvvp显示并发性更好一点? todo,我的抓图和书上的不同。

1.5.2 openMP优化并行计算

OpenMP:采用基于指令(pragma)的编程模型,通过在源代码中插入特定的编译指令来指示编译器如何并行化代码。
优化kernel的启动部分

omp_set_num_threads(n_streams);#pragmaomp parallel{inti=omp_get_thread_num();kernel_1<<<grid,block,0,streams[i]>>>();kernel_2<<<grid,block,0,streams[i]>>>();kernel_3<<<grid,block,0,streams[i]>>>();kernel_4<<<grid,block,0,streams[i]>>>();}

在时间轴上并没有看到任何并行优化的表现…,似乎并不如手写广度优先的展开。

1.5.3 使用环境变量调整流行为

我们设置最大连接数为4, stream数量为8,

#defineNSTREAM8constchar*iname="CUDA_DEVICE_MAX_CONNECTIONS";setenv(iname,"4",1);

我的期望是流的数量超过了最大连接数,那么多个流会共享一个连接,下图不符合预期。

书本上的预期应该是这样子:

1.5.4 创建流之间的依赖关系

原代码是这样子。

for(inti=0;i<n_streams;i++){kernel_1<<<grid,block,0,streams[i]>>>();kernel_2<<<grid,block,0,streams[i]>>>();kernel_3<<<grid,block,0,streams[i]>>>();kernel_4<<<grid,block,0,streams[i]>>>();CHECK(cudaEventRecord(kernelEvent[i],streams[i]));CHECK(cudaStreamWaitEvent(streams[n_streams-1],kernelEvent[i],0));}

nvvp抓流是这样效果,

但是预期应该是前三路并行,第四路等待前三路完成后执行:

代码我改成这样:

for(inti=0;i<n_streams-1;i++)kernel_1<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams-1;i++)kernel_2<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams-1;i++)kernel_3<<<grid,block,0,streams[i]>>>();for(inti=0;i<n_streams-1;i++){kernel_4<<<grid,block,0,streams[i]>>>();CHECK(cudaEventRecord(kernelEvent[i],streams[i]));}CHECK(cudaStreamWaitEvent(streams[n_streams-1],kernelEvent[2],0));CHECK(cudaStreamWaitEvent(streams[n_streams-1],kernelEvent[1],0));CHECK(cudaStreamWaitEvent(streams[n_streams-1],kernelEvent[0],0));kernel_1<<<grid,block,0,streams[3]>>>();kernel_2<<<grid,block,0,streams[3]>>>();kernel_3<<<grid,block,0,streams[3]>>>();kernel_4<<<grid,block,0,streams[3]>>>();

才可以看到流等待事件的效果:

1.5.5 空流的阻塞行为

空流是一个 阻塞流,这意味着它会阻塞其他非空流的执行,直到空流中的操作完成。
空流中的操作是同步的,即它们会按照提交的顺序依次执行。
优先并行考虑应该使用非空流:

cudaStreamCreate(&stream1);cudaStreamCreateWithFlags(&stream,cudaStreamNonBlocking);

1.6 重叠内存拷贝和壳函数执行

1.6.1 使用深度有先的调度方法


可以看到并行情况:
不同流中内核并行,
内核和其他流中数据拷贝并行,
不同方向(H2D,D2H)的数据拷贝并行

1.6.2 使用广度有先的调度方法


我的测试表示:广度优先调度多个内核的方法,耗时更少。虽然H2D的cp没有和其他流上内核执行出现并行,但是我的这个显卡内核并行度更高,也许这就是广度更快的原因。

重叠cpu和gpu的运行

所有的内核启动默认情况下都是异步的。
cudaMemcpyAsync,如果要实现异步(数据cp和计算的异步),要使用cudaMallocHost申请的page-lock内存。
否则,使用非锁页内存(如malloc)无法实现异步,只能达到cudaMemcpy的同步效果,不会报错。

流回调函数接口

回调函数目前没有看到太多使用场景

voidCUDART_CBmy_callback(cudaStream_tstream,cudaError_tstatus,void*data){printf("callback from stream %d\n",*((int*)data));}for(inti=0;i<n_streams;i++){stream_ids[i]=i;kernel_1<<<grid,block,0,streams[i]>>>();kernel_2<<<grid,block,0,streams[i]>>>();kernel_3<<<grid,block,0,streams[i]>>>();kernel_4<<<grid,block,0,streams[i]>>>();CHECK(cudaStreamAddCallback(streams[i],my_callback,(void*)(stream_ids+i),0));}