Thursday, August 2, 2012

Implementing thread safe queue in C++11

Concurrency again!
I'm going to show you how to implement a thread-safe queue using mutex-based locking and condition-variable synchronization in C++11.
As you know, protecting shared data in a multi-threaded environment is very important.
In the following sample I demonstrate how to use mutexes and condition variables to write a thread-safe queue.
I have put all of the explanation in comments to help you understand it better.

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Thread-safe FIFO queue: a std::queue<T> guarded by one mutex, plus a
// condition variable that lets consumers sleep until an element is available.
//
// Every public member locks mMut for the duration of its access to mData, so
// the members may be called concurrently from any number of threads.
// Elements are handed out as std::shared_ptr<T>; an empty (null) pointer from
// TryPop() means "the queue was empty".
template <class T>
class tsqueue
{
private :
	std::queue<T> mData;
	// "mutable" so that the const member Empty() is still allowed to lock it.
	mutable std::mutex mMut;
	// Notified by Push(); Pop() waits on it while the queue is empty.
	std::condition_variable mEmptyCondition;

	// Removes the front element and returns it wrapped in a shared_ptr.
	// Precondition: the caller holds mMut and mData is not empty.
	std::shared_ptr<T> take_front()
	{
		// make_shared performs one allocation, whereas
		// std::shared_ptr<T>(new T(...)) performs two.
		// move_if_noexcept moves the element out when T's move constructor
		// cannot throw and copies otherwise, so the element is still intact
		// in the queue if constructing the result throws.
		std::shared_ptr<T> ret=std::make_shared<T>(std::move_if_noexcept(mData.front()));
		mData.pop();
		return ret;
	}
public:
	// Appends pValue to the back of the queue and wakes one thread blocked
	// in Pop(), if there is one.
	void Push(T pValue)
	{
		{
			// Blocks here while another thread owns mMut. lock_guard has no
			// unlock() member: the mutex is released by its destructor at
			// the end of this inner scope.
			std::lock_guard<std::mutex> lock(mMut);
			// pValue is already our own copy, so move it into the queue.
			mData.push(std::move(pValue));
		}
		// Notify after releasing the mutex so the woken thread can acquire
		// it immediately instead of blocking on it again.
		mEmptyCondition.notify_one();
	}

	// Blocks until the queue is non-empty, then removes and returns the
	// front element. Never returns a null pointer.
	std::shared_ptr<T> Pop()
	{
		// unique_lock (rather than lock_guard) is required because wait()
		// must be able to unlock and re-lock the mutex.
		std::unique_lock<std::mutex> lock(mMut);
		// wait() checks the predicate first: if the queue already holds data
		// it returns at once. Otherwise it releases mMut (so other threads
		// can Push) and sleeps; on every wake-up it re-locks mMut and
		// re-checks the predicate, because another consumer may already have
		// taken the element that triggered the notification.
		mEmptyCondition.wait(lock,[this]{ return !mData.empty();});
		// Here mMut is locked and mData is not empty.
		return take_front();
	}

	// Non-blocking variant of Pop(): returns the front element, or a null
	// shared_ptr if the queue is empty at the time of the call.
	std::shared_ptr<T> TryPop()
	{
		std::lock_guard<std::mutex> lock(mMut);
		if(mData.empty())
			return std::shared_ptr<T>(); // null
		return take_front();
	}

	// Returns true if the queue holds no elements at the time of the call.
	// Another thread may change that immediately after this function returns.
	bool Empty() const
	{
		// Locking modifies mMut, which is allowed in a const member only
		// because mMut is declared "mutable".
		std::lock_guard<std::mutex> lock(mMut);
		return mData.empty();
	}
};

void test_tsq()
{
	tsqueue<int> tsq;
	int N=100;
	std::thread thrpush([&]{
		for(int i=0;i<N;i++)
		{
			tsq.Push(i);
			std::cout <<i<<" pushed \n";
		}
	});
	std::thread thrpop([&]{
		for(int i=0;i<N/2;i++)
		{
			std::cout <<"popping\n";
			std::shared_ptr<int> data=tsq.Pop();
			std::cout <<i<<": poped "<<*data<<"\n";
		}
	});
	std::thread thrtrypop([&]{
		int i=0;
		for(;i<N/2;)
		{
			std::cout <<"trying for pop\n";
			std::shared_ptr<int> data=tsq.TryPop();
			if(data.get()!=NULL)
			{
			std::cout <<i++<<": try poped "<<*data<<"\n";
			}
			else
				std::cout <<"failed trying pop \n";

		}
	});
	thrpush.join();
	thrpop.join();
	thrtrypop.join();
}


The only remaining point to note about the mEmptyCondition.wait(...) call is that when the condition variable is notified after a Push() from another thread, wait() re-checks the !mData.empty() predicate before returning, in order to avoid a race condition: there is a chance that another thread has already popped the last inserted item (or that the wake-up was spurious). So being notified is not, by itself, enough reason to continue execution and modify the data. (In this case, because notify_one() is used, only one of the threads waiting on the condition variable is woken.)

No comments :

Post a Comment

Thursday, August 2, 2012

Implementing thread safe queue in C++11

Concurrency again!
I'm going to show you how to implement a thread-safe queue using mutex-based locking and condition-variable synchronization in C++11.
As you know, protecting shared data in a multi-threaded environment is very important.
In the following sample I demonstrate how to use mutexes and condition variables to write a thread-safe queue.
I have put all of the explanation in comments to help you understand it better.

#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

// Thread-safe FIFO queue: a std::queue<T> guarded by one mutex, plus a
// condition variable that lets consumers sleep until an element is available.
//
// Every public member locks mMut for the duration of its access to mData, so
// the members may be called concurrently from any number of threads.
// Elements are handed out as std::shared_ptr<T>; an empty (null) pointer from
// TryPop() means "the queue was empty".
template <class T>
class tsqueue
{
private :
	std::queue<T> mData;
	// "mutable" so that the const member Empty() is still allowed to lock it.
	mutable std::mutex mMut;
	// Notified by Push(); Pop() waits on it while the queue is empty.
	std::condition_variable mEmptyCondition;

	// Removes the front element and returns it wrapped in a shared_ptr.
	// Precondition: the caller holds mMut and mData is not empty.
	std::shared_ptr<T> take_front()
	{
		// make_shared performs one allocation, whereas
		// std::shared_ptr<T>(new T(...)) performs two.
		// move_if_noexcept moves the element out when T's move constructor
		// cannot throw and copies otherwise, so the element is still intact
		// in the queue if constructing the result throws.
		std::shared_ptr<T> ret=std::make_shared<T>(std::move_if_noexcept(mData.front()));
		mData.pop();
		return ret;
	}
public:
	// Appends pValue to the back of the queue and wakes one thread blocked
	// in Pop(), if there is one.
	void Push(T pValue)
	{
		{
			// Blocks here while another thread owns mMut. lock_guard has no
			// unlock() member: the mutex is released by its destructor at
			// the end of this inner scope.
			std::lock_guard<std::mutex> lock(mMut);
			// pValue is already our own copy, so move it into the queue.
			mData.push(std::move(pValue));
		}
		// Notify after releasing the mutex so the woken thread can acquire
		// it immediately instead of blocking on it again.
		mEmptyCondition.notify_one();
	}

	// Blocks until the queue is non-empty, then removes and returns the
	// front element. Never returns a null pointer.
	std::shared_ptr<T> Pop()
	{
		// unique_lock (rather than lock_guard) is required because wait()
		// must be able to unlock and re-lock the mutex.
		std::unique_lock<std::mutex> lock(mMut);
		// wait() checks the predicate first: if the queue already holds data
		// it returns at once. Otherwise it releases mMut (so other threads
		// can Push) and sleeps; on every wake-up it re-locks mMut and
		// re-checks the predicate, because another consumer may already have
		// taken the element that triggered the notification.
		mEmptyCondition.wait(lock,[this]{ return !mData.empty();});
		// Here mMut is locked and mData is not empty.
		return take_front();
	}

	// Non-blocking variant of Pop(): returns the front element, or a null
	// shared_ptr if the queue is empty at the time of the call.
	std::shared_ptr<T> TryPop()
	{
		std::lock_guard<std::mutex> lock(mMut);
		if(mData.empty())
			return std::shared_ptr<T>(); // null
		return take_front();
	}

	// Returns true if the queue holds no elements at the time of the call.
	// Another thread may change that immediately after this function returns.
	bool Empty() const
	{
		// Locking modifies mMut, which is allowed in a const member only
		// because mMut is declared "mutable".
		std::lock_guard<std::mutex> lock(mMut);
		return mData.empty();
	}
};

void test_tsq()
{
	tsqueue<int> tsq;
	int N=100;
	std::thread thrpush([&]{
		for(int i=0;i<N;i++)
		{
			tsq.Push(i);
			std::cout <<i<<" pushed \n";
		}
	});
	std::thread thrpop([&]{
		for(int i=0;i<N/2;i++)
		{
			std::cout <<"popping\n";
			std::shared_ptr<int> data=tsq.Pop();
			std::cout <<i<<": poped "<<*data<<"\n";
		}
	});
	std::thread thrtrypop([&]{
		int i=0;
		for(;i<N/2;)
		{
			std::cout <<"trying for pop\n";
			std::shared_ptr<int> data=tsq.TryPop();
			if(data.get()!=NULL)
			{
			std::cout <<i++<<": try poped "<<*data<<"\n";
			}
			else
				std::cout <<"failed trying pop \n";

		}
	});
	thrpush.join();
	thrpop.join();
	thrtrypop.join();
}


The only remaining point to note about the mEmptyCondition.wait(...) call is that when the condition variable is notified after a Push() from another thread, wait() re-checks the !mData.empty() predicate before returning, in order to avoid a race condition: there is a chance that another thread has already popped the last inserted item (or that the wake-up was spurious). So being notified is not, by itself, enough reason to continue execution and modify the data. (In this case, because notify_one() is used, only one of the threads waiting on the condition variable is woken.)

No comments :

Post a Comment