Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

libs/beast/example/http/client/crawl/http_crawl.cpp

//
// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//

//------------------------------------------------------------------------------
//
// Example: HTTP crawl (asynchronous)
//
//------------------------------------------------------------------------------

#include "urls_large_data.hpp"

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <map>

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>
namespace chrono = std::chrono;         // from <chrono>

//------------------------------------------------------------------------------

// This structure aggregates statistics on all the sites
class crawl_report
{
    boost::asio::io_context& ioc_;
    boost::asio::strand<
        boost::asio::io_context::executor_type> strand_;
    std::atomic<std::size_t> index_;
    std::vector<char const*> const& hosts_;
    std::size_t count_ = 0;

public:
    crawl_report(boost::asio::io_context& ioc)
        : ioc_(ioc)
        , strand_(ioc_.get_executor())
        , index_(0)
        , hosts_(urls_large_data())
    {
    }

    // Run an aggregation function on the strand.
    // This allows synchronization without a mutex.
    template<class F>
    void
    aggregate(F const& f)
    {
        boost::asio::post(
            strand_,
            [&, f]
            {
                f(*this);
                if(count_ % 100 == 0)
                {
                    std::cerr <<
                        "Progress: " << count_ << " of " << hosts_.size() << "\n";
                    //std::cerr << *this;
                }
                ++count_;
            });
    }

    // Returns the next host to check
    char const*
    get_host()
    {
        auto const n = index_++;
        if(n >= hosts_.size())
            return nullptr;
        return hosts_[n];
    }

    // Counts the number of timer failures
    std::size_t timer_failures = 0;

    // Counts the number of name resolution failures
    std::size_t resolve_failures = 0;

    // Counts the number of connection failures
    std::size_t connect_failures = 0;

    // Counts the number of write failures
    std::size_t write_failures = 0;

    // Counts the number of read failures
    std::size_t read_failures = 0;

    // Counts the number of success reads
    std::size_t success = 0;

    // Counts the number received of each status code
    std::map<unsigned, std::size_t> status_codes;
};

std::ostream&
operator<<(std::ostream& os, crawl_report const& report)
{
    // Print the report
    os <<
        "Crawl report\n" <<
        "   Failure counts\n" <<
        "       Timer   : " << report.timer_failures << "\n" <<
        "       Resolve : " << report.resolve_failures << "\n" <<
        "       Connect : " << report.connect_failures << "\n" <<
        "       Write   : " << report.write_failures << "\n" <<
        "       Read    : " << report.read_failures << "\n" <<
        "       Success : " << report.success << "\n" <<
        "   Status codes\n"
        ;
    for(auto const& result : report.status_codes)
        os <<
        "       " << std::setw(3) << result.first << ": " << result.second <<
                    " (" << http::obsolete_reason(static_cast<http::status>(result.first)) << ")\n";
    os.flush();
    return os;
}

//------------------------------------------------------------------------------

// Performs HTTP GET requests and aggregates the results into a report
class worker : public std::enable_shared_from_this<worker>
{
    enum
    {
        // Use a small timeout to keep things lively
        timeout = 5
    };

    crawl_report& report_;
    tcp::resolver resolver_;
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::strand<
        boost::asio::io_context::executor_type> strand_;
    boost::beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    http::response<http::string_body> res_;

public:
    worker(worker&&) = default;

    // Resolver and socket require an io_context
    worker(
        crawl_report& report,
        boost::asio::io_context& ioc)
        : report_(report)
        , resolver_(ioc)
        , socket_(ioc)
        , timer_(ioc,
            (chrono::steady_clock::time_point::max)())
        , strand_(ioc.get_executor())
    {
        // Set up the common fields of the request
        req_.version(11);
        req_.method(http::verb::get);
        req_.target("/");
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    }

    // Start the asynchronous operation
    void
    run()
    {
        // Run the timer. The timer is operated
        // continuously, this simplifies the code.
        on_timer({});

        do_get_host();
    }

    void
    on_timer(boost::system::error_code ec)
    {
        if(ec && ec != boost::asio::error::operation_aborted)
        {
            // Should never happen
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.timer_failures;
            });
            return;
        }

        // Verify that the timer really expired since the deadline may have moved.
        if(timer_.expiry() <= chrono::steady_clock::now())
        {
            socket_.shutdown(tcp::socket::shutdown_both, ec);
            socket_.close(ec);
            return;
        }

        // Wait on the timer
        timer_.async_wait(
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &worker::on_timer,
                    shared_from_this(),
                    std::placeholders::_1)));
    }

    void
    do_get_host()
    {
        // Grab another host
        auto const host = report_.get_host();

        // nullptr means no more work
        if(! host)
        {
            timer_.cancel_one();
            return;
        }

        // The Host HTTP field is required
        req_.set(http::field::host, host);

        // Set the timer
        timer_.expires_after(chrono::seconds(timeout));

        // Set up an HTTP GET request message
        // Look up the domain name
        resolver_.async_resolve(
            host,
            "http",
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &worker::on_resolve,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }

    void
    on_resolve(
        boost::system::error_code ec,
        tcp::resolver::results_type results)
    {
        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.resolve_failures;
            });
            return do_get_host();
        }

        // Set the timer
        timer_.expires_after(chrono::seconds(timeout));

        // Make the connection on the IP address we get from a lookup
        boost::asio::async_connect(
            socket_,
            results.begin(),
            results.end(),
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &worker::on_connect,
                    shared_from_this(),
                    std::placeholders::_1)));
    }

    void
    on_connect(boost::system::error_code ec)
    {
        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.connect_failures;
            });
            return do_get_host();
        }

        // Set the timer
        timer_.expires_after(chrono::seconds(timeout));

        // Send the HTTP request to the remote host
        http::async_write(
            socket_,
            req_,
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &worker::on_write,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }

    void
    on_write(
        boost::system::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.write_failures;
            });
            return do_get_host();
        }
        
        // Set the timer
        timer_.expires_after(chrono::seconds(timeout));

        // Receive the HTTP response
        http::async_read(
            socket_,
            buffer_,
            res_,
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &worker::on_read,
                    shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2)));
    }

    void
    on_read(
        boost::system::error_code ec,
        std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);

        if(ec)
        {
            report_.aggregate(
            [](crawl_report& rep)
            {
                ++rep.read_failures;
            });
            return do_get_host();
        }

        auto const code = res_.result_int();
        report_.aggregate(
        [code](crawl_report& rep)
        {
            ++rep.success;
            ++rep.status_codes[code];
        });

        // Gracefully close the socket
        socket_.shutdown(tcp::socket::shutdown_both, ec);
        socket_.close(ec);

        // If we get here then the connection is closed gracefully

        do_get_host();
    }
};

class timer
{
    using clock_type = chrono::system_clock;

    clock_type::time_point when_;

public:
    using duration = clock_type::duration;

    timer()
        : when_(clock_type::now())
    {
    }

    duration
    elapsed() const
    {
        return clock_type::now() - when_;
    }
};

int main(int argc, char* argv[])
{
    // Check command line arguments.
    if (argc != 2)
    {
        std::cerr <<
            "Usage: http-crawl <threads>\n" <<
            "Example:\n" <<
            "    http-crawl 100 1\n";
        return EXIT_FAILURE;
    }
    auto const threads = std::max<int>(1, std::atoi(argv[1]));

    // The io_context is required for all I/O
    boost::asio::io_context ioc{1};

    // The work keeps io_context::run from returning
    auto work = boost::asio::make_work_guard(ioc);

    // The report holds the aggregated statistics
    crawl_report report{ioc};

    timer t;

    // Create and launch the worker threads.
    std::vector<std::thread> workers;
    workers.reserve(threads + 1);
    for(int i = 0; i < threads; ++i)
        workers.emplace_back(
        [&report]
        {
            // We use a separate io_context for each worker because
            // the asio resolver simulates asynchronous operation using
            // a dedicated worker thread per io_context, and we want to
            // do a lot of name resolutions in parallel.
            boost::asio::io_context ioc{1};
            std::make_shared<worker>(report, ioc)->run();
            ioc.run();
        });

    // Add another thread to run the main io_context which
    // is used to aggregate the statistics
    workers.emplace_back(
    [&ioc]
    {
        ioc.run();
    });

    // Now block until all threads exit
    for(std::size_t i = 0; i < workers.size(); ++i)
    {
        auto& thread = workers[i];

        // If this is the last thread, reset the
        // work object so that it can return from run.
        if(i == workers.size() - 1)
            work.reset();

        // Wait for the thread to exit
        thread.join();
    }

    std::cout <<
        "Elapsed time:    " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n";
    std::cout << report;

    return EXIT_SUCCESS;
}