Running parallel jobs on BlueBEAR

Need for parallel jobs

On BlueBEAR, many users begin by running serial jobs or array jobs, which are well suited for running independent scripts or batches of similar scripts. However, these approaches fall short when a single job itself requires more computational power than one CPU core can provide, or when large datasets must be processed collaboratively rather than independently. This is where parallel jobs become essential. Unlike array jobs, which repeat similar runs, parallel jobs split a single workload across multiple cores, nodes, or even GPUs, effectively reducing the execution time of your job.

Terminology and tools

Hardware concepts

Concept Description
Node A node is a single server within the cluster, equipped with its own CPU(s), memory, and sometimes GPU(s). Nodes are the fundamental building blocks of a cluster. Jobs can run on a single node or, for multi-node parallel jobs, span several nodes.
CPU Core A core is an individual processing unit within a CPU. CPUs contain multiple cores which can be reserved to run tasks in parallel. A serial job uses one core on one node; a parallel job can use multiple cores across one or more nodes. See CPU Resources on BlueBEAR.
GPU A Graphics Processing Unit (GPU) consists of thousands of smaller cores that accelerate parallel workloads such as matrix computations, machine learning, and data analysis. BlueBEAR provides GPU-equipped nodes; see GPU Resources on BlueBEAR.

Software concepts

Concept Description
Job A job is a unit of work submitted to the cluster's job scheduler. It can consist of a single task or multiple tasks that need to be executed. Jobs can be configured to run in parallel, utilizing multiple cores, nodes, or GPUs as needed.
Task A task is a specific operation or set of operations that make up a job. In the context of parallel computing, tasks can be distributed across multiple processing units to achieve faster execution times. A job can consist of one or more tasks.
Process A process is an instance of a running program. In parallel computing, multiple processes can be executed simultaneously, each handling a portion of the overall workload. Processes can communicate with each other to share data and coordinate their actions.
Thread A thread is the smallest unit of execution within a process. Threads within the same process share the same memory space, allowing for efficient communication and data sharing. Parallel programming can involve multiple threads running concurrently to perform tasks more quickly.
Shared Memory Shared memory is a memory architecture where multiple processors or cores can access the same memory space. This allows for efficient communication and data sharing between threads or processes running on the same node. Shared memory is commonly used in parallel programming models like OpenMP.
Distributed Memory Distributed memory is a memory architecture where each processor or node has its own private memory space. Communication between nodes is typically achieved through message passing, where data is sent and received between nodes as needed. Distributed memory is commonly used in parallel programming models like MPI.
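
The difference between shared and distributed memory can be illustrated with a minimal sketch using only the Python standard library. This is not how OpenMP or MPI are implemented; the function names are hypothetical and chosen for illustration. Threads write into one shared list, while separate processes must send their results back as messages.

```python
import threading
import multiprocessing as mp


def shared_memory_sum(values, num_threads=2):
    """Threads share one address space: all of them write into `partials`."""
    partials = [0] * num_threads

    def work(tid):
        # Each thread sums a strided slice and writes to its own slot
        partials[tid] = sum(values[tid::num_threads])

    threads = [threading.Thread(target=work, args=(t,)) for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(partials)


def _worker(conn, chunk):
    # A separate process has its own memory, so the result must be sent back
    conn.send(sum(chunk))
    conn.close()


def distributed_memory_sum(values, num_procs=2):
    """Processes have private memory: data moves by explicit messages."""
    pipes, procs = [], []
    for p in range(num_procs):
        parent_conn, child_conn = mp.Pipe()
        proc = mp.Process(target=_worker, args=(child_conn, values[p::num_procs]))
        proc.start()
        pipes.append(parent_conn)
        procs.append(proc)
    total = sum(conn.recv() for conn in pipes)
    for proc in procs:
        proc.join()
    return total


if __name__ == "__main__":
    data = list(range(100))
    print(shared_memory_sum(data), distributed_memory_sum(data))
```

Both functions compute the same total; what differs is how the partial results reach the caller, which is the distinction the two table rows above describe.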

Parallel programming models

Framework Description
MPI Message Passing Interface (MPI) is the standard for inter-node communication. Available in multiple languages with several implementations (e.g. OpenMPI, MPICH).
OpenMP Parallel programming on CPUs and (via offloading) GPUs within a single node.
CUDA Programming NVIDIA GPUs (and coordinating with CPUs) on a single node. Use with MPI for multi-node GPU workloads. See MPI+CUDA.

Decision flowchart

Refer to the following flowchart for guidance on how to choose the right framework for your job.

flowchart TD
    A[Are you trying to utilise GPUs?]
    A --> B[Yes]
    A --> C[No]
    B --> D[Are you trying to utilise
    multiple nodes?]
    D --> E[Yes]
    D --> F[No]
    C --> G[Are you trying to utilise multiple CPU cores?]
    G --> H[Yes]
    G --> I[No]
    H --> K[Are you trying to utilise
    multiple nodes?]
    K --> L[Yes]
    K --> M[No]

    E --> N[MPI + GPU framework]
    click N "./#mpi"

    style gpu_models stroke-dasharray: 5 5
    subgraph gpu_models["Choose GPU framework:"]
        O[CUDA]
        P[OpenACC]
        Q[OpenMP offloading]
    end

    N --> gpu_models
    F --> gpu_models

    click O "./#cuda"

    L --> R[MPI]
    click R "./#mpi"

    M --> S[OpenMP]
    click S "./#openmp"

    I --> T[Do you need to run the same job
    multiple times with different inputs?]
    T --> U[Yes]
    T --> V[No]

    U --> W[Array job]
    click W "../array_jobs"

    V --> Y[Serial job]
    click Y "../jobs"
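
For readers who prefer code to diagrams, the same decisions can be written as a small function. This is only a sketch of the flowchart above; the function and parameter names are hypothetical.

```python
def choose_framework(use_gpus, multiple_nodes=False, multiple_cores=False,
                     repeated_runs=False):
    """Return the suggestion the flowchart reaches for the given answers."""
    gpu_frameworks = "CUDA, OpenACC or OpenMP offloading"
    if use_gpus:
        if multiple_nodes:
            return f"MPI + GPU framework ({gpu_frameworks})"
        return f"GPU framework ({gpu_frameworks})"
    if multiple_cores:
        return "MPI" if multiple_nodes else "OpenMP"
    return "Array job" if repeated_runs else "Serial job"


if __name__ == "__main__":
    # Several CPU cores on one node, no GPUs
    print(choose_framework(use_gpus=False, multiple_cores=True))
```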

Running parallel jobs

Example scripts: Python multi-processing

In this section, we discuss how to use mpi4py to run parallel Python scripts on BlueBEAR. The mpi4py library provides Python bindings for the Message Passing Interface (MPI) standard, allowing a script to run as multiple cooperating processes across cores and nodes. The example below consists of a Python script (hello.py) followed by the batch script that launches it.

import os
import socket
from mpi4py import MPI

user = os.environ['USER']
cpu = os.environ['BB_CPU']
world_comm = MPI.COMM_WORLD
world_size = world_comm.Get_size()
my_rank = world_comm.Get_rank()
node = socket.gethostname()

print(f"Hello {user}! I am process {my_rank} of {world_size} from {node} (node type: {cpu})")

#!/bin/bash

#SBATCH --nodes=1
#SBATCH --ntasks=8
#SBATCH --time=05:00
#SBATCH --qos=bbshort

set -e

module purge; module load bluebear
module load bear-apps/2024a
module load mpi4py/4.0.1-gompi-2024a

mpiexec -n ${SLURM_NTASKS} python hello.py
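
Beyond printing a greeting, a typical MPI program gives each rank a slice of the work and combines the partial results. The sketch below is one way this might look with mpi4py; the helper names are hypothetical, and the fallback to a single process when mpi4py cannot be imported is an assumption added so the script can be tried outside the cluster.

```python
try:
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
except ImportError:
    # Assumption for illustration: without mpi4py, behave as a single process
    comm, rank, size = None, 0, 1


def block_range(n_items, n_ranks, rank):
    """Return the [start, stop) slice of n_items owned by this rank."""
    base, extra = divmod(n_items, n_ranks)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop


def partial_sum_of_squares(n_items, n_ranks, rank):
    """Sum i*i over the block of indices that belongs to this rank."""
    start, stop = block_range(n_items, n_ranks, rank)
    return sum(i * i for i in range(start, stop))


if __name__ == "__main__":
    n = 1000
    local = partial_sum_of_squares(n, size, rank)
    # reduce combines every rank's partial result on rank 0
    total = comm.reduce(local, op=MPI.SUM, root=0) if comm else local
    if rank == 0:
        print(f"Sum of squares below {n}: {total}")
```

Saved under a name of your choice, the script can be launched with the same mpiexec line as the batch script above, substituting the file name.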

Example scripts: Python (ML model training)

In this section, we discuss how to run multi-GPU machine learning (ML) and deep learning (DL) model training on BlueBEAR. Training large ML/DL models often requires significant computational resources, and using multiple GPUs can greatly accelerate the training process. The example scripts below assume that the model training script is written in Python using the PyTorch framework.

ML model training can be accelerated through techniques such as Data Parallelism (DP), Model Parallelism (MP), and Fully Sharded Data Parallel (FSDP). Data Parallelism splits the training data across multiple GPUs: each GPU holds a copy of the model, processes its own subset of the data, and computes gradients independently, which are then averaged across GPUs. Model Parallelism, on the other hand, splits the model itself across multiple GPUs, allowing for larger models that may not fit into a single GPU's memory. FSDP combines aspects of both DP and MP: it shards model parameters, gradients, and optimizer states across GPUs, reducing the memory needed on each GPU.
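
To make the idea behind Data Parallelism concrete, the following sketch emulates it in plain Python for a one-parameter model y = w * x. It is an illustration rather than PyTorch code: the function names are hypothetical, and equal-sized shards are assumed. Each "worker" computes the gradient on its shard, and the average of those gradients matches the gradient over the full batch.

```python
def gradient(w, xs, ys):
    """Gradient of the mean squared error of y = w * x with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def data_parallel_gradient(w, xs, ys, n_workers):
    """Emulate DP: each worker gets an equal shard, gradients are averaged."""
    shard = len(xs) // n_workers  # assumes len(xs) is divisible by n_workers
    local_grads = [
        gradient(w, xs[k * shard:(k + 1) * shard], ys[k * shard:(k + 1) * shard])
        for k in range(n_workers)
    ]
    # In real distributed training this average is an all-reduce across GPUs
    return sum(local_grads) / n_workers


if __name__ == "__main__":
    xs = [1.0, 2.0, 3.0, 4.0]
    ys = [2.0, 4.0, 6.0, 8.0]
    print(gradient(0.5, xs, ys), data_parallel_gradient(0.5, xs, ys, 2))
```

Because the averaged gradient equals the full-batch gradient, every worker applies the same update and the model copies stay in sync.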

While PyTorch's distributed library provides the necessary tools for implementing these parallelism techniques, setting up and managing distributed training can be complex. See Writing Distributed Applications with PyTorch for a detailed guide to PyTorch's distributed package and Distributed Data Parallel (DDP). While this approach gives users fine-grained control over the training process, it requires significant boilerplate code and a deep understanding of distributed systems. We therefore recommend using higher-level frameworks built on top of PyTorch, such as Lightning or Hugging Face accelerate, which simplify setting up distributed training. These frameworks let researchers write cleaner code, switch easily between distributed strategies such as DDP and FSDP, and use built-in features such as checkpointing, logging, and hyper-parameter tuning without managing low-level details.

BlueBEAR provides pre-installed accelerate and lightning modules. Below are an example Lightning training script (mnist_example_lightning.py) and three batch scripts that run it on a single GPU, on two GPUs on one node, and on one GPU on each of two nodes.

import os
import argparse

import lightning as L
import torch
from torch import Tensor, nn, optim, utils
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor


# define the LightningModule (Additional step compared to pure PyTorch)
class LitAutoEncoder(L.LightningModule):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, _ = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = nn.functional.mse_loss(x_hat, x)
        # Logging to TensorBoard (if installed) by default
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


def main(args):
    # define any number of nn.Modules (or use your current ones)
    encoder = nn.Sequential(nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3))
    decoder = nn.Sequential(nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28))

    # init the autoencoder LightningModule (Wrapper for pure PyTorch)
    autoencoder = LitAutoEncoder(encoder, decoder)

    # setup data
    dataset = MNIST(os.getcwd(), download=True, transform=ToTensor())
    train_loader = utils.data.DataLoader(dataset)

    # define lightning trainer (additional step compared to pure PyTorch)
    trainer = L.Trainer(
        limit_train_batches=100,
        max_epochs=1,
        devices=args.gpus,
        num_nodes=args.nodes,
        accelerator="gpu" if torch.cuda.is_available() else "cpu",
        strategy="ddp",
    )

    # train the model
    trainer.fit(model=autoencoder, train_dataloaders=train_loader)

    # choose your trained nn.Module
    encoder = autoencoder.encoder
    encoder.eval()

    # embed 4 fake images!
    fake_image_batch = torch.rand(4, 28 * 28, device=autoencoder.device)
    embeddings = encoder(fake_image_batch)
    print(
        "⚡" * 20, "\nPredictions (4 image embeddings):\n", embeddings, "\n", "⚡" * 20
    )


if __name__ == "__main__":
    # Parse input arguments for number of GPUs and nodes
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--gpus",
        type=int,
        default=1,
        help="Number of GPUs to use (default: 1)",
    )
    parser.add_argument(
        "--nodes",
        type=int,
        default=1,
        help="Number of nodes to use (default: 1)",
    )

    args = parser.parse_args()
    main(args)

#!/bin/bash
#SBATCH --account=project_name
#SBATCH --qos=bbgpu
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=2
#SBATCH --gres=gpu:a100:1
#SBATCH --time=15:0

set -e

module purge; module load bluebear
module load bear-apps/2023a
module load Lightning/2.2.1-foss-2023a-CUDA-12.1.1 # This will load Python, PyTorch, CUDA
module load torchvision/0.16.0-foss-2023a-CUDA-12.1.1

# Store pip cache in /scratch directory, instead of the default home directory location
export PIP_CACHE_DIR="/scratch/${USER}/pip"

# Execute your model training python script
srun python mnist_example_lightning.py --gpus 1 --nodes 1

#!/bin/bash
#SBATCH --account=project_name
#SBATCH --qos=bbgpu
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=2
#SBATCH --gres=gpu:a100:2
#SBATCH --time=15:0

set -e

module purge; module load bluebear
module load bear-apps/2023a
module load Lightning/2.2.1-foss-2023a-CUDA-12.1.1 # This will load Python, PyTorch, CUDA
module load torchvision/0.16.0-foss-2023a-CUDA-12.1.1

# Store pip cache in /scratch directory, instead of the default home directory location
export PIP_CACHE_DIR="/scratch/${USER}/pip"

# Execute your model training python script
srun python mnist_example_lightning.py --gpus 2 --nodes 1

#!/bin/bash
#SBATCH --account=project_name
#SBATCH --qos=bbgpu
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=2
#SBATCH --gres=gpu:a100:1
#SBATCH --time=15:0

set -e

module purge; module load bluebear
module load bear-apps/2023a
module load Lightning/2.2.1-foss-2023a-CUDA-12.1.1 # This will load Python, PyTorch, CUDA
module load torchvision/0.16.0-foss-2023a-CUDA-12.1.1

# Store pip cache in /scratch directory, instead of the default home directory location
export PIP_CACHE_DIR="/scratch/${USER}/pip"

# Execute your model training python script
srun python mnist_example_lightning.py --gpus 1 --nodes 2
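
In the three batch scripts above, --ntasks-per-node matches the number of GPUs per node and --nodes matches the value passed to the training script. As far as we are aware, Lightning expects one Slurm task per GPU, so these values need to stay in step. One way to avoid repeating them by hand is to read them from the environment that Slurm sets inside a job. The helper below is a hypothetical sketch, not part of Lightning.

```python
import os


def lightning_layout(env=os.environ):
    """Return (devices_per_node, num_nodes) from Slurm's environment.

    SLURM_NTASKS_PER_NODE is only set when --ntasks-per-node is given, and
    SLURM_NNODES is only set inside a job, so both fall back to 1.
    """
    devices = int(env.get("SLURM_NTASKS_PER_NODE", 1))
    nodes = int(env.get("SLURM_NNODES", 1))
    return devices, nodes


if __name__ == "__main__":
    devices, nodes = lightning_layout()
    print(f"--gpus {devices} --nodes {nodes}")
```

The returned values could be used as the defaults for the --gpus and --nodes arguments in the training script, so that a single srun line works for all three layouts.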

Developing for parallel workflows

Achieving parallelism requires more than just changing the job submission script; the code itself must be written or adapted to make use of parallel frameworks such as OpenMP (for shared-memory parallelism), MPI (for distributed-memory communication), or CUDA (for GPU acceleration). These frameworks provide the tools to divide work into smaller tasks, coordinate communication, and ensure results are combined correctly.

Example scripts: C++

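The following shows how to use MPI on BlueBEAR with the OpenMPI implementation, using a simple C++ example. The source file (mpi.cpp) is followed by a batch script that compiles it and a batch script that runs it across two nodes.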

#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    std::cout << "Hello from processor " << processor_name
            << ", rank " << world_rank << " out of " << world_size << " processors" << std::endl;

    MPI_Finalize();
    return 0;
}

#!/bin/bash
#SBATCH --qos=bbshort
#SBATCH --account=_projectname_
#SBATCH --ntasks=1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2024a
module load OpenMPI/5.0.3-GCC-13.3.0

mpic++ -o mpi_example mpi.cpp

#!/bin/bash
#SBATCH --qos=bbshort
#SBATCH --account=_projectname_
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2024a
module load OpenMPI/5.0.3-GCC-13.3.0

srun ./mpi_example

The example below shows how to compile and run a simple OpenMP program on the CPU only.

#include <iostream>
#include <omp.h>

int main() {
    // Every thread in the team executes the block below once
    #pragma omp parallel
    {
        int thread_id = omp_get_thread_num();
        int num_threads = omp_get_num_threads();

        // Output from different threads may interleave
        std::cout << "Hello from thread " << thread_id
                  << " out of " << num_threads << " threads.\n";
    }

    return 0;
}

#!/bin/bash
#SBATCH --qos=bbshort
#SBATCH --account=_projectname_
#SBATCH --ntasks=1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2024a
module load GCC/13.3.0

g++ -fopenmp -o openmp_example openmp.cpp

#!/bin/bash
#SBATCH --qos=bbshort
#SBATCH --account=_projectname_
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2024a
module load GCC/13.3.0

# Run one OpenMP thread per core allocated to the task
OMP_NUM_THREADS=${SLURM_CPUS_PER_TASK} ./openmp_example

The example below shows how to target a single GPU on a single node.

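#include <cstdio>
#include <iostream>

// Kernel: runs on the GPU, once per thread
__global__ void hello_cuda() {
    printf("Hello from GPU thread %d!\n", threadIdx.x);
}

int main() {
    // Launch 1 block of 2 threads
    hello_cuda<<<1, 2>>>();

    // Wait for the kernel to finish so its output is flushed
    cudaDeviceSynchronize();

    std::cout << "Hello from CPU!\n";
    return 0;
}

#!/bin/bash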
#SBATCH --qos=bbgpu
#SBATCH --account=_projectname_
#SBATCH --ntasks=1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2023a
module load NVHPC/24.11-CUDA-12.6.0

nvcc -o cuda_example cuda.cu

#!/bin/bash
#SBATCH --qos=bbgpu
#SBATCH --account=_projectname_
#SBATCH --nodes=1
#SBATCH --ntasks=2
#SBATCH --gres=gpu:a100:1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2023a
module load NVHPC/24.11-CUDA-12.6.0

./cuda_example

This section shows how the preceding techniques, such as MPI and CUDA, can be combined to target multiple GPUs and CPUs across multiple nodes. In the example below, a single MPI process on each node launches a kernel on that node's GPU.

#include <mpi.h>
#include <iostream>
#include <cstdio>
#include <cuda_runtime.h>

// Kernel: each GPU thread reports which MPI rank launched it
__global__ void hello_cuda(int rank) {
    printf("Hello from GPU thread %d on MPI rank %d!\n", threadIdx.x, rank);
}

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    // Launch 1 block of 2 threads on this rank's GPU
    hello_cuda<<<1, 2>>>(world_rank);

    // Wait for the kernel to finish so its output is flushed
    cudaDeviceSynchronize();

    std::cout << "Hello from CPU on processor " << processor_name
              << ", rank " << world_rank << " out of " << world_size << " processors" << std::endl;

    MPI_Finalize();
    return 0;
}

#!/bin/bash
#SBATCH --qos=bbgpu
#SBATCH --account=_projectname_
#SBATCH --ntasks=1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2023a
module load NVHPC/24.11-CUDA-12.6.0
module load OpenMPI

nvcc -ccbin mpic++ -o mpi_cuda_example mpi_cuda.cu

#!/bin/bash
#SBATCH --qos=bbgpu
#SBATCH --account=_projectname_
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:a100:1
#SBATCH --time=1:0

module purge; module load bluebear
module load bear-apps/2023a
module load OpenMPI

srun ./mpi_cuda_example
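
The example above runs one MPI rank per node, so each rank can simply use the default GPU. If several ranks share a node, each rank usually needs to select its own device (in C++, for instance, by calling cudaSetDevice before launching kernels). The sketch below shows one common mapping, written in Python for brevity. The function name and the GPU count of 2 are assumptions for illustration, and depending on how Slurm binds GPUs to tasks, each task may already see only its own device.

```python
import os


def pick_gpu(local_rank, gpus_on_node):
    """Map a node-local rank to a GPU index in round-robin fashion."""
    if gpus_on_node < 1:
        raise ValueError("no GPUs visible on this node")
    return local_rank % gpus_on_node


if __name__ == "__main__":
    # SLURM_LOCALID is the task's index within its node (0 when run outside Slurm)
    local_rank = int(os.environ.get("SLURM_LOCALID", 0))
    # Hypothetical: assume 2 GPUs per node; a real program would query the runtime
    print(f"local rank {local_rank} -> GPU {pick_gpu(local_rank, 2)}")
```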