Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

libs/flyweight/example/parallel.cpp

/* Boost.Flyweight example of parallel tokenization.
 *
 * Copyright 2024 Joaquin M Lopez Munoz.
 * Distributed under the Boost Software License, Version 1.0.
 * (See accompanying file LICENSE_1_0.txt or copy at
 * http://www.boost.org/LICENSE_1_0.txt)
 *
 * See http://www.boost.org/libs/flyweight for library home page.
 */

#include <boost/flyweight.hpp>
#include <boost/flyweight/concurrent_factory.hpp>
#include <boost/flyweight/no_locking.hpp>
#include <boost/flyweight/no_tracking.hpp>
#include <chrono>
#include <cstdlib>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

/* Handcrafted tokenizer for sequences of alphabetic characters */

inline bool match(char ch)
{
 return (ch>='a' && ch<='z') || (ch>='A' && ch<='Z');
}

template<typename ForwardIterator,typename F>
void tokenize(ForwardIterator first,ForwardIterator last,F f)
{
  goto start;
  for(;;)
  {
    for(;;){
      ++first;

    start:
      if(first==last)return;
      if(match(*first))break;
    }

    auto begin_word=first;
    for(;;){
      if(++first==last||!match(*first)){
        f(begin_word,first);
        if(first==last)return;
        else           break;
      }
    }
  }
}

/* Tokenize a string into words in parallel and store the results into a
 * std::vector<String>, String being std::string or a flyweight type.
 */

template<typename String>
void parse(const std::string& in,const char* type_name,std::size_t num_threads)
{
  using namespace std::chrono;
  using string_iterator=std::string::const_iterator;

  auto t1=steady_clock::now();

  /* Divide input in num_threads chunks, taking care that boundaries are not
   * placed in the middle of a token.
   */

  std::vector<string_iterator> boundaries(num_threads+1);

  boundaries[0]=in.begin();
  for(std::size_t i=0;i<num_threads;++i){
    auto& it=boundaries[i+1];
    it=boundaries[i]+(in.end()-boundaries[i])/(num_threads-i);
    while(it!=in.end()&&match(*it))++it;
  }

  /* do a first pass to precalculate # of words produced by each thread */

  std::vector<std::thread> threads(num_threads);
  std::vector<std::size_t> partial_num_words(num_threads);

  for(std::size_t i=0;i<num_threads;++i){
    threads[i]=std::thread([&,i]{
      std::size_t s=0;
      tokenize(
        boundaries[i],boundaries[i+1],
        [&](string_iterator,string_iterator){++s;});
      partial_num_words[i]=s;
    });
  }

  std::size_t              num_words=0;
  std::vector<std::size_t> thread_output_starts(num_threads);

  for(std::size_t i=0;i<num_threads;++i){
    threads[i].join();
    thread_output_starts[i]=num_words;
    num_words+=partial_num_words[i];
  }

  /* do a second pass, this time populating the result vector */

  std::vector<String> words(num_words,String());

  for(std::size_t i=0;i<num_threads;++i){
    threads[i]=std::thread([&,i]{
      auto out=words.begin()+thread_output_starts[i];
      tokenize(
        boundaries[i],boundaries[i+1],
        [&](string_iterator first,string_iterator last){
          *out++=String(first,last);
        });
    });
  }
  for(std::size_t i=0;i<num_threads;++i){threads[i].join();}

  auto t2=steady_clock::now();

  std::cout
    <<std::setw(20)<<type_name<<", "<<num_threads<<" thread(s): "
    <<num_words<<" words, "
    <<std::setw(9)<<duration_cast<duration<double>>(t2-t1).count()<< " s\n";
}

/* accept a file and parse it with std::string and various flyweight types */

int main(int argc, char** argv)
{
  using namespace boost::flyweights;
  using regular_flyweight=flyweight<std::string>;
  using concurrent_flyweight=flyweight<
    std::string,
    concurrent_factory<>,
    no_locking,
    no_tracking
  >;

  if(argc<2){
    std::cout<<"specify a file\n";
    std::exit(EXIT_FAILURE);
  }

  std::ifstream is(argv[1]);
  if(!is)
  {
    std::cout<<"can't open "<<argv[1]<<"\n";
    std::exit(EXIT_FAILURE);
  }

  std::string in(
    std::istreambuf_iterator<char>(is),std::istreambuf_iterator<char>{});

  parse<std::string>(in,"std::string",1);
  parse<std::string>(in,"std::string",8);
  parse<regular_flyweight>(in,"regular flyweight",1);
  parse<regular_flyweight>(in,"regular flyweight",8);
  parse<concurrent_flyweight>(in,"concurrent flyweight",1);
  parse<concurrent_flyweight>(in,"concurrent flyweight",8);
}