parallel-patterns
您是 parallel-patterns - 一个专门针对GPU并行算法设计模式和实现的专业技能。这项技能提供了在GPU上实现高效并行算法的专家能力。
概览
这项技能支持AI驱动的并行算法开发,包括:
- 实现并行归约算法(基于树的,warp)
- 生成扫描(前缀和)实现
- 设计直方图和分箱算法
- 实现并行排序算法(基数,归并)
- 生成流压缩代码
- 设计工作高效的并行模式
- 处理大数据的多遍(multi-pass)算法
- 针对特定GPU架构进行优化
前提条件
- CUDA Toolkit 11.0+
- CUB库(包含在CUDA中)
- Thrust库(包含在CUDA中)
能力
1. 并行归约
实现高效归约:
// Warp级别的归约(不需要共享内存用于单个warp)
// Sum-reduce `v` across the 32 lanes of the calling warp using shuffle
// instructions only (no shared memory). After the loop, lane 0 holds the
// warp-wide total. Assumes all 32 lanes are active (full 0xffffffff mask).
__device__ float warpReduce(float v) {
    int offset = warpSize / 2;
    while (offset > 0) {
        v += __shfl_down_sync(0xffffffff, v, offset);
        offset >>= 1;
    }
    return v;
}
// 块级别的归约与共享内存
// Block-wide sum reduction: warp-level shuffle reduction, then one
// shared-memory exchange reduced by the first warp. Thread 0 ends up with
// the block total; other threads' return values are only partials.
// Assumes blockDim.x == BLOCK_SIZE and BLOCK_SIZE is a multiple of
// warpSize — TODO confirm callers guarantee this; otherwise the
// `shared[lane]` gather below reads slots no warp ever wrote.
// NOTE(review): there is no trailing __syncthreads(), so two back-to-back
// calls in the same kernel would race on `shared` — verify call sites.
template<int BLOCK_SIZE>
__device__ float blockReduce(float val) {
__shared__ float shared[32]; // one slot per warp (max 32 warps per block)
int lane = threadIdx.x % warpSize;
int wid = threadIdx.x / warpSize;
// Warp-level reduction: lane 0 of each warp now holds its warp's sum.
val = warpReduce(val);
// Publish each warp's partial sum to shared memory.
if (lane == 0) shared[wid] = val;
__syncthreads();
// First warp gathers the per-warp sums (zero beyond the warp count)
// and reduces them into the final block total.
val = (threadIdx.x < BLOCK_SIZE / warpSize) ? shared[lane] : 0.0f;
if (wid == 0) val = warpReduce(val);
return val;
}
// 完整的并行归约内核
// Full parallel sum reduction: grid-stride accumulation per thread, block
// reduction, then one atomicAdd per block into *output.
// Precondition: *output must be zero-initialized by the host before launch.
template<int BLOCK_SIZE>
__global__ void reduceKernel(const float* input, float* output, int n) {
    float partial = 0.0f;
    // Grid-stride loop: any grid size covers arrays of any length.
    const int stride = gridDim.x * BLOCK_SIZE;
    int i = blockIdx.x * BLOCK_SIZE + threadIdx.x;
    while (i < n) {
        partial += input[i];
        i += stride;
    }
    // Reduce the per-thread partials within the block.
    partial = blockReduce<BLOCK_SIZE>(partial);
    // Thread 0 publishes this block's contribution with a single atomic.
    if (threadIdx.x == 0) {
        atomicAdd(output, partial);
    }
}
2. 前缀和(扫描)
工作高效的扫描实现:
// 在一个warp内的包容性扫描
// Inclusive prefix sum within one warp (Kogge-Stone style) using shuffle
// instructions. Lane i ends with the sum of lanes 0..i. Assumes all 32
// lanes participate.
__device__ float warpInclusiveScan(float v) {
    const int lane = threadIdx.x % warpSize;
    for (int d = 1; d < warpSize; d <<= 1) {
        float up = __shfl_up_sync(0xffffffff, v, d);
        if (lane >= d) {
            v += up;
        }
    }
    return v;
}
// 块级别的包容性扫描(Blelloch算法)
// Work-efficient block-level INCLUSIVE scan (Brent-Kung style up-sweep /
// down-sweep) over BLOCK_SIZE elements held in shared memory.
// Preconditions: `data` points to shared memory with BLOCK_SIZE elements,
// BLOCK_SIZE is a power of two, blockDim.x == BLOCK_SIZE, and ALL threads
// of the block call this function.
// Note: unlike the Blelloch exclusive scan, the last element is NOT
// cleared between the phases; the down-sweep here propagates partial sums
// forward, producing an inclusive result directly.
template<int BLOCK_SIZE>
__device__ void blockInclusiveScan(float* data) {
    int tid = threadIdx.x;
    // Make the callers' writes to data[] visible before the first
    // cross-thread read below.
    __syncthreads();
    // Up-sweep (reduce) phase: build partial sums at power-of-two strides.
    for (int stride = 1; stride < BLOCK_SIZE; stride <<= 1) {
        int index = (tid + 1) * stride * 2 - 1;
        if (index < BLOCK_SIZE) {
            data[index] += data[index - stride];
        }
        __syncthreads();
    }
    // Down-sweep phase: propagate the partial sums to the remaining
    // positions, completing the inclusive scan.
    for (int stride = BLOCK_SIZE / 2; stride > 0; stride >>= 1) {
        int index = (tid + 1) * stride * 2 - 1;
        if (index + stride < BLOCK_SIZE) {
            data[index + stride] += data[index];
        }
        __syncthreads();
    }
}
// 使用CUB进行生产代码
#include <cub/cub.cuh>
// Inclusive prefix sum of n floats using CUB's DeviceScan (production path).
// d_in and d_out are device pointers; n is the element count.
// Returns silently on n <= 0 or on allocation/scan failure, leaving d_out
// untouched.
void inclusiveScan(float* d_in, float* d_out, int n) {
    if (n <= 0) return;  // nothing to scan
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;
    // First call with a null temp pointer only queries the required size.
    if (cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes,
                                      d_in, d_out, n) != cudaSuccess) {
        return;
    }
    // Allocate the temporary storage; bail out rather than scanning with
    // an invalid buffer if the allocation fails.
    if (cudaMalloc(&d_temp_storage, temp_storage_bytes) != cudaSuccess) {
        return;
    }
    // Second call performs the actual scan.
    cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes,
                                  d_in, d_out, n);
    cudaFree(d_temp_storage);
}
3. 直方图
并行直方图计算:
// 共享内存直方图(少量箱)
// Shared-memory histogram for small bin counts.
// Precondition: numBins <= 256 (the capacity of the block-local
// histogram); input values outside [0, numBins) are dropped rather than
// allowed to corrupt adjacent shared memory.
__global__ void histogramSmall(const int* input, int* histogram, int n, int numBins) {
    __shared__ int localHist[256];
    // Zero the block-local histogram cooperatively.
    for (int i = threadIdx.x; i < numBins; i += blockDim.x) {
        localHist[i] = 0;
    }
    __syncthreads();
    // Accumulate this thread's element into the block-local histogram.
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        int bin = input[idx];
        // Bounds guard: an out-of-range value would otherwise write past
        // localHist (shared-memory corruption).
        if (bin >= 0 && bin < numBins) {
            atomicAdd(&localHist[bin], 1);
        }
    }
    __syncthreads();
    // Merge the block-local counts into the global histogram.
    for (int i = threadIdx.x; i < numBins; i += blockDim.x) {
        atomicAdd(&histogram[i], localHist[i]);
    }
}
// 每个线程的私有直方图用于大箱数
__global__ void histogramPrivate(const int* input, int* histogram,
int n, int numBins) {
extern __shared__ int sharedHist[];
int tid = threadIdx.x;
int* myHist = &sharedHist[tid * numBins];
// 初始化私有直方图
for (int i = 0; i < numBins; i++) {
myHist[i] = 0;
}
// 累积
for (int i = blockIdx.x * blockDim.x + tid; i < n; i += gridDim.x * blockDim.x) {
myHist[input[i]]++;
}
__syncthreads();
// 减少私有直方图
for (int bin = tid; bin < numBins; bin += blockDim.x) {
int sum = 0;
for (int t = 0; t < blockDim.x; t++) {
sum += sharedHist[t * numBins + bin];
}
atomicAdd(&histogram[bin], sum);
}
}
4. 基数排序
并行基数排序:
// 使用CUB/Thrust进行生产
#include <cub/cub.cuh>
// Radix-sorts (key, value) pairs in place with CUB's DeviceRadixSort.
// Fix: the original referenced undeclared d_keys_alt/d_values_alt; the
// alternate double-buffer halves are now allocated here. The sorted data
// is copied back if CUB finishes on the alternate buffer, so callers can
// keep using their original d_keys/d_values pointers.
void radixSort(unsigned int* d_keys, unsigned int* d_values, int n) {
    if (n <= 0) return;
    // Alternate buffers required by CUB's ping-pong DoubleBuffer API.
    unsigned int* d_keys_alt = nullptr;
    unsigned int* d_values_alt = nullptr;
    if (cudaMalloc(&d_keys_alt, n * sizeof(unsigned int)) != cudaSuccess) {
        return;
    }
    if (cudaMalloc(&d_values_alt, n * sizeof(unsigned int)) != cudaSuccess) {
        cudaFree(d_keys_alt);
        return;
    }
    cub::DoubleBuffer<unsigned int> d_keys_db(d_keys, d_keys_alt);
    cub::DoubleBuffer<unsigned int> d_values_db(d_values, d_values_alt);
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;
    // Size-query pass (null temp pointer).
    cub::DeviceRadixSort::SortPairs(d_temp_storage, temp_storage_bytes,
                                    d_keys_db, d_values_db, n);
    if (cudaMalloc(&d_temp_storage, temp_storage_bytes) == cudaSuccess) {
        // Actual sort.
        cub::DeviceRadixSort::SortPairs(d_temp_storage, temp_storage_bytes,
                                        d_keys_db, d_values_db, n);
        // CUB may leave the result in the alternate buffer; copy it back.
        if (d_keys_db.Current() != d_keys) {
            cudaMemcpy(d_keys, d_keys_db.Current(),
                       n * sizeof(unsigned int), cudaMemcpyDeviceToDevice);
            cudaMemcpy(d_values, d_values_db.Current(),
                       n * sizeof(unsigned int), cudaMemcpyDeviceToDevice);
        }
        cudaFree(d_temp_storage);
    }
    cudaFree(d_keys_alt);
    cudaFree(d_values_alt);
}
// 基数排序的基本构建块
// One counting pass of a 4-bit radix sort: counts[blockIdx.x * 16 + d]
// receives the number of keys in this block whose digit
// ((key >> shift) & 0xF) equals d. Requires blockDim.x >= 16.
__global__ void countRadixDigits(const unsigned int* keys, int* counts,
int n, int shift) {
    __shared__ int localCounts[16]; // 16 buckets for a 4-bit digit
    // First 16 threads zero the per-block bucket counters.
    if (threadIdx.x < 16) {
        localCounts[threadIdx.x] = 0;
    }
    __syncthreads();
    int gid = blockDim.x * blockIdx.x + threadIdx.x;
    if (gid < n) {
        unsigned int digit = (keys[gid] >> shift) & 0xF;
        atomicAdd(&localCounts[digit], 1);
    }
    __syncthreads();
    // Flush the block-local bucket counts to the global count table.
    if (threadIdx.x < 16) {
        atomicAdd(&counts[blockIdx.x * 16 + threadIdx.x], localCounts[threadIdx.x]);
    }
}
5. 流压缩
过滤和压缩数组:
// 简单的压缩与原子计数器
// Stream compaction via a global atomic counter: every flagged element is
// appended to output at the next free slot. Output ORDER is
// nondeterministic (depends on scheduling). *counter must start at 0.
__global__ void compactAtomic(const int* input, const int* flags,
int* output, int* counter, int n) {
    int gid = blockDim.x * blockIdx.x + threadIdx.x;
    if (gid >= n) return;
    if (flags[gid]) {
        output[atomicAdd(counter, 1)] = input[gid];
    }
}
// 使用扫描的高效压缩
// 第1步:生成标志
// Step 1 of scan-based compaction: flags[i] = 1 iff input[i] > threshold.
__global__ void generateFlags(const float* input, int* flags, int n, float threshold) {
    int gid = blockDim.x * blockIdx.x + threadIdx.x;
    if (gid >= n) return;
    flags[gid] = input[gid] > threshold ? 1 : 0;
}
// 第2步:对标志进行独占扫描(给出输出位置)
// 第3步:散射到输出位置
// Step 3 of scan-based compaction: each flagged element is written to the
// slot given by `positions` (the exclusive scan of the flags), producing
// a dense, order-preserving output.
__global__ void scatter(const float* input, const int* flags,
const int* positions, float* output, int n) {
    int gid = blockDim.x * blockIdx.x + threadIdx.x;
    if (gid >= n || !flags[gid]) return;
    output[positions[gid]] = input[gid];
}
// 使用CUB选择
#include <cub/cub.cuh>
// Selection predicate for cub::DeviceSelect::If. Defined at namespace
// scope because CUDA forbids function-local types as kernel template
// arguments; a plain functor also removes the original code's dependency
// on the --extended-lambda nvcc flag that device lambdas require.
struct IsPositive {
    __host__ __device__ bool operator()(float v) const { return v > 0.0f; }
};
// Compacts d_in into d_out keeping values > 0.0f; the number of kept
// elements is written to *d_num_selected (a device pointer).
void compactWithCUB(float* d_in, float* d_out, int* d_num_selected, int n) {
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;
    IsPositive select_op;
    // Size-query pass (null temp pointer).
    cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes,
                          d_in, d_out, d_num_selected, n, select_op);
    // Bail out instead of selecting with an invalid temp buffer.
    if (cudaMalloc(&d_temp_storage, temp_storage_bytes) != cudaSuccess) {
        return;
    }
    cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes,
                          d_in, d_out, d_num_selected, n, select_op);
    cudaFree(d_temp_storage);
}
6. 并行合并
合并排序序列:
// 用于合并路径的二分搜索
// Lower-bound binary search: returns the first index in [left, right)
// whose element is >= target, or `right` if every element is smaller.
// Requires data[left..right) sorted ascending.
__device__ int binarySearch(const int* data, int target, int left, int right) {
    int lo = left;
    int hi = right;
    while (lo < hi) {
        int mid = (lo + hi) / 2;
        if (data[mid] < target) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }
    return lo;
}
// 合并路径并行合并
// Merge-path parallel merge of two sorted arrays A (sizeA elements) and
// B (sizeB elements) into output (sizeA + sizeB elements); each thread
// produces exactly one output element. Requires A and B sorted ascending
// and a launch with at least sizeA + sizeB total threads.
__global__ void parallelMerge(const int* A, int sizeA,
const int* B, int sizeB,
int* output) {
int tid = blockIdx.x * blockDim.x + threadIdx.x;
int totalSize = sizeA + sizeB;
if (tid < totalSize) {
// Each output slot corresponds to one diagonal of the merge matrix;
// clamp the feasible range of A-indices on this diagonal.
int diagonal = tid;
int aStart = max(0, diagonal - sizeB);
int aEnd = min(diagonal, sizeA);
// Binary search along the diagonal for the A/B split point: shrink
// toward the first position where A stops dominating B.
while (aStart < aEnd) {
int aMid = (aStart + aEnd) / 2;
int bMid = diagonal - aMid - 1;
if (bMid >= 0 && bMid < sizeB && A[aMid] > B[bMid]) {
aEnd = aMid;
} else {
aStart = aMid + 1;
}
}
int aIdx = aStart;
int bIdx = diagonal - aStart;
// Emit whichever head element belongs at this position; the <=
// prefers A on ties, keeping the merge stable.
if (aIdx < sizeA && (bIdx >= sizeB || A[aIdx] <= B[bIdx])) {
output[tid] = A[aIdx];
} else {
output[tid] = B[bIdx];
}
}
}
7. 工作分配模式
负载平衡的工作分配:
// 持久线程模式
__global__ void persistentKernel(int* workQueue, int* queueSize,
int* results) {
__shared__ int nextItem;
while (true) {
// 获取下一个工作项
if (threadIdx.x == 0) {
nextItem = atomicAdd(queueSize, -1) - 1;
}
__syncthreads();
if (nextItem < 0) break;
// 处理工作项
int workItem = workQueue[nextItem];
// ...做工作...
}
}
// 动态并行性用于不规则工作负载
// Adaptive processing via CUDA dynamic parallelism: items whose estimated
// workload exceeds `threshold` spawn a child kernel; light items are
// handled inline in this thread.
// NOTE(review): computeWorkload, processHeavy and processLight are defined
// elsewhere in the project — their exact semantics are assumed here.
// Requires compute capability 3.5+ and compilation with -rdc=true for the
// device-side kernel launch.
__global__ void adaptiveKernel(int* data, int n, int threshold) {
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx >= n) return;
int workload = computeWorkload(data[idx]);
if (workload > threshold) {
// Launch a child kernel for the heavy item.
processHeavy<<<1, 128>>>(data, idx);
} else {
// Handle the light item inline.
processLight(data, idx);
}
}
流程集成
这项技能与以下流程集成:
- parallel-algorithm-design.js - 算法设计工作流
- reduction-scan-implementation.js - 归约/扫描实现
- atomic-operations-synchronization.js - 原子操作模式
输出格式
{
"operation": "generate-pattern",
"pattern": "parallel-reduction",
"configuration": {
"data_type": "float",
"reduction_op": "sum",
"block_size": 256,
"use_warp_shuffle": true
},
"generated_code": {
"kernel_file": "reduction.cu",
"lines": 85
},
"performance_estimate": {
"memory_bound": true,
"bandwidth_utilization": 0.85,
"operations_per_element": 1
}
}
依赖关系
- CUDA Toolkit 11.0+
- CUB库
- Thrust库
约束
- 归约需要关联操作
- 扫描需要关联操作以确保正确性
- 直方图性能取决于箱计数分布
- 排序性能因键分布而异