Overlapping Communication with Computation

Posted on March 19, 2016



The MPI programs we have developed so far used blocking send and receive operations whenever they needed to perform point-to-point communication. Recall that a blocking send operation does not return until the message has been copied out of the send buffer (either into a system buffer at the source process or to the destination process). Similarly, a blocking receive operation returns only after the message has been received and copied into the receive buffer. For example, consider Cannon’s matrix-matrix multiplication program described in Program 6.2. During each iteration of its main computational loop (lines 47-57), it first computes the matrix multiplication of the sub-matrices stored in a and b, and then shifts the blocks of a and b using MPI_Sendrecv_replace, which blocks until the specified matrix block has been sent and received by the corresponding processes. In each iteration, each process spends O(n^3/p^1.5) time performing the matrix-matrix multiplication and O(n^2/p) time shifting the blocks of matrices A and B. Since the blocks of matrices A and B do not change as they are shifted among the processors, it is preferable to overlap the transmission of these blocks with the computation for the matrix-matrix multiplication. This is feasible because many distributed-memory parallel computers have dedicated communication controllers that can transmit messages without interrupting the CPUs.
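The two per-iteration costs quoted above follow directly from the block size. The derivation below is a sketch that assumes p is a perfect square, so that each process holds an (n/√p) × (n/√p) block of each matrix. The last line is an idealized bound, not a measured result: it assumes the hardware can hide communication completely behind computation.

```latex
\begin{align*}
T_{\text{comp}} &= O\!\left(\left(\frac{n}{\sqrt{p}}\right)^{3}\right)
                 = O\!\left(\frac{n^{3}}{p^{1.5}}\right)
  && \text{(multiplying two } \tfrac{n}{\sqrt{p}} \times \tfrac{n}{\sqrt{p}} \text{ blocks)} \\
T_{\text{comm}} &= O\!\left(\left(\frac{n}{\sqrt{p}}\right)^{2}\right)
                 = O\!\left(\frac{n^{2}}{p}\right)
  && \text{(shifting one block of } A \text{ and one of } B\text{)} \\
\frac{T_{\text{comp}}}{T_{\text{comm}}} &= O\!\left(\frac{n}{\sqrt{p}}\right) \\
T_{\text{iter}} &\approx
  \begin{cases}
    T_{\text{comp}} + T_{\text{comm}} & \text{blocking} \\
    \max\left(T_{\text{comp}},\, T_{\text{comm}}\right) & \text{ideal overlap (assumed)}
  \end{cases}
\end{align*}
```

Under these assumptions, the relative benefit of overlapping is largest when n/√p is small, that is, when the two costs are of comparable size.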

 

6.5.1 Non-Blocking Communication Operations

In order to overlap communication with computation, MPI provides a pair of functions for performing non-blocking send and receive operations. These functions are MPI_Isend and MPI_Irecv. MPI_Isend starts a send operation but does not complete it; that is, it returns before the data is copied out of the buffer. Similarly, MPI_Irecv starts a receive operation but returns before the data has been received and copied into the buffer. With the support of appropriate hardware, the transmission and reception of messages can proceed concurrently with the computations that the program performs after these functions return.

However, at a later point in the program, a process that has started a non-blocking send or receive operation must make sure that this operation has completed before it proceeds with its computations. This is because a process that has started a non-blocking send operation may want to overwrite the buffer that stores the data that are being sent, or a process that has started a non-blocking receive operation may want to use the data it requested. To check the completion of non-blocking send and receive operations, MPI provides a pair of functions MPI_Test and MPI_Wait. The first tests whether or not a non-blocking operation has finished and the second waits (i.e., gets blocked) until a non-blocking operation actually finishes.

The calling sequences of MPI_Isend and MPI_Irecv are the following:

int MPI_Isend(void *buf, int count, MPI_Datatype datatype, 
        int dest, int tag, MPI_Comm comm, MPI_Request *request) 
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, 
        int source, int tag, MPI_Comm comm, MPI_Request *request)

Note that these functions take arguments similar to those of the corresponding blocking send and receive functions. The main difference is that they take an additional argument, request. The MPI_Isend and MPI_Irecv functions allocate a request object and return a handle to it in the request variable. This request object is passed to the MPI_Test and MPI_Wait functions to identify the operation whose status we want to query or whose completion we want to wait for.

Note that the MPI_Irecv function, unlike the blocking receive function, does not take a status argument. Instead, the status information associated with the receive operation is returned by the MPI_Test and MPI_Wait functions.

int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status) 
int MPI_Wait(MPI_Request *request, MPI_Status *status)

MPI_Test tests whether or not the non-blocking send or receive operation identified by its request has finished. It sets flag to true (a non-zero value in C) if the operation has completed; otherwise it sets flag to false (a zero value in C). If the non-blocking operation has finished, the request object pointed to by request is deallocated and request is set to MPI_REQUEST_NULL. The status object is also set to contain information about the operation. If the operation has not finished, request is not modified and the value of the status object is undefined. The MPI_Wait function blocks until the non-blocking operation identified by request completes. When it does, MPI_Wait deallocates the request object, sets request to MPI_REQUEST_NULL, and returns information about the completed operation in the status object.

For cases in which the programmer wants to explicitly deallocate a request object, MPI provides the following function.

int MPI_Request_free(MPI_Request *request)

Note that the deallocation of the request object does not have any effect on the associated non-blocking send or receive operation. That is, if it has not yet completed it will proceed until its completion. Hence, one must be careful before explicitly deallocating a request object, since without it, we cannot check whether or not the non-blocking operation has completed.

A non-blocking communication operation can be matched with a corresponding blocking operation. For example, a process can send a message using a non-blocking send operation and this message can be received by the other process using a blocking receive operation.

Avoiding Deadlocks

By using non-blocking communication operations we can remove most of the deadlocks associated with their blocking counterparts. For example, as we discussed in Section 6.3, the following piece of code is not safe.

    int a[10], b[10], myrank;
    MPI_Status status;
    ...
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if (myrank == 0) {
      MPI_Send(a, 10, MPI_INT, 1, 1, MPI_COMM_WORLD);
      MPI_Send(b, 10, MPI_INT, 1, 2, MPI_COMM_WORLD);
    }
    else if (myrank == 1) {
      /* Unsafe: the receives are posted in the opposite order of the sends */
      MPI_Recv(b, 10, MPI_INT, 0, 2, MPI_COMM_WORLD, &status);
      MPI_Recv(a, 10, MPI_INT, 0, 1, MPI_COMM_WORLD, &status);
    }
    ...

However, if we replace either the send or receive operations with their non-blocking counterparts, then the code will be safe, and will correctly run on any MPI implementation.

    int a[10], b[10], myrank;
    MPI_Status status;
    MPI_Request requests[2];
    ...
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    if (myrank == 0) {
      MPI_Send(a, 10, MPI_INT, 1, 1, MPI_COMM_WORLD);
      MPI_Send(b, 10, MPI_INT, 1, 2, MPI_COMM_WORLD);
    }
    else if (myrank == 1) {
      /* Both receives are posted immediately, so either send can be matched */
      MPI_Irecv(b, 10, MPI_INT, 0, 2, MPI_COMM_WORLD, &requests[0]);
      MPI_Irecv(a, 10, MPI_INT, 0, 1, MPI_COMM_WORLD, &requests[1]);

      /* Complete both receives before a and b are read */
      MPI_Wait(&requests[0], &status);
      MPI_Wait(&requests[1], &status);
    }
    ...

This example also illustrates that the non-blocking operations started by any process can finish in any order, depending on the transmission or reception of the corresponding messages. Here, the message with tag 1 is sent first, so the second receive operation (which matches it) will typically finish before the first does.

 

Example: Cannon’s Matrix-Matrix Multiplication (Using Non-Blocking Operations)

Program 6.3 shows the MPI program that implements Cannon’s algorithm using non-blocking send and receive operations. The various parameters are identical to those of Program 6.2.

Program 6.3 Non-Blocking Cannon’s Matrix-Matrix Multiplication
    #include <math.h>
    #include <stdlib.h>
    #include <string.h>
    #include <mpi.h>

    /* Serial block multiply c = c + a*b, as used in Program 6.2 */
    void MatrixMultiply(int n, double *a, double *b, double *c);

    void MatrixMatrixMultiply_NonBlocking(int n, double *a, double *b,
                                          double *c, MPI_Comm comm)
    {
      int i, j, nlocal;
      double *a_buffers[2], *b_buffers[2];
      int npes, dims[2], periods[2];
      int myrank, my2drank, mycoords[2];
      int uprank, downrank, leftrank, rightrank;
      int shiftsource, shiftdest;
      MPI_Status status;
      MPI_Comm comm_2d;
      MPI_Request reqs[4];

      /* Get the communicator related information */
      MPI_Comm_size(comm, &npes);
      MPI_Comm_rank(comm, &myrank);

      /* Set up the Cartesian topology (npes is assumed to be a perfect square) */
      dims[0] = dims[1] = (int)sqrt((double)npes);

      /* Set the periods for wraparound connections */
      periods[0] = periods[1] = 1;

      /* Create the Cartesian topology, with rank reordering */
      MPI_Cart_create(comm, 2, dims, periods, 1, &comm_2d);

      /* Get the rank and coordinates with respect to the new topology */
      MPI_Comm_rank(comm_2d, &my2drank);
      MPI_Cart_coords(comm_2d, my2drank, 2, mycoords);

      /* Compute ranks of the up and left shifts. Dimension 0 is treated as
         the horizontal (column) direction and dimension 1 as the vertical
         (row) direction of the process grid. */
      MPI_Cart_shift(comm_2d, 0, -1, &rightrank, &leftrank);
      MPI_Cart_shift(comm_2d, 1, -1, &downrank, &uprank);

      /* Determine the dimension of the local matrix block */
      nlocal = n/dims[0];

      /* Set up the a_buffers and b_buffers arrays */
      a_buffers[0] = a;
      a_buffers[1] = (double *)malloc(nlocal*nlocal*sizeof(double));
      b_buffers[0] = b;
      b_buffers[1] = (double *)malloc(nlocal*nlocal*sizeof(double));

      /* Perform the initial matrix alignment. First for A and then for B.
         A is shifted left by the row index (mycoords[1]) and B is shifted
         up by the column index (mycoords[0]). Using the coordinate of the
         shifted dimension itself as the displacement would send every
         block to coordinate 0 and leave the sends unmatched. */
      MPI_Cart_shift(comm_2d, 0, -mycoords[1], &shiftsource, &shiftdest);
      MPI_Sendrecv_replace(a_buffers[0], nlocal*nlocal, MPI_DOUBLE,
          shiftdest, 1, shiftsource, 1, comm_2d, &status);

      MPI_Cart_shift(comm_2d, 1, -mycoords[0], &shiftsource, &shiftdest);
      MPI_Sendrecv_replace(b_buffers[0], nlocal*nlocal, MPI_DOUBLE,
          shiftdest, 1, shiftsource, 1, comm_2d, &status);

      /* Get into the main computation loop */
      for (i=0; i<dims[0]; i++) {
        /* Start sending the current blocks and receiving the next ones */
        MPI_Isend(a_buffers[i%2], nlocal*nlocal, MPI_DOUBLE,
            leftrank, 1, comm_2d, &reqs[0]);
        MPI_Isend(b_buffers[i%2], nlocal*nlocal, MPI_DOUBLE,
            uprank, 1, comm_2d, &reqs[1]);
        MPI_Irecv(a_buffers[(i+1)%2], nlocal*nlocal, MPI_DOUBLE,
            rightrank, 1, comm_2d, &reqs[2]);
        MPI_Irecv(b_buffers[(i+1)%2], nlocal*nlocal, MPI_DOUBLE,
            downrank, 1, comm_2d, &reqs[3]);

        /* c = c + a*b, overlapped with the four transfers above */
        MatrixMultiply(nlocal, a_buffers[i%2], b_buffers[i%2], c);

        /* All four operations must complete before the buffers are reused */
        for (j=0; j<4; j++)
          MPI_Wait(&reqs[j], &status);
      }

      /* Restore the original distribution of a and b */
      MPI_Cart_shift(comm_2d, 0, +mycoords[1], &shiftsource, &shiftdest);
      MPI_Sendrecv_replace(a_buffers[i%2], nlocal*nlocal, MPI_DOUBLE,
          shiftdest, 1, shiftsource, 1, comm_2d, &status);

      MPI_Cart_shift(comm_2d, 1, +mycoords[0], &shiftsource, &shiftdest);
      MPI_Sendrecv_replace(b_buffers[i%2], nlocal*nlocal, MPI_DOUBLE,
          shiftdest, 1, shiftsource, 1, comm_2d, &status);

      /* If the grid side is odd, the restored blocks sit in the auxiliary
         buffers; copy them back into the caller's arrays */
      if (i%2 == 1) {
        memcpy(a, a_buffers[1], nlocal*nlocal*sizeof(double));
        memcpy(b, b_buffers[1], nlocal*nlocal*sizeof(double));
      }

      MPI_Comm_free(&comm_2d); /* Free up communicator */

      free(a_buffers[1]);
      free(b_buffers[1]);
    }

There are two main differences between the blocking program (Program 6.2) and this non-blocking one. The first difference is that the non-blocking program requires the additional arrays a_buffers and b_buffers, which hold the blocks of A and B that are being received while the computation involving the previous blocks is performed. The second difference is that in the main computational loop, it first starts the non-blocking send operations to send the locally stored blocks of A and B to the processes left and up the grid, and then starts the non-blocking receive operations to receive the blocks for the next iteration from the processes right and down the grid. Having initiated these four non-blocking operations, it proceeds to perform the matrix-matrix multiplication of the blocks it currently stores. Finally, before it proceeds to the next iteration, it uses MPI_Wait to wait for the send and receive operations to complete.
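The alternation between the two buffers can be traced without any MPI calls. The sketch below is illustrative only: the helper names compute_buffer, receive_buffer, and print_buffer_schedule are hypothetical and do not appear in Program 6.3. They reproduce the i%2 and (i+1)%2 indexing used in its main loop.

```c
#include <stdio.h>

/* Index of the buffer used for the computation (and as the send source)
   in iteration i. Mirrors a_buffers[i%2] in the main loop. */
int compute_buffer(int i) { return i % 2; }

/* Index of the buffer that receives the blocks needed in iteration i + 1.
   Mirrors a_buffers[(i+1)%2] in the main loop. */
int receive_buffer(int i) { return (i + 1) % 2; }

/*
 * Print the role of each buffer for q iterations, where q is the side of
 * the process grid. Returns the index of the buffer that holds the current
 * blocks once the loop has finished, which is q % 2.
 */
int print_buffer_schedule(int q)
{
    int i;
    for (i = 0; i < q; i++) {
        printf("iteration %d: compute on and send buffers[%d], "
               "receive into buffers[%d]\n",
               i, compute_buffer(i), receive_buffer(i));
    }
    return compute_buffer(q);
}
```

Calling print_buffer_schedule(3) shows the two roles swapping on every iteration. The computation never reads the buffer that is currently being filled, and which buffer holds the current blocks after the loop depends on the parity of the grid side.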

Note that in order to overlap communication with computation we have to use two auxiliary arrays – one for A and one for B. This is to ensure that incoming messages never overwrite the blocks of A and B that are used in the computation, which proceeds concurrently with the data transfer. Thus, increased performance (by overlapping communication with computation) comes at the expense of increased memory requirements. This is a trade-off that is often made in message-passing programs, since communication overheads can be quite high for loosely coupled distributed memory parallel computers.
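The size of this memory trade-off can be estimated with a few lines of arithmetic. The sketch below counts only the arrays visible in the programs (the local blocks of A, B, and C, plus the two auxiliary buffers) and ignores any buffering inside the MPI library. The function names are hypothetical, and q stands for the side of the process grid, assumed to divide n.

```c
#include <stddef.h>

/* Bytes in one (n/q) x (n/q) block of doubles, where q is the side of the
   process grid. Assumes q divides n. */
size_t block_bytes(size_t n, size_t q)
{
    size_t nlocal = n / q;
    return nlocal * nlocal * sizeof(double);
}

/* Blocking version: one block each of A, B, and C per process. */
size_t blocking_bytes(size_t n, size_t q)
{
    return 3 * block_bytes(n, q);
}

/* Non-blocking version: the same three blocks plus one auxiliary receive
   buffer for A and one for B. */
size_t nonblocking_bytes(size_t n, size_t q)
{
    return 5 * block_bytes(n, q);
}
```

For example, with n = 4096 on a 4 × 4 grid each block has 1024 × 1024 elements, so the non-blocking version needs two extra blocks per process, five in place of three.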


Posted by Akash Kurup
