parallel-patterns parallel-patterns

GPU并行算法设计模式与实现,包括并行归约、扫描/前缀和、直方图、并行排序、流压缩等高效算法

机器学习 0 次安装 0 次浏览 更新于 2/25/2026

parallel-patterns

您是 parallel-patterns - 一个专门针对GPU并行算法设计模式和实现的专业技能。这项技能提供了在GPU上实现高效并行算法的专家能力。

概览

这项技能支持AI驱动的并行算法开发,包括:

  • 实现并行归约算法(基于树的,warp)
  • 生成扫描(前缀和)实现
  • 设计直方图和分箱算法
  • 实现并行排序算法(基数,归并)
  • 生成流压缩代码
  • 设计工作高效的并行模式
  • 处理多通道大数据算法
  • 针对特定GPU架构进行优化

前提条件

  • CUDA Toolkit 11.0+
  • CUB库(包含在CUDA中)
  • Thrust库(包含在CUDA中)

能力

1. 并行归约

实现高效归约:

// Warp级别的归约(不需要共享内存用于单个warp)
__device__ float warpReduce(float val) {
    for (int offset = warpSize / 2; offset > 0; offset >>= 1) {
        val += __shfl_down_sync(0xffffffff, val, offset);
    }
    return val;
}

// 块级别的归约与共享内存
template<int BLOCK_SIZE>
__device__ float blockReduce(float val) {
    __shared__ float shared[32];  // 每个warp一个插槽

    int lane = threadIdx.x % warpSize;
    int wid = threadIdx.x / warpSize;

    // Warp级别的归约
    val = warpReduce(val);

    // 将warp结果写入共享内存
    if (lane == 0) shared[wid] = val;
    __syncthreads();

    // 第一个warp减少warp结果
    val = (threadIdx.x < BLOCK_SIZE / warpSize) ? shared[lane] : 0.0f;
    if (wid == 0) val = warpReduce(val);

    return val;
}

// 完整的并行归约内核
template<int BLOCK_SIZE>
__global__ void reduceKernel(const float* input, float* output, int n) {
    float sum = 0.0f;

    // 网格步长循环大数组
    for (int i = blockIdx.x * BLOCK_SIZE + threadIdx.x;
         i < n;
         i += gridDim.x * BLOCK_SIZE) {
        sum += input[i];
    }

    // 块归约
    sum = blockReduce<BLOCK_SIZE>(sum);

    // 写块结果
    if (threadIdx.x == 0) {
        atomicAdd(output, sum);
    }
}

2. 前缀和(扫描)

工作高效的扫描实现:

// 在一个warp内的包容性扫描
__device__ float warpInclusiveScan(float val) {
    for (int offset = 1; offset < warpSize; offset <<= 1) {
        float n = __shfl_up_sync(0xffffffff, val, offset);
        if (threadIdx.x % warpSize >= offset) val += n;
    }
    return val;
}

// 块级别的包容性扫描(Blelloch算法)
template<int BLOCK_SIZE>
__device__ void blockInclusiveScan(float* data) {
    int tid = threadIdx.x;

    // 上扫(归约)阶段
    for (int stride = 1; stride < BLOCK_SIZE; stride <<= 1) {
        int index = (tid + 1) * stride * 2 - 1;
        if (index < BLOCK_SIZE) {
            data[index] += data[index - stride];
        }
        __syncthreads();
    }

    // 清除最后一个元素以进行独家扫描
    // if (tid == 0) data[BLOCK_SIZE - 1] = 0;
    // __syncthreads();

    // 下扫阶段
    for (int stride = BLOCK_SIZE / 2; stride > 0; stride >>= 1) {
        int index = (tid + 1) * stride * 2 - 1;
        if (index + stride < BLOCK_SIZE) {
            data[index + stride] += data[index];
        }
        __syncthreads();
    }
}

// 使用CUB进行生产代码
#include <cub/cub.cuh>

void inclusiveScan(float* d_in, float* d_out, int n) {
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;

    // 获取所需的存储
    cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes,
        d_in, d_out, n);

    // 分配临时存储
    cudaMalloc(&d_temp_storage, temp_storage_bytes);

    // 运行扫描
    cub::DeviceScan::InclusiveSum(d_temp_storage, temp_storage_bytes,
        d_in, d_out, n);

    cudaFree(d_temp_storage);
}

3. 直方图

并行直方图计算:

// 共享内存直方图(少量箱)
__global__ void histogramSmall(const int* input, int* histogram, int n, int numBins) {
    __shared__ int localHist[256];

    // 初始化本地直方图
    for (int i = threadIdx.x; i < numBins; i += blockDim.x) {
        localHist[i] = 0;
    }
    __syncthreads();

    // 累积到本地直方图
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        int bin = input[idx];
        atomicAdd(&localHist[bin], 1);
    }
    __syncthreads();

    // 合并到全局直方图
    for (int i = threadIdx.x; i < numBins; i += blockDim.x) {
        atomicAdd(&histogram[i], localHist[i]);
    }
}

// 每个线程的私有直方图用于大箱数
__global__ void histogramPrivate(const int* input, int* histogram,
                                  int n, int numBins) {
    extern __shared__ int sharedHist[];

    int tid = threadIdx.x;
    int* myHist = &sharedHist[tid * numBins];

    // 初始化私有直方图
    for (int i = 0; i < numBins; i++) {
        myHist[i] = 0;
    }

    // 累积
    for (int i = blockIdx.x * blockDim.x + tid; i < n; i += gridDim.x * blockDim.x) {
        myHist[input[i]]++;
    }

    __syncthreads();

    // 减少私有直方图
    for (int bin = tid; bin < numBins; bin += blockDim.x) {
        int sum = 0;
        for (int t = 0; t < blockDim.x; t++) {
            sum += sharedHist[t * numBins + bin];
        }
        atomicAdd(&histogram[bin], sum);
    }
}

4. 基数排序

并行基数排序:

// 使用CUB/Thrust进行生产
#include <cub/cub.cuh>

void radixSort(unsigned int* d_keys, unsigned int* d_values, int n) {
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;

    // 双缓冲区高效排序
    cub::DoubleBuffer<unsigned int> d_keys_db(d_keys, d_keys_alt);
    cub::DoubleBuffer<unsigned int> d_values_db(d_values, d_values_alt);

    // 获取存储需求
    cub::DeviceRadixSort::SortPairs(d_temp_storage, temp_storage_bytes,
        d_keys_db, d_values_db, n);

    cudaMalloc(&d_temp_storage, temp_storage_bytes);

    // 排序
    cub::DeviceRadixSort::SortPairs(d_temp_storage, temp_storage_bytes,
        d_keys_db, d_values_db, n);

    cudaFree(d_temp_storage);
}

// 基数排序的基本构建块
__global__ void countRadixDigits(const unsigned int* keys, int* counts,
                                  int n, int shift) {
    __shared__ int localCounts[16];  // 4-bit基数

    if (threadIdx.x < 16) localCounts[threadIdx.x] = 0;
    __syncthreads();

    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        int digit = (keys[idx] >> shift) & 0xF;
        atomicAdd(&localCounts[digit], 1);
    }
    __syncthreads();

    if (threadIdx.x < 16) {
        atomicAdd(&counts[blockIdx.x * 16 + threadIdx.x], localCounts[threadIdx.x]);
    }
}

5. 流压缩

过滤和压缩数组:

// 简单的压缩与原子计数器
__global__ void compactAtomic(const int* input, const int* flags,
                               int* output, int* counter, int n) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n && flags[idx]) {
        int pos = atomicAdd(counter, 1);
        output[pos] = input[idx];
    }
}

// 使用扫描的高效压缩
// 第1步:生成标志
__global__ void generateFlags(const float* input, int* flags, int n, float threshold) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        flags[idx] = (input[idx] > threshold) ? 1 : 0;
    }
}

// 第2步:对标志进行独占扫描(给出输出位置)
// 第3步:散射到输出位置
__global__ void scatter(const float* input, const int* flags,
                        const int* positions, float* output, int n) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n && flags[idx]) {
        output[positions[idx]] = input[idx];
    }
}

// 使用CUB选择
#include <cub/cub.cuh>

void compactWithCUB(float* d_in, float* d_out, int* d_num_selected, int n) {
    void* d_temp_storage = nullptr;
    size_t temp_storage_bytes = 0;

    auto select_op = [] __device__ (float val) { return val > 0.0f; };

    cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes,
        d_in, d_out, d_num_selected, n, select_op);

    cudaMalloc(&d_temp_storage, temp_storage_bytes);

    cub::DeviceSelect::If(d_temp_storage, temp_storage_bytes,
        d_in, d_out, d_num_selected, n, select_op);

    cudaFree(d_temp_storage);
}

6. 并行合并

合并排序序列:

// 用于合并路径的二分搜索
__device__ int binarySearch(const int* data, int target, int left, int right) {
    while (left < right) {
        int mid = (left + right) / 2;
        if (data[mid] < target) {
            left = mid + 1;
        } else {
            right = mid;
        }
    }
    return left;
}

// 合并路径并行合并
__global__ void parallelMerge(const int* A, int sizeA,
                               const int* B, int sizeB,
                               int* output) {
    int tid = blockIdx.x * blockDim.x + threadIdx.x;
    int totalSize = sizeA + sizeB;

    if (tid < totalSize) {
        // 查找合并路径坐标
        int diagonal = tid;
        int aStart = max(0, diagonal - sizeB);
        int aEnd = min(diagonal, sizeA);

        // 在合并路径上进行二分搜索
        while (aStart < aEnd) {
            int aMid = (aStart + aEnd) / 2;
            int bMid = diagonal - aMid - 1;

            if (bMid >= 0 && bMid < sizeB && A[aMid] > B[bMid]) {
                aEnd = aMid;
            } else {
                aStart = aMid + 1;
            }
        }

        int aIdx = aStart;
        int bIdx = diagonal - aStart;

        // 确定哪个元素进入这个位置
        if (aIdx < sizeA && (bIdx >= sizeB || A[aIdx] <= B[bIdx])) {
            output[tid] = A[aIdx];
        } else {
            output[tid] = B[bIdx];
        }
    }
}

7. 工作分配模式

负载平衡的工作分配:

// 持久线程模式
__global__ void persistentKernel(int* workQueue, int* queueSize,
                                  int* results) {
    __shared__ int nextItem;

    while (true) {
        // 获取下一个工作项
        if (threadIdx.x == 0) {
            nextItem = atomicAdd(queueSize, -1) - 1;
        }
        __syncthreads();

        if (nextItem < 0) break;

        // 处理工作项
        int workItem = workQueue[nextItem];
        // ...做工作...
    }
}

// 动态并行性用于不规则工作负载
__global__ void adaptiveKernel(int* data, int n, int threshold) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx >= n) return;

    int workload = computeWorkload(data[idx]);

    if (workload > threshold) {
        // 为重工作启动子内核
        processHeavy<<<1, 128>>>(data, idx);
    } else {
        // 内联处理轻工作
        processLight(data, idx);
    }
}

流程集成

这项技能与以下流程集成:

  • parallel-algorithm-design.js - 算法设计工作流
  • reduction-scan-implementation.js - 归约/扫描实现
  • atomic-operations-synchronization.js - 原子模式

输出格式

{
  "operation": "generate-pattern",
  "pattern": "parallel-reduction",
  "configuration": {
    "data_type": "float",
    "reduction_op": "sum",
    "block_size": 256,
    "use_warp_shuffle": true
  },
  "generated_code": {
    "kernel_file": "reduction.cu",
    "lines": 85
  },
  "performance_estimate": {
    "memory_bound": true,
    "bandwidth_utilization": 0.85,
    "operations_per_element": 1
  }
}

依赖关系

  • CUDA Toolkit 11.0+
  • CUB库
  • Thrust库

约束

  • 归约需要关联操作
  • 扫描需要关联操作以确保正确性
  • 直方图性能取决于箱计数分布
  • 排序性能因键分布而异