Remove finished threads from vector
An answer to this question on Stack Overflow.
Question
I have a number of jobs and I want to run a subset of them in parallel. For example, I have 100 jobs to run and I want to run 10 threads at a time. This is my current code for this problem:
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
#include <chrono>
#include <random>
#include <mutex>

int main() {
    constexpr std::size_t NUMBER_OF_THREADS(10);
    std::atomic<std::size_t> numberOfRunningJobs(0);
    std::vector<std::thread> threads;
    std::mutex maxThreadsMutex;
    std::mutex writeMutex;
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, 2);

    for (std::size_t id(0); id < 100; ++id) {
        if (numberOfRunningJobs >= NUMBER_OF_THREADS - 1) {
            maxThreadsMutex.lock();
        }
        ++numberOfRunningJobs;
        threads.emplace_back([id, &numberOfRunningJobs, &maxThreadsMutex, &writeMutex, &distribution, &generator]() {
            auto waitSeconds(distribution(generator));
            std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
            writeMutex.lock();
            std::cout << id << " " << waitSeconds << std::endl;
            writeMutex.unlock();
            --numberOfRunningJobs;
            maxThreadsMutex.unlock();
        });
    }

    for (auto &thread : threads) {
        thread.join();
    }
    return 0;
}
In the for loop I check how many jobs are running and, if a slot is free, I add a new thread to the vector. At the end of each thread I decrement the number of running jobs and unlock the mutex so that one new thread can start.
This solves my task, but there is one point I don't like: I need a vector of size 100 to store all the threads, and I have to join all 100 threads at the end. I want to remove each thread from the vector after it has finished, so that the vector contains a maximum of 10 threads and I only have to join 10 threads at the end. I am thinking about passing the vector and an iterator by reference to the lambda so that I can remove the element at the end, but I don't know how. How can I optimize my code to use a maximum of 10 elements in the vector?
Answer
Since you don't seem to require extremely fine-grained thread control, I'd recommend approaching this problem with OpenMP. OpenMP is an industry-standard, directive-based approach for parallelizing C, C++, and Fortran code. Every major compiler for these languages implements it.
Using it results in a significant reduction in the complexity of your code:
#include <chrono>
#include <cstddef>
#include <iostream>
#include <random>
#include <thread>

int main() {
    constexpr std::size_t NUMBER_OF_THREADS(10);
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, 2);

    //Distribute the loop between threads ensuring that only
    //a specific number of threads are ever active at once.
    #pragma omp parallel for num_threads(NUMBER_OF_THREADS)
    for (std::size_t id = 0; id < 100; ++id) {
        //Declared outside the critical block so it stays in scope below.
        int waitSeconds;
        #pragma omp critical //Serialize access to generator
        {
            waitSeconds = distribution(generator);
        }
        std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
        #pragma omp critical //Serialize access to cout
        {
            std::cout << id << " " << waitSeconds << std::endl;
        }
    }
    return 0;
}
To use OpenMP with GCC, compile with the -fopenmp flag:
g++ main.cpp -fopenmp
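One thing to watch for: if the flag is left off, GCC ignores the OpenMP pragmas and the loop simply runs on a single thread. The OpenMP specification defines the `_OPENMP` macro whenever OpenMP compilation is enabled, so a small check along these lines can confirm the flag took effect. This is only a sketch, and the helper name `openmpStatus` is made up for illustration.

```cpp
#include <string>

// Hypothetical helper (not part of OpenMP): reports whether this
// translation unit was compiled with OpenMP support enabled.
std::string openmpStatus() {
#ifdef _OPENMP
    // _OPENMP expands to a yyyymm date identifying the supported spec version.
    return "enabled (_OPENMP = " + std::to_string(_OPENMP) + ")";
#else
    // Without the flag the pragmas are skipped and loops run serially.
    return "disabled: OpenMP pragmas are ignored";
#endif
}
```

Printing the result of `openmpStatus()` at the start of `main` is a quick way to tell a serial build from a parallel one.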
Generating and directly coordinating threads is sometimes necessary, but the massive number of new languages and libraries designed to make parallelism easier speaks to the number of use cases in which a simpler path to parallelism is sufficient.
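If OpenMP is not an option and you do want to manage the threads yourself, one way to keep the vector at 10 elements is to start 10 long-lived workers and let each one pull job ids from a shared counter, rather than starting one thread per job. The following is a minimal sketch using only the standard library. The function `runJobs` and its parameters are names I made up for illustration; they are not from your code.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: runs job(0) .. job(jobCount - 1) on at most
// workerCount threads. The job callable must be safe to call concurrently.
void runJobs(std::size_t jobCount, std::size_t workerCount,
             const std::function<void(std::size_t)>& job) {
    std::atomic<std::size_t> nextId(0);  // next job id not yet claimed
    std::vector<std::thread> workers;    // never holds more than workerCount threads
    workers.reserve(workerCount);

    for (std::size_t w = 0; w < workerCount; ++w) {
        workers.emplace_back([&nextId, &job, jobCount]() {
            // fetch_add hands each id to exactly one worker; a worker
            // exits once the ids are exhausted.
            for (std::size_t id = nextId.fetch_add(1); id < jobCount;
                 id = nextId.fetch_add(1)) {
                job(id);
            }
        });
    }

    for (auto& worker : workers) {
        worker.join();  // workerCount joins, regardless of jobCount
    }
}
```

With this shape, `runJobs(100, 10, job)` joins 10 threads at the end instead of 100, and a worker that finishes a short job immediately picks up the next id. Any shared state the job touches (the random generator and `std::cout` in your example) would still need its own locking.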