Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

libs/mpi/example/parallel_example.cpp

// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
#include <iostream>

#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <iostream>
#include <cstdlib>

namespace mpi = boost::mpi;

enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};


void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // setup communicator by splitting

  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);

  unsigned int num_calculate_ranks = calculate_communicator.size();
  
  // the master of the accumulaion ranks is the first of them, hence
  // with a rank just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;
  
  // the master of the calculation ranks sends the skeleton of the sample 
  // to the master of the accumulation ranks

  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));
  
  // next we extract the content of the sample vector, to be used in sending
  // the content later on
  
  mpi::content sample_content = mpi::get_content(sample);
  
  // now intialize the parallel random number generator
  
  boost::lcg64 engine(
        boost::random::stream_number = calculate_communicator.rank(),
        boost::random::total_streams = calculate_communicator.size()
      );
      
  boost::variate_generator<boost::lcg64&,boost::uniform_real<> > 
    rng(engine,boost::uniform_real<>());
    
  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {
    
    // calculate sample by filling the vector with random numbers
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as generator
    
    BOOST_FOREACH(double& x, sample)
      x = rng();
    
    // send sample to accumulation ranks
    // Ideally we want to do this as a broadcast with an inter-communicator 
    // between the calculation and accumulation ranks. MPI2 should support 
    // this, but here we present an MPI1 compatible solution.
    
    // send content of sample to first (master) accumulation process
    
    world.send(master_accumulate_rank,sample_tag,sample_content);
    
    // gather some results from all calculation ranks
    
    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
  }
  
  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}



void accumulate_samples()
{
  std::vector<double> sample;

  // setup the communicator for all accumulation ranks by splitting

  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton
  
  if (is_master_accumulate_rank) 
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));
    
  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
  
  // next we extract the content of the sample vector, to be used in receiving
  // the content later on
  
  mpi::content sample_content = mpi::get_content(sample);
  
  // accumulate until quit is called
  double sum=0.;
  while (true) {
  
    
    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the otehr accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }
    
    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);
      
      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);
      
      // now we need to braodcast
      // the problam is we do not have a non-blocking broadcast that we could 
      // abort if we receive a quit message from the master. We thus need to
      // first tell all accumulation ranks to start a broadcast. If the sample
      // is small, we could just send the sample in this message, but here we
      // optimize the code for large samples, so that the overhead of these
      // sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity

      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);
      
      // now broadcast the contents of the sample to all accumulate ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);
      
      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }
  
    // the other accumulation ranks wait for a mesage to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);
      
      accumulate_communicator.recv(0,sample_broadcast_tag);
      
      // receive broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);
      
      // and handle the sample
      
      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate, the others accumulate
  // the sample size is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}