summaryrefslogtreecommitdiff
path: root/src/buildtool/multithreading/task_system.hpp
blob: c2e46779c09a76f08352dc217f289da2d3fbccd1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#ifndef INCLUDED_SRC_BUILDTOOL_MULTITHREADING_TASK_SYSTEM_HPP
#define INCLUDED_SRC_BUILDTOOL_MULTITHREADING_TASK_SYSTEM_HPP

#include <algorithm>
#include <atomic>
#include <thread>
#include <vector>

#include "src/buildtool/multithreading/notification_queue.hpp"

class TaskSystem {
  public:
    // Constructors create as many threads as specified (or
    // std::thread::hardware_concurrency() many if not specified) running
    // `TaskSystem::Run(index)` on them, where `index` is their position in
    // `threads_`
    TaskSystem();
    explicit TaskSystem(std::size_t number_of_threads);

    TaskSystem(TaskSystem const&) = delete;
    TaskSystem(TaskSystem&&) = delete;
    auto operator=(TaskSystem const&) -> TaskSystem& = delete;
    auto operator=(TaskSystem &&) -> TaskSystem& = delete;

    // Destructor calls sets to "done" all notification queues and joins the
    // threads. Note that joining the threads will wait until the Run method
    // they are running is finished
    ~TaskSystem();

    // Queue a task. Task will be added to the first notification queue that is
    // found to be unlocked or, if none is found (after kNumberOfAttemps
    // iterations), to the one in `index+1` position waiting until it's
    // unlocked.
    template <typename FunctionType>
    void QueueTask(FunctionType&& f) noexcept {
        auto idx = index_++;

        for (std::size_t i = 0; i < thread_count_ * kNumberOfAttempts; ++i) {
            if (queues_[(idx + i) % thread_count_].try_push(
                    std::forward<FunctionType>(f))) {
                return;
            }
        }
        queues_[idx % thread_count_].push(std::forward<FunctionType>(f));
    }

    [[nodiscard]] auto NumberOfThreads() const noexcept -> std::size_t {
        return thread_count_;
    }

  private:
    std::size_t const thread_count_{
        std::max(1U, std::thread::hardware_concurrency())};
    std::vector<std::thread> threads_{};
    std::vector<NotificationQueue> queues_{};
    std::atomic<std::size_t> index_{0};
    WaitableOneWayFlag queues_read_{};
    WaitableZeroCounter num_threads_running_{};

    static constexpr std::size_t kNumberOfAttempts = 5;

    void Run(std::size_t idx);
};

#endif  // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_TASK_SYSTEM_HPP