Andrew Moa Blog Site

Matrix Multiplication (Part 3) - Parallel Acceleration with MPI

MPI is a message-passing standard for parallel computing and currently the most widely used programming interface on high-performance computing clusters. MPI processes communicate by exchanging messages, which lets a parallel computation span multiple cores across nodes, something OpenMP cannot do. Implementations exist for every major platform, for example MS-MPI and Intel MPI on Windows, and OpenMPI and MPICH on Linux.

1. Accelerating the loop computation with MPI

1.1 C implementation

MPI requires initializing the runtime in the main program and setting up the message-passing machinery; the arrays to be computed must also be partitioned and distributed to the individual processes. With OpenMP and other parallel libraries these steps are handled internally, and the programmer does not need to care how they are implemented. With MPI, however, the programmer has to allocate the global and local buffers of every process by hand and control every broadcast, which undeniably adds extra learning cost.
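
Before the full listing, the sketch below shows only the initialize-and-partition pattern described above (a minimal sketch; the variable names are illustrative and not taken from the program that follows):

#include <mpi.h>
#include <stdio.h>

/* Minimal sketch: initialize MPI, query rank and process count,
 * and work out how many matrix rows this process owns. */
int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int dim = 1024;                                             /* assumed matrix dimension */
    int local_rows = dim / size + (rank < dim % size ? 1 : 0);  /* rows owned by this rank */
    printf("rank %d of %d handles %d rows\n", rank, size, local_rows);

    MPI_Finalize();
    return 0;
}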

The main.c file is modified as follows.

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <math.h>

#define MAX(a, b) ((a) > (b) ? (a) : (b))
#define MIN(a, b) ((a) < (b) ? (a) : (b))

extern void matrix_multiply_float(int n, int rank, int size, float local_A[], float B[], float local_C[]);
extern void matrix_multiply_double(int n, int rank, int size, double local_A[], double B[], double local_C[]);

// Initialize matrix
void initialize_matrix_float(int n, float matrix[])
{
    srand((unsigned)time(NULL));
    for (int i = 0; i < n; i++)
    {
        for (int j = 0; j < n; j++)
        {
            matrix[i * n + j] = rand() / (float)(RAND_MAX);
        }
    }
}

void initialize_matrix_double(int n, double matrix[])
{
    srand((unsigned)time(NULL));
    for (int i = 0; i < n; i++)
    {
        for (int j = 0; j < n; j++)
        {
            matrix[i * n + j] = rand() / (double)(RAND_MAX);
        }
    }
}

// Execute matrix multiply and print results
int mpi_float(int dim, int loop_num, double *ave_gflops, double *max_gflops, double *ave_time, double *min_time)
{
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // Use volatile to prevent compiler optimizations
    volatile float *a, *b, *local_c;
    struct timespec start_ns, end_ns;
    double cpu_time, total_cpu_time;

    // Compute the number of rows handled by each process
    int rows_per_process = dim / size;
    int remainder = dim % size;
    if (rank < remainder)
    {
        rows_per_process++;
    }

    for (int i = 0; i < loop_num; i++)
    {
        int check_indices[2];
        float check_value;

        // The root process allocates the full matrices
        if (rank == 0)
        {
            a = (float *)malloc(dim * dim * sizeof(float));
            b = (float *)malloc(dim * dim * sizeof(float));
            if (a == NULL || b == NULL)
            {
                fprintf(stderr, "Memory allocation failed\n");
                return 0;
            }
            initialize_matrix_float(dim, a);
            initialize_matrix_float(dim, b);

            // Generate a check value
            int check_row = rand() % dim;
            int check_col = rand() % dim;
            check_value = 0.0;
            for (int k = 0; k < dim; k++)
            {
                check_value += a[check_row * dim + k] * b[k * dim + check_col];
            }

            check_indices[0] = check_row;
            check_indices[1] = check_col;

            // Broadcast the check row/column indices and the check value
            MPI_Bcast(check_indices, 2, MPI_INT, 0, MPI_COMM_WORLD);
            MPI_Bcast(&check_value, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        }
        else
        {
            a = NULL;
            b = NULL;
            MPI_Bcast(check_indices, 2, MPI_INT, 0, MPI_COMM_WORLD);
            MPI_Bcast(&check_value, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);
        }

        // Allocate the local matrix buffers for this process
        float *local_A = (float *)malloc(rows_per_process * dim * sizeof(float));
        local_c = (float *)calloc(rows_per_process * dim, sizeof(float));
        if (local_A == NULL || local_c == NULL)
        {
            fprintf(stderr, "Memory allocation failed\n");
            return 0;
        }

        // Scatter the rows of matrix A to the processes
        int *sendcounts = (int *)malloc(size * sizeof(int));
        int *displs = (int *)malloc(size * sizeof(int));
        int offset = 0;
        for (int p = 0; p < size; p++)
        {
            // Row count of process p, computed independently of this rank's own share
            int p_rows = dim / size + ((p < remainder) ? 1 : 0);
            sendcounts[p] = p_rows * dim;
            displs[p] = offset;
            offset += sendcounts[p];
        }
        MPI_Scatterv(a, sendcounts, displs, MPI_FLOAT, local_A, rows_per_process * dim, MPI_FLOAT, 0, MPI_COMM_WORLD);

        // Every process needs the full matrix B
        float *full_B = (float *)malloc(dim * dim * sizeof(float));
        if (full_B == NULL)
        {
            fprintf(stderr, "Memory allocation failed for full_B\n");
            return 0;
        }
        if (rank == 0)
        {
            memcpy(full_B, b, dim * dim * sizeof(float));
        }
        MPI_Bcast(full_B, dim * dim, MPI_FLOAT, 0, MPI_COMM_WORLD);

        timespec_get(&start_ns, TIME_UTC);
        matrix_multiply_float(dim, rank, size, local_A, full_B, local_c);
        timespec_get(&end_ns, TIME_UTC);
        cpu_time = (end_ns.tv_sec - start_ns.tv_sec) + (end_ns.tv_nsec - start_ns.tv_nsec) / 1e9;

        // Sum cpu_time over all processes with MPI_Reduce
        MPI_Reduce(&cpu_time, &total_cpu_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (rank == 0)
        {
            // Compute the average
            cpu_time = total_cpu_time / size;
        }

        // Broadcast the average to all processes
        MPI_Bcast(&cpu_time, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

        double gflops = 1e-9 * dim * dim * dim * 2 / cpu_time;
        if (rank == 0)
        {
            printf("%d\t: %d x %d Matrix multiply wall time : %.6fs(%.3fGflops)\n", i + 1, dim, dim, cpu_time, gflops);
            fflush(stdout); // Force-flush the standard output buffer
        }

        // Gather the local results from all processes on the root process
        float *c = NULL;
        if (rank == 0)
        {
            c = (float *)malloc(dim * dim * sizeof(float));
            if (c == NULL)
            {
                fprintf(stderr, "Memory allocation failed for c matrix\n");
                return 0;
            }
        }
        MPI_Gatherv(local_c, rows_per_process * dim, MPI_FLOAT, c, sendcounts, displs, MPI_FLOAT, 0, MPI_COMM_WORLD);

        // The root process verifies the result
        if (rank == 0)
        {
            int check_row = check_indices[0];
            int check_col = check_indices[1];
            float result_value = c[check_row * dim + check_col];
            if (fabs(result_value - check_value) > 0.001)
            {
                fprintf(stderr, "Verification failed at iteration %d: expected %.6f, got %.6f\n", i + 1, check_value, result_value);
            }
            free(c);
        }

        // Free memory
        if (rank == 0)
        {
            free(a);
            free(b);
        }
        free(local_A);
        free(local_c);
        free(sendcounts);
        free(displs);
        free(full_B);
        *ave_gflops += gflops;
        *max_gflops = MAX(*max_gflops, gflops);
        *ave_time += cpu_time;
        *min_time = MIN(*min_time, cpu_time);
    }
    *ave_gflops /= loop_num;
    *ave_time /= loop_num;
    return 1;
}

int mpi_double(int dim, int loop_num, double *ave_gflops, double *max_gflops, double *ave_time, double *min_time)
{
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // Use volatile to prevent compiler optimizations
    volatile double *a, *b, *local_c;
    struct timespec start_ns, end_ns;
    double cpu_time, total_cpu_time;

    // Compute the number of rows handled by each process
    int rows_per_process = dim / size;
    int remainder = dim % size;
    if (rank < remainder)
    {
        rows_per_process++;
    }

    for (int i = 0; i < loop_num; i++)
    {
        int check_indices[2];
        double check_value;

        // The root process allocates the full matrices
        if (rank == 0)
        {
            a = (double *)malloc(dim * dim * sizeof(double));
            b = (double *)malloc(dim * dim * sizeof(double));
            if (a == NULL || b == NULL)
            {
                fprintf(stderr, "Memory allocation failed\n");
                return 0;
            }
            initialize_matrix_double(dim, a);
            initialize_matrix_double(dim, b);

            // Generate a check value
            int check_row = rand() % dim;
            int check_col = rand() % dim;
            check_value = 0.0;
            for (int k = 0; k < dim; k++)
            {
                check_value += a[check_row * dim + k] * b[k * dim + check_col];
            }

            check_indices[0] = check_row;
            check_indices[1] = check_col;

            // Broadcast the check row/column indices and the check value
            MPI_Bcast(check_indices, 2, MPI_INT, 0, MPI_COMM_WORLD);
            MPI_Bcast(&check_value, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
        }
        else
        {
            a = NULL;
            b = NULL;
            MPI_Bcast(check_indices, 2, MPI_INT, 0, MPI_COMM_WORLD);
            MPI_Bcast(&check_value, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
        }

        // Allocate the local matrix buffers for this process
        double *local_A = (double *)malloc(rows_per_process * dim * sizeof(double));
        local_c = (double *)calloc(rows_per_process * dim, sizeof(double));
        if (local_A == NULL || local_c == NULL)
        {
            fprintf(stderr, "Memory allocation failed\n");
            return 0;
        }

        // Scatter the rows of matrix A to the processes
        int *sendcounts = (int *)malloc(size * sizeof(int));
        int *displs = (int *)malloc(size * sizeof(int));
        int offset = 0;
        for (int p = 0; p < size; p++)
        {
            // Row count of process p, computed independently of this rank's own share
            int p_rows = dim / size + ((p < remainder) ? 1 : 0);
            sendcounts[p] = p_rows * dim;
            displs[p] = offset;
            offset += sendcounts[p];
        }
        MPI_Scatterv(a, sendcounts, displs, MPI_DOUBLE, local_A, rows_per_process * dim, MPI_DOUBLE, 0, MPI_COMM_WORLD);

        // Every process needs the full matrix B
        double *full_B = (double *)malloc(dim * dim * sizeof(double));
        if (full_B == NULL)
        {
            fprintf(stderr, "Memory allocation failed for full_B\n");
            return 0;
        }
        if (rank == 0)
        {
            memcpy(full_B, b, dim * dim * sizeof(double));
        }
        MPI_Bcast(full_B, dim * dim, MPI_DOUBLE, 0, MPI_COMM_WORLD);

        timespec_get(&start_ns, TIME_UTC);
        matrix_multiply_double(dim, rank, size, local_A, full_B, local_c);
        timespec_get(&end_ns, TIME_UTC);
        cpu_time = (end_ns.tv_sec - start_ns.tv_sec) + (end_ns.tv_nsec - start_ns.tv_nsec) / 1e9;

        // Sum cpu_time over all processes with MPI_Reduce
        MPI_Reduce(&cpu_time, &total_cpu_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (rank == 0)
        {
            // Compute the average
            cpu_time = total_cpu_time / size;
        }

        // Broadcast the average to all processes
        MPI_Bcast(&cpu_time, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

        double gflops = 1e-9 * dim * dim * dim * 2 / cpu_time;
        if (rank == 0)
        {
            printf("%d\t: %d x %d Matrix multiply wall time : %.6fs(%.3fGflops)\n", i + 1, dim, dim, cpu_time, gflops);
            fflush(stdout); // Force-flush the standard output buffer
        }

        // Gather the local results from all processes on the root process
        double *c = NULL;
        if (rank == 0)
        {
            c = (double *)malloc(dim * dim * sizeof(double));
            if (c == NULL)
            {
                fprintf(stderr, "Memory allocation failed for c matrix\n");
                return 0;
            }
        }
        MPI_Gatherv(local_c, rows_per_process * dim, MPI_DOUBLE, c, sendcounts, displs, MPI_DOUBLE, 0, MPI_COMM_WORLD);

        // The root process verifies the result
        if (rank == 0)
        {
            int check_row = check_indices[0];
            int check_col = check_indices[1];
            double result_value = c[check_row * dim + check_col];
            if (fabs(result_value - check_value) > 0.000001)
            {
                fprintf(stderr, "Verification failed at iteration %d: expected %.6f, got %.6f\n", i + 1, check_value, result_value);
            }
            free(c);
        }

        // Free memory
        if (rank == 0)
        {
            free(a);
            free(b);
        }
        free(local_A);
        free(local_c);
        free(sendcounts);
        free(displs);
        free(full_B);
        *ave_gflops += gflops;
        *max_gflops = MAX(*max_gflops, gflops);
        *ave_time += cpu_time;
        *min_time = MIN(*min_time, cpu_time);
    }
    *ave_gflops /= loop_num;
    *ave_time /= loop_num;
    return 1;
}

int main(int argc, char *argv[])
{
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int n = 10;                                // Default matrix size exponent
    int loop_num = 5;                          // Number of iterations for averaging
    double ave_gflops = 0.0, max_gflops = 0.0; // Average and maximum Gflops
    double ave_time = 0.0, min_time = 1e9;     // Average and minimum time
    int use_double = 0;                        // Default to float precision

    // Help message
    if (argc == 1 || (argc == 2 && (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0)))
    {
        if (rank == 0)
        {
            printf("Usage: mpiexec/mpirun [-n/-np $NUM_PROCS] %s [-n SIZE] [-l LOOP_NUM] [-float|-double]\n", argv[0]);
            printf("  -n SIZE      Specify matrix size, like 2^SIZE (default: 10)\n");
            printf("  -l LOOP_NUM  Specify number of iterations (default: 5)\n");
            printf("  -float       Use float precision (default)\n");
            printf("  -double      Use double precision\n");
            printf("  -h, --help   Show this help message\n");
        }
        MPI_Finalize();
        return 0;
    }

    // Parse -n, -l, -float, -double options
    int double_flag = 0, float_flag = 0;
    for (int argi = 1; argi < argc; ++argi)
    {
        if (strcmp(argv[argi], "-n") == 0 && argi + 1 < argc)
        {
            n = atoi(argv[argi + 1]);
            argi++;
        }
        else if (strcmp(argv[argi], "-l") == 0 && argi + 1 < argc)
        {
            loop_num = atoi(argv[argi + 1]);
            argi++;
        }
        else if (strcmp(argv[argi], "-double") == 0)
        {
            double_flag = 1;
        }
        else if (strcmp(argv[argi], "-float") == 0)
        {
            float_flag = 1;
        }
    }
    if (double_flag && float_flag)
    {
        if (rank == 0)
        {
            fprintf(stderr, "Error: Cannot specify both -double and -float options.\n");
        }
        MPI_Finalize();
        return 1;
    }
    use_double = double_flag ? 1 : 0;

    int dim = (int)pow(2, n);

    if (use_double)
    {
        if (rank == 0)
        {
            printf("Using double precision for matrix multiplication.\n");
        }
        if (!mpi_double(dim, loop_num, &ave_gflops, &max_gflops, &ave_time, &min_time))
        {
            MPI_Finalize();
            return 1;
        }
    }
    else
    {
        if (rank == 0)
        {
            printf("Using float precision for matrix multiplication.\n");
        }
        if (!mpi_float(dim, loop_num, &ave_gflops, &max_gflops, &ave_time, &min_time))
        {
            MPI_Finalize();
            return 1;
        }
    }

    if (rank == 0)
    {
        printf("Average Gflops: %.3f, Max Gflops: %.3f\n", ave_gflops, max_gflops);
        printf("Average Time: %.6fs, Min Time: %.6fs\n", ave_time, min_time);
    }

    MPI_Finalize();
    return 0;
}

To make sure the matrix assembled from the per-process results is correct, the main program adds a spot check at a randomly chosen coordinate.
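
The extern kernels matrix_multiply_float and matrix_multiply_double are declared at the top of main.c, but their source file is not included in this excerpt. A minimal sketch of what such a kernel could look like, assuming each rank simply multiplies its local rows of A by the full matrix B (a reconstruction for illustration, not the original file):

void matrix_multiply_float(int n, int rank, int size, float local_A[], float B[], float local_C[])
{
    // Recompute this rank's row count, matching the partitioning used in main.c
    int local_rows = n / size + (rank < n % size ? 1 : 0);
    for (int i = 0; i < local_rows; i++)
    {
        for (int j = 0; j < n; j++)
        {
            float sum = 0.0f;
            for (int k = 0; k < n; k++)
            {
                sum += local_A[i * n + k] * B[k * n + j];
            }
            local_C[i * n + j] = sum;
        }
    }
}

The double-precision version would be identical apart from the element type.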

Matrix Multiplication (Part 1) - Accelerating Loop Computation with OpenMP

Mention matrices and anyone with a science or engineering background will recall the dread of being at the mercy of the linear algebra course. Matrix multiplication underpins industrial and scientific numerical computing of every size, and it also shows up in benchmark software; the time it takes is an important measure of a computer's floating-point performance. The purpose of this article is to use matrix multiplication to examine the performance differences between implementation approaches and between computing platforms, as a reference for high-performance computing development.

1. The matrix multiplication algorithm

Matrix multiplication, also called the matrix product, can be illustrated by the figure below [1]: each entry of C is formed by multiplying the elements of the corresponding row of A with those of the corresponding column of B and accumulating the products. The number of columns of A must equal the number of rows of B, and the resulting C has as many rows as A and as many columns as B.

(Figure: schematic of the matrix product C = A × B)
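
In formula form, for an M×N matrix A and an N×T matrix B, each entry of the M×T result C is C[i][j] = Σ_k A[i][k] × B[k][j], where k runs over the shared dimension N.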

In C, the product of an M×N matrix and an N×T matrix is usually written as three nested loops.

...
	for (int i = 0; i < M; i++)
	{
		for (int j = 0; j < T; j++)
		{
			C[i * T + j] = 0.0;
			for (int k = 0; k < N; k++)
			{
				C[i * T + j] += A[i * N + k] * B[k * T + j];
			}
		}
	}
...

Multiplying two N×N matrices is generally taken to require on the order of N^3 multiplications, so the computational complexity is written O(n^3). Some algorithms convert part of the multiplications into additions to reduce the multiplication count. The Strassen algorithm [2], for example, has a theoretical complexity of only O(n^2.807), which lowers the cost of multiplying large matrices. Newer algorithms such as the Coppersmith-Winograd method claim a complexity as low as O(n^2.3727). These algorithmic differences are outside the scope of this article.

2. Parallelizing the nested loops

Modern processors are generally multi-core, and parallelizing the computation with a library such as OpenMP is a good way to make full use of them. Solving the nested loops on a single core is of little practical interest and is not demonstrated here; if you are interested, you can strip the parallel directives from the code below and compile and run it yourself.

2.1 C implementation

The C implementation is simple: three nested loops implement the matrix multiplication. Here OpenMP collapses the two outer loops and distributes the combined iterations across threads [3] to make full use of a multi-core processor. Below is the sample code of openmp.c.

#include <omp.h>

void matrix_multiply_float(int n, float A[], float B[], float C[])
{
#pragma omp parallel for collapse(2) shared(A, B, C)
	for (int i = 0; i < n; i++)
	{
		for (int j = 0; j < n; j++)
		{
			C[i * n + j] = 0;
			for (int k = 0; k < n; k++)
			{
				C[i * n + j] += A[i * n + k] * B[k * n + j];
			}
		}
	}
}

void matrix_multiply_double(int n, double A[], double B[], double C[])
{
#pragma omp parallel for collapse(2) shared(A, B, C)
	for (int i = 0; i < n; i++)
	{
		for (int j = 0; j < n; j++)
		{
			C[i * n + j] = 0;
			for (int k = 0; k < n; k++)
			{
				C[i * n + j] += A[i * n + k] * B[k * n + j];
			}
		}
	}
}
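
The number of threads used by the pragma can be controlled with the standard OpenMP runtime calls; as a small illustration (the wrapper function below is only an example, not part of the original project):

#include <omp.h>

extern void matrix_multiply_float(int n, float A[], float B[], float C[]);

// Illustrative helper: fix the thread count before calling the kernel above.
// Setting the OMP_NUM_THREADS environment variable before launch has the same effect.
void run_with_threads(int n, int num_threads, float A[], float B[], float C[])
{
    omp_set_num_threads(num_threads);
    matrix_multiply_float(n, A, B, C);
}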

main.c defines a few command-line switches so the user can set the matrix size, the number of iterations, the computation precision, and so on. It also defines a performance metric, Gflops, which measures floating-point throughput: 1 Gflops corresponds to 10^9 floating-point operations per second.
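
As a small worked example of this metric (the matrix size and timing below are purely illustrative):

#include <stdio.h>

// Worked example of the Gflops figure reported by main.c; the values are illustrative.
int main(void)
{
    int dim = 1 << 10;                       /* 2^10 = 1024 */
    double flops = 2.0 * dim * dim * dim;    /* ~2.15e9 floating-point operations */
    double seconds = 0.5;                    /* assumed wall-clock time */
    double gflops = 1e-9 * flops / seconds;  /* ~4.29 Gflops */
    printf("%.2f Gflops\n", gflops);
    return 0;
}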

Developing STAR-CCM+ User Programs with VSCode: Building a Dynamic Library with Fortran

Although the official STAR-CCM+ documentation explicitly states that FORTRAN is not supported on Windows [1], in practice, as long as the compiler supports it, a user program compiled from Fortran on Windows loads and runs in STAR-CCM+ without problems.

1. Building the CMake project

First, following the tutorial example in the official documentation [2], we set up a CMake project with the following structure:

STARCCM_FORTRAN_SAMPLE
   CMakeLists.txt	# CMake configuration file
   README.md	# Readme, optional
├───.vscode
      launch.json	# Generated automatically when debugging is launched, optional
      settings.json	# Defines the CMake-related variables
└───src
        initVelocity.f
        StarReal.f.in
        sutherlandViscosity.f
        uflib.f
        zeroGradT.f

The main content of the CMake configuration file CMakeLists.txt is as follows:

cmake_minimum_required(VERSION 3.10)

# Project name
project(UserFortran LANGUAGES Fortran)

set(CMAKE_Fortran_STANDARD 2008)

# Check for STARCCM_USER_LIB_DIR
if(NOT DEFINED STARCCM_USER_LIB_DIR)
    message(FATAL_ERROR "STARCCM_USER_LIB_DIR is not defined. Please specify the path to the STAR-CCM+ UserFunctions library directory.")
    # For example, in Windows : C:/Program Files/Siemens/19.06.009-R8/STAR-CCM+19.06.009-R8/star/lib/win64/clang17.0vc14.2-r8/lib
    # In Linux : /opt/Siemens/19.06.009-R8/STAR-CCM+19.06.009-R8/star/lib/linux-x86_64-2.28/gnu11.4-r8/lib
else()
    message(STATUS "STARCCM_USER_LIB_DIR location : " ${STARCCM_USER_LIB_DIR})
endif()

# Check for STARCCM_STD_LIB_DIR
if(NOT DEFINED STARCCM_STD_LIB_DIR)
    message(STATUS "STARCCM_STD_LIB_DIR undefined. using system standard library. ")
    # For example, in Linux : /opt/Siemens/19.06.009-R8/STAR-CCM+19.06.009-R8/star/lib/linux-x86_64-2.28/system/gnu11.4-64
else()
    message(STATUS "STARCCM_STD_LIB_DIR location : " ${STARCCM_STD_LIB_DIR})
endif()

# STAR-CCM+ output precision
if(USE_DOUBLE_PRECISION)
    message(STATUS "Using double precision for STAR-CCM+")
    set(STAR_REAL "1D0")
else()
    message(STATUS "Using float precision for STAR-CCM+")
    set(STAR_REAL "1.0")
endif()

# generate the StarReal.f file
configure_file(src/StarReal.f.in ${CMAKE_BINARY_DIR}/StarReal.f @ONLY)

# Include directories
include_directories(${PROJECT_SOURCE_DIR}/include
)

# Link with STARCCM LIB directory
link_directories(${STARCCM_USER_LIB_DIR}
    ${STARCCM_STD_LIB_DIR}
)

# Specify the source files
set(SOURCES
    ${CMAKE_BINARY_DIR}/StarReal.f
    src/initVelocity.f
    src/sutherlandViscosity.f
    src/zeroGradT.f
    src/uflib.f
)

# Add library
add_library(${CMAKE_PROJECT_NAME} SHARED
    ${SOURCES}
)

# Link library
target_link_libraries(${CMAKE_PROJECT_NAME} UserFunctions)

# Install target
install(TARGETS ${CMAKE_PROJECT_NAME}
    RUNTIME DESTINATION bin
    LIBRARY DESTINATION lib
    ARCHIVE DESTINATION lib/static
    )

There are two key settings here:
