Friday, June 15, 2012

Producer Consumer Pattern implementation using pthread

In this post you will see how to use the pthread library for multi-threaded programming and how to implement the Producer/Consumer pattern in the simplest way. You will see how to avoid busy waiting by using condition variables and how to synchronize threads. In this example, the producer generates random traffic and puts it in a buffer, while the consumer simultaneously retrieves the produced traffic and checks whether it is suspicious. Please note that a non-static class member function cannot be passed to pthread_create as the thread function. To handle this, add a static method to the class and pass that method to pthread_create. This project runs on both Windows+MinGW and Linux. (Don't forget to add -lpthread for g++ when compiling this file.)
#include <pthread.h>
#include <unistd.h>

#include <cstdlib>
#include <iostream>
#include <queue>
#include <sstream>
#include <string>
using namespace std;

/**
 * Bounded FIFO of request strings guarded by one mutex and two condition
 * variables.
 *
 * Add() blocks while the buffer holds MAX_BUFFER_LEN entries; Remove()
 * blocks while it is empty. Once more than MAX_PROCESS_LEN requests have
 * been accepted, Add() drops further requests and Remove() reports
 * completion as soon as the buffer has been drained.
 */
class Requests
{
private:
 std::queue<std::string> mBuffer;
 static const int MAX_BUFFER_LEN=100;   // capacity of the buffer
 static const int MAX_PROCESS_LEN=1000; // threshold checked by ProcessedEnough()
 int mProcessed;                        // number of requests accepted by Add()
 int mLen;                              // number of requests currently buffered
 pthread_mutex_t mLock;                 // protects mBuffer, mLen and mProcessed
 pthread_cond_t mEmptyCond;             // Remove() waits here while the buffer is empty
 pthread_cond_t mFullCond;              // Add() waits here while the buffer is full
public:
 // Initializers follow the member declaration order (avoids -Wreorder).
 Requests():mProcessed(0),mLen(0)
 {
  pthread_mutex_init(&mLock,NULL);
  pthread_cond_init(&mFullCond,NULL);
  pthread_cond_init(&mEmptyCond,NULL);
 }
 ~Requests()
 {
  pthread_mutex_destroy(&mLock);
  pthread_cond_destroy(&mFullCond);
  pthread_cond_destroy(&mEmptyCond);
 }
 /**
  * Appends pRequest to the buffer, blocking while the buffer is full.
  * The request is silently dropped when ProcessedEnough() already holds.
  */
 void Add(std::string pRequest)
 {
  pthread_mutex_lock(&mLock);
  // pthread_cond_wait may return spuriously, so the predicate must be
  // re-checked in a loop rather than tested once with an `if`.
  while(mLen==MAX_BUFFER_LEN)
  {
   std::cout << "Add waiting because of Full Condition...\n";
   pthread_cond_wait(&mFullCond,&mLock);
  }
  if(ProcessedEnough())
  {
   pthread_mutex_unlock(&mLock);
   return ;
  }
  mProcessed++;
  std::cout << "Pushing :"<<pRequest<<std::endl;
  mBuffer.push(pRequest);
  mLen++;
  pthread_mutex_unlock(&mLock);
  // Wake a consumer that may be blocked in Remove().
  pthread_cond_signal(&mEmptyCond);

 }
 /**
  * Removes and returns the oldest buffered request, blocking while the
  * buffer is empty.
  *
  * pProcessFinished is set to true (and an empty string is returned) when
  * the buffer is empty and ProcessedEnough() holds; otherwise it is set to
  * false.
  */
 std::string Remove(bool &pProcessFinished)
 {
  pProcessFinished=false;
  pthread_mutex_lock(&mLock);
  // Loop on the predicate: after a spurious wakeup the queue may still be
  // empty, and calling front()/pop() on an empty queue is undefined.
  while(mLen==0)
  {
   if(ProcessedEnough())
   {
    pProcessFinished=true;
    pthread_mutex_unlock(&mLock);
    pthread_cond_signal(&mFullCond);
    return "";
   }
   std::cout << "Remove waiting because of Empty Condition...\n";
   pthread_cond_wait(&mEmptyCond,&mLock);
  }

  std::string ret=mBuffer.front();
  mBuffer.pop();
  std::cout << ">>>>>>Popping :"<<ret<<std::endl;
  mLen--;
  pthread_mutex_unlock(&mLock);
  // Wake a producer that may be blocked in Add().
  pthread_cond_signal(&mFullCond);

  return ret;
 }
 /**
  * Returns true once more than MAX_PROCESS_LEN requests have been accepted.
  * Does not take mLock itself; Add() and Remove() call it with the lock
  * already held.
  */
 bool ProcessedEnough()
 {
  return mProcessed > MAX_PROCESS_LEN;
 }
};

/**
 * Producer side of the example: pushes the decimal strings "0", "1", ...
 * into a shared Requests buffer until the buffer reports ProcessedEnough().
 * The Requests object is not owned by this class.
 */
class TrafficProducer
{
private :
 Requests *mRequestBuffer;
public:
 TrafficProducer(Requests *pReq):mRequestBuffer(pReq)
 {

 }
 /**
  * Thread body: generates requests, pausing a random 0-9 ms before each
  * one. Always returns NULL.
  */
 void * ProduceTraffic()
 {
  int i=0;

  while(! mRequestBuffer->ProcessedEnough())
  {
   // usleep() takes microseconds; it replaces the Win32-only Sleep(ms),
   // which is not declared on Linux.
   usleep((std::rand()%10)*1000);
   std::ostringstream os;
   os << i++;
   mRequestBuffer->Add(os.str());

  }
  return NULL;
 }
 /**
  * pthread_create-compatible trampoline; pContext must point to a
  * TrafficProducer.
  */
 static void * Run(void *pContext)
 {
  return static_cast<TrafficProducer *>(pContext)->ProduceTraffic();
 }
};
/**
 * Consumer side of the example: pulls request strings from a shared
 * Requests buffer and reports those whose numeric value is a multiple of 19.
 * The Requests object is not owned by this class.
 */
class TrafficInspector
{
private :
 Requests *mRequestBuffer;
public:
 TrafficInspector(Requests *pReq):mRequestBuffer(pReq)
 {

 }
 /**
  * Thread body: consumes requests until Remove() reports that processing
  * has finished, pausing a random 0-4 ms before each one. Always returns
  * NULL.
  */
 void * InspectTraffic()
 {
  for(;;)
  {
   // usleep() takes microseconds; it replaces the Win32-only Sleep(ms),
   // which is not declared on Linux.
   usleep((std::rand()%5)*1000);
   bool finished=false;
   const std::string s=mRequestBuffer->Remove(finished);
   if(finished)
    break;
   const int i=std::atoi(s.c_str());
   if(i%19 == 0)
   {
    std::cout << "suspicios data recieved:"<<i<<std::endl;
   }
  }
  return NULL;
 }
 /**
  * pthread_create-compatible trampoline; pContext must point to a
  * TrafficInspector.
  */
 static void * Run(void *pContext)
 {
  return static_cast<TrafficInspector*>(pContext)->InspectTraffic();
 }
};
int main() {
 Requests reqBuff;
 TrafficProducer producer(&reqBuff);
 TrafficInspector consumer(&reqBuff);
 pthread_t th_producer;
 pthread_t th_consumer;
 pthread_create(&th_producer,NULL,TrafficProducer::Run,static_cast<void*>(&producer));
 pthread_create(&th_consumer,NULL,TrafficInspector::Run,static_cast<void*>(&consumer));
 pthread_join(th_producer,NULL);
 pthread_join(th_consumer,NULL);

 cout << "Finished" << endl; 
 return 0;
}



2 comments :

  1. I think this code has some serious issues.
    As I see it, your pthread_cond_wait() calls are not checking the predicate in a loop.
    Furthermore you unlock() before you call cond_signal() which is also inviting races.
    I hope people do not copy this code as an example.

    ReplyDelete
    Replies
    1. Hi OetelaarNG,
      Appreciating your comment:
      1. pthread_cond_wait(...) unlocks "mLock" and lets the other thread work on the data store.
      2.
      ---- Add(..) ---
      pthread_mutex_unlock(&mLock);
      pthread_cond_signal(&mFullCond);
      ---
      We need to unlock here because, if we don't, the consumer can't pick up data from the store!
      3. By the way, this is a sample for a single producer and a single consumer.
      For multiple producers/multiple consumers, some extra conditions and additional work are needed.
      ----
      Thanks for your comment.

      Delete

Friday, June 15, 2012

Producer Consumer Pattern implementation using pthread

In this post you will see how to use the pthread library for multi-threaded programming and how to implement the Producer/Consumer pattern in the simplest way. You will see how to avoid busy waiting by using condition variables and how to synchronize threads. In this example, the producer generates random traffic and puts it in a buffer, while the consumer simultaneously retrieves the produced traffic and checks whether it is suspicious. Please note that a non-static class member function cannot be passed to pthread_create as the thread function. To handle this, add a static method to the class and pass that method to pthread_create. This project runs on both Windows+MinGW and Linux. (Don't forget to add -lpthread for g++ when compiling this file.)
#include <iostream>
#include <pthread.h>
#include <queue>
#include <string>
#include <sstream>
using namespace std;

/**
 * Bounded FIFO of request strings guarded by one mutex and two condition
 * variables.
 *
 * Add() blocks while the buffer holds MAX_BUFFER_LEN entries; Remove()
 * blocks while it is empty. Once more than MAX_PROCESS_LEN requests have
 * been accepted, Add() drops further requests and Remove() reports
 * completion as soon as the buffer has been drained.
 */
class Requests
{
private:
 std::queue<std::string> mBuffer;
 static const int MAX_BUFFER_LEN=100;   // capacity of the buffer
 static const int MAX_PROCESS_LEN=1000; // threshold checked by ProcessedEnough()
 int mProcessed;                        // number of requests accepted by Add()
 int mLen;                              // number of requests currently buffered
 pthread_mutex_t mLock;                 // protects mBuffer, mLen and mProcessed
 pthread_cond_t mEmptyCond;             // Remove() waits here while the buffer is empty
 pthread_cond_t mFullCond;              // Add() waits here while the buffer is full
public:
 // Initializers follow the member declaration order (avoids -Wreorder).
 Requests():mProcessed(0),mLen(0)
 {
  pthread_mutex_init(&mLock,NULL);
  pthread_cond_init(&mFullCond,NULL);
  pthread_cond_init(&mEmptyCond,NULL);
 }
 ~Requests()
 {
  pthread_mutex_destroy(&mLock);
  pthread_cond_destroy(&mFullCond);
  pthread_cond_destroy(&mEmptyCond);
 }
 /**
  * Appends pRequest to the buffer, blocking while the buffer is full.
  * The request is silently dropped when ProcessedEnough() already holds.
  */
 void Add(std::string pRequest)
 {
  pthread_mutex_lock(&mLock);
  // pthread_cond_wait may return spuriously, so the predicate must be
  // re-checked in a loop rather than tested once with an `if`.
  while(mLen==MAX_BUFFER_LEN)
  {
   std::cout << "Add waiting because of Full Condition...\n";
   pthread_cond_wait(&mFullCond,&mLock);
  }
  if(ProcessedEnough())
  {
   pthread_mutex_unlock(&mLock);
   return ;
  }
  mProcessed++;
  std::cout << "Pushing :"<<pRequest<<std::endl;
  mBuffer.push(pRequest);
  mLen++;
  pthread_mutex_unlock(&mLock);
  // Wake a consumer that may be blocked in Remove().
  pthread_cond_signal(&mEmptyCond);

 }
 /**
  * Removes and returns the oldest buffered request, blocking while the
  * buffer is empty.
  *
  * pProcessFinished is set to true (and an empty string is returned) when
  * the buffer is empty and ProcessedEnough() holds; otherwise it is set to
  * false.
  */
 std::string Remove(bool &pProcessFinished)
 {
  pProcessFinished=false;
  pthread_mutex_lock(&mLock);
  // Loop on the predicate: after a spurious wakeup the queue may still be
  // empty, and calling front()/pop() on an empty queue is undefined.
  while(mLen==0)
  {
   if(ProcessedEnough())
   {
    pProcessFinished=true;
    pthread_mutex_unlock(&mLock);
    pthread_cond_signal(&mFullCond);
    return "";
   }
   std::cout << "Remove waiting because of Empty Condition...\n";
   pthread_cond_wait(&mEmptyCond,&mLock);
  }

  std::string ret=mBuffer.front();
  mBuffer.pop();
  std::cout << ">>>>>>Popping :"<<ret<<std::endl;
  mLen--;
  pthread_mutex_unlock(&mLock);
  // Wake a producer that may be blocked in Add().
  pthread_cond_signal(&mFullCond);

  return ret;
 }
 /**
  * Returns true once more than MAX_PROCESS_LEN requests have been accepted.
  * Does not take mLock itself; Add() and Remove() call it with the lock
  * already held.
  */
 bool ProcessedEnough()
 {
  return mProcessed > MAX_PROCESS_LEN;
 }
};

/**
 * Producer side of the example: pushes the decimal strings "0", "1", ...
 * into a shared Requests buffer until the buffer reports ProcessedEnough().
 * The Requests object is not owned by this class.
 */
class TrafficProducer
{
private :
 Requests *mRequestBuffer;
public:
 TrafficProducer(Requests *pReq):mRequestBuffer(pReq)
 {

 }
 /**
  * Thread body: generates requests, pausing a random 0-9 ms before each
  * one. Always returns NULL.
  */
 void * ProduceTraffic()
 {
  int i=0;

  while(! mRequestBuffer->ProcessedEnough())
  {
   // usleep() takes microseconds; it replaces the Win32-only Sleep(ms),
   // which is not declared on Linux.
   usleep((std::rand()%10)*1000);
   std::ostringstream os;
   os << i++;
   mRequestBuffer->Add(os.str());

  }
  return NULL;
 }
 /**
  * pthread_create-compatible trampoline; pContext must point to a
  * TrafficProducer.
  */
 static void * Run(void *pContext)
 {
  return static_cast<TrafficProducer *>(pContext)->ProduceTraffic();
 }
};
/**
 * Consumer side of the example: pulls request strings from a shared
 * Requests buffer and reports those whose numeric value is a multiple of 19.
 * The Requests object is not owned by this class.
 */
class TrafficInspector
{
private :
 Requests *mRequestBuffer;
public:
 TrafficInspector(Requests *pReq):mRequestBuffer(pReq)
 {

 }
 /**
  * Thread body: consumes requests until Remove() reports that processing
  * has finished, pausing a random 0-4 ms before each one. Always returns
  * NULL.
  */
 void * InspectTraffic()
 {
  for(;;)
  {
   // usleep() takes microseconds; it replaces the Win32-only Sleep(ms),
   // which is not declared on Linux.
   usleep((std::rand()%5)*1000);
   bool finished=false;
   const std::string s=mRequestBuffer->Remove(finished);
   if(finished)
    break;
   const int i=std::atoi(s.c_str());
   if(i%19 == 0)
   {
    std::cout << "suspicios data recieved:"<<i<<std::endl;
   }
  }
  return NULL;
 }
 /**
  * pthread_create-compatible trampoline; pContext must point to a
  * TrafficInspector.
  */
 static void * Run(void *pContext)
 {
  return static_cast<TrafficInspector*>(pContext)->InspectTraffic();
 }
};
int main() {
 Requests reqBuff;
 TrafficProducer producer(&reqBuff);
 TrafficInspector consumer(&reqBuff);
 pthread_t th_producer;
 pthread_t th_consumer;
 pthread_create(&th_producer,NULL,TrafficProducer::Run,static_cast<void*>(&producer));
 pthread_create(&th_consumer,NULL,TrafficInspector::Run,static_cast<void*>(&consumer));
 pthread_join(th_producer,NULL);
 pthread_join(th_consumer,NULL);

 cout << "Finished" << endl; 
 return 0;
}



2 comments :

  1. I think this code has some serious issues.
    As I see it, your pthread_cond_wait() calls are not checking the predicate in a loop.
    Furthermore you unlock() before you call cond_signal() which is also inviting races.
    I hope people do not copy this code as an example.

    ReplyDelete
    Replies
    1. Hi OetelaarNG,
      Appreciating your comment:
      1. pthread_cond_wait(...) unlocks "mLock" and lets the other thread work on the data store.
      2.
      ---- Add(..) ---
      pthread_mutex_unlock(&mLock);
      pthread_cond_signal(&mFullCond);
      ---
      We need to unlock here because, if we don't, the consumer can't pick up data from the store!
      3. By the way, this is a sample for a single producer and a single consumer.
      For multiple producers/multiple consumers, some extra conditions and additional work are needed.
      ----
      Thanks for your comment.

      Delete