Adding elements to vectors safely from C++11 threads
An answer to this question on Stack Overflow.
Question
My program needs to generate a large number of sample strings and since generating the strings is computationally intensive, I want to parallelize the process. My code is like this:
mutex mtx;
void my_thread(vector<string> &V, int length)
{
string s=generate_some_string(length); //computationally intensive part
mtx.lock();
V.push_back(s);
mtx.unlock();
}
int main()
{
vector<string> S;
while(S.size()<1000)
{
vector<thread> ths;
ths.resize(10);
for(int i=0; i<10;i++)
{
ths[i]=thread(my_thread,ref(S),100 );
}
for(auto &th: ths) th.join();
}
}
I get the "Double free or corruption" error when I run it.
Answer
Your code
Your use of threads looks generally correct, so the problem is likely with generate_some_string affecting a global state. You could fix this by either:
Using a better library.
Using MPI for parallelism as it will spawn processes which have separate memories.
Parallelism Philosophy
The above seems obvious in retrospect, so there's a question as to why it wasn't immediately apparent. I think this has something to do with the way you're achieving parallelism.
C++11 threads gives you a lot of flexibility, but it also requires that you construct your parallelism explicitly. Most of the time this is not what you want. It's easier, and less buggy, to give the compiler information about how it can parallelize your code and let it take care of the low-level details.
The following shows how you can do this using OpenMP: an industry-standard set of compiler-directives included in all modern compilers and used extensively in high-performance computing.
You'll notice that the code is generally easier to read than what you've written and, hence, easier to debug.
All code below will be compiled with the command (modified appropriately for your compiler:
g++ -O3 main.cpp -fopenmp
Solution 0: Use an easier form of parallelism
First off, I'd suggest using OpenMP for your parallelism. It's an industry standard, eliminates much of the pain of having to deal with threads, and allows you to express parallelism at a conceptual level.
Solution 1: Private Memory
You could solve your problem by having each thread write to its own private memory and then merging the private memories together. This avoids the mutex entirely, which may result in faster code and possibly avoids the problem you're experiencing altogether.
Note that each thread is producing multiple computationally-intensive strings, but that this work is automatically divided between the available threads. It is
#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH = 50;
using namespace std::chrono_literals;
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
double sum=0;
for(int i=0;i<length;i++){
std::this_thread::sleep_for(2ms);
sum+=std::sqrt(i);
}
return std::to_string(sum);
}
int main(){
//Build a vector that contains vectors of strings. Each thread will have its
//own vector of strings
std::vector< std::vector<std::string> > vecs(omp_get_max_threads());
//Loop over lengths
for(int length=10;length<MAX_STRING_LENGTH;length++){
//Progress so the user does not get impatient
std::cout<<length<<std::endl;
//Parallelize across all cores
#pragma omp parallel for
for(int i=0;i<STRINGS_PER_LENGTH;i++){
//Each thread independently generates its string and puts it into its own
//private memory space
vecs[omp_get_thread_num()].push_back(GenerateSomeString(length));
}
}
//Merge all the threads' results together
std::vector<std::string> S;
for(auto &v: vecs)
S.insert(S.end(),v.begin(),v.end());
//Throw away the thread private memory
vecs.clear();
vecs.shrink_to_fit();
}
Solution 2: Use reductions
We can define a custom reduction operator to merge vectors. Using this operator in the parallel part of our code allows us to eliminate the vector of vectors and the clean-up afterwards. Instead, as the threads complete their work, OpenMP safely handles combining their results.
#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH = 50;
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
double sum=0;
for(int i=0;i<length;i++){
std::this_thread::sleep_for(2ms);
sum+=std::sqrt(i);
}
return std::to_string(sum);
}
int main(){
//Global vector, must not be accessed by individual threads
std::vector<std::string> S;
#pragma omp declare reduction (merge : std::vector<std::string> : omp_out.insert(omp_out.end(), omp_in.begin(), omp_in.end()))
//Loop over lengths
for(int length=10;length<50;length++){
//Progress so the user does not get impatient
std::cout<<length<<std::endl;
//Parallelize across all cores
std::vector<std::string> private_memory;
#pragma omp parallel for reduction(merge: private_memory)
for(int i=0;i<STRINGS_PER_LENGTH;i++){
//Each thread independently generates its string and puts it into its own
//private memory space
private_memory.push_back(GenerateSomeString(length));
}
}
}
Solution 3: Use critical
We can eliminate the reduction entirely by placing the push_back into a critical section, which limits access to that part of the code to a single thread at a time.
//Compile with g++ -O3 main.cpp -fopenmp
#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH = 50;
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
double sum=0;
for(int i=0;i<length;i++){
std::this_thread::sleep_for(2ms);
sum+=std::sqrt(i);
}
return std::to_string(sum);
}
int main(){
//Global vector, must not be accessed by individual threads
std::vector<std::string> S;
//Loop over lengths
for(int length=10;length<50;length++){
//Progress so the user does not get impatient
std::cout<<length<<std::endl;
//Parallelize across all cores
#pragma omp parallel for
for(int i=0;i<STRINGS_PER_LENGTH;i++){
//Each thread independently generates its string and puts it into its own
//private memory space
const auto temp = GenerateSomeString(length);
//Only one thread can access this part of the code at a time
#pragma omp critical
S.push_back(temp);
}
}
}