6 Replies - 657 Views - Last Post: 26 October 2013 - 09:50 PM

#1 NecroWinter

Threads, multiple producer/consumer

Posted 26 October 2013 - 05:12 PM

So I'm trying to write a multiple producer/consumer program with two producers and two consumers. I have a buffer (the array ary) of size 10. I need to fill the buffer in order and consume (display) the items in order, up to the value 100. I think the consumer is correct, but I can't really tell because I'm pretty new to threads. The producer is definitely wrong, but I'm not sure what to do.




#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <string>

using namespace std;

#define READ 1
#define WRITTEN 2

int ary[10];        // shared buffer; -1 marks an empty slot

int flag=READ;      // READ: producers may run, WRITTEN: consumers may run

int nextConsume = 0;
int nextProduce = 0;
int value = -1;

pthread_mutex_t lockit=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t signalit=PTHREAD_COND_INITIALIZER;

void *produce(void *thread_number){
   int i;   // loop counter, private to each producer thread
   printf("I am the producer and my thread number is %ld \n",
          (long) thread_number);

   for (i=0; i<=99; i++){
      pthread_mutex_lock(&lockit);
      while (flag != READ){
         pthread_cond_wait(&signalit, &lockit);
      }

      if ((ary[nextProduce%10]) == -1){
         ary[nextProduce%10] = i+1;
         nextProduce++;
      }

      flag = WRITTEN;
      pthread_cond_broadcast(&signalit);
      pthread_mutex_unlock(&lockit);
   } /* end for */

   printf("producer exiting\n");
   return(NULL);
} // end of producer

void *consume(void *thread_number){
   int i;
   printf("I am the consumer and my thread number is %ld \n",
          (long) thread_number);

   for (i=0; i<=99; i++){
      pthread_mutex_lock(&lockit);
      while (flag != WRITTEN){
         pthread_cond_wait(&signalit, &lockit);
      }

      // drain every filled slot, starting at nextConsume
      while (ary[nextConsume%10] != -1){
         cout << "[CONSUMER nc/mod:" << nextConsume << " m: " << nextConsume%10 << "]the data is: " << ary[nextConsume%10] << endl;
         ary[nextConsume%10] = -1;
         nextConsume++;
      }

      flag = READ;
      pthread_cond_broadcast(&signalit);
      pthread_mutex_unlock(&lockit);
   } /* end for */

   printf("consumer exiting\n");
   return(NULL);
} // end consumer

void errorOutput(string name, int var){
   if (var<0){
      cout << name << " (error) output: " << var << endl;
      exit(var);
   }
}

int main(){
   pthread_t p1, p2, c1, c2;
   int iret1, iret2, iret3, iret4;

   // mark every slot as empty
   for (int i=0; i<10; i++){
      ary[i] = -1;
      cout << i << " = -1" << endl;
   }

   iret1 = pthread_create(&p1, NULL, produce, (void*) 0);
   errorOutput("iret1", iret1);
   iret2 = pthread_create(&p2, NULL, produce, (void*) 1);
   errorOutput("iret2", iret2);
   iret3 = pthread_create(&c1, NULL, consume, (void*) 2);
   errorOutput("iret3", iret3);
   iret4 = pthread_create(&c2, NULL, consume, (void*) 3);
   errorOutput("iret4", iret4);

   pthread_join(p1, NULL);
   pthread_join(p2, NULL);
   pthread_join(c1, NULL);
   pthread_join(c2, NULL);

   return 0;
}





Here's the output:

0 = -1
1 = -1
2 = -1
3 = -1
4 = -1
5 = -1
6 = -1
7 = -1
8 = -1
9 = -1
I am the producer and my thread number is 1
I am the producer and my thread number is 0
I am the consumer and my thread number is 2
[CONSUMER nc/mod:0 m: 0]the data is: 1
I am the consumer and my thread number is 3
[CONSUMER nc/mod:1 m: 1]the data is: 2
[CONSUMER nc/mod:2 m: 2]the data is: 3
[CONSUMER nc/mod:3 m: 3]the data is: 4
[CONSUMER nc/mod:4 m: 4]the data is: 5
[CONSUMER nc/mod:5 m: 5]the data is: 1
[CONSUMER nc/mod:6 m: 6]the data is: 2
[CONSUMER nc/mod:7 m: 7]the data is: 3
[CONSUMER nc/mod:8 m: 8]the data is: 4
[CONSUMER nc/mod:9 m: 9]the data is: 5
[CONSUMER nc/mod:10 m: 0]the data is: 6
[CONSUMER nc/mod:11 m: 1]the data is: 7
[CONSUMER nc/mod:12 m: 2]the data is: 8
[CONSUMER nc/mod:13 m: 3]the data is: 9
[CONSUMER nc/mod:14 m: 4]the data is: 6
[CONSUMER nc/mod:15 m: 5]the data is: 7
[CONSUMER nc/mod:16 m: 6]the data is: 8
[CONSUMER nc/mod:17 m: 7]the data is: 9
[CONSUMER nc/mod:18 m: 8]the data is: 10
[CONSUMER nc/mod:19 m: 9]the data is: 10
[CONSUMER nc/mod:20 m: 0]the data is: 11
[CONSUMER nc/mod:21 m: 1]the data is: 11
[CONSUMER nc/mod:22 m: 2]the data is: 12
[CONSUMER nc/mod:23 m: 3]the data is: 12
[CONSUMER nc/mod:24 m: 4]the data is: 13
[CONSUMER nc/mod:25 m: 5]the data is: 14
[CONSUMER nc/mod:26 m: 6]the data is: 15
[CONSUMER nc/mod:27 m: 7]the data is: 16
[CONSUMER nc/mod:28 m: 8]the data is: 17
[CONSUMER nc/mod:29 m: 9]the data is: 18
[CONSUMER nc/mod:30 m: 0]the data is: 19
[CONSUMER nc/mod:31 m: 1]the data is: 20
[CONSUMER nc/mod:32 m: 2]the data is: 21
[CONSUMER nc/mod:33 m: 3]the data is: 22
[CONSUMER nc/mod:34 m: 4]the data is: 23
[CONSUMER nc/mod:35 m: 5]the data is: 24
[CONSUMER nc/mod:36 m: 6]the data is: 25
[CONSUMER nc/mod:37 m: 7]the data is: 26
[CONSUMER nc/mod:38 m: 8]the data is: 27
[CONSUMER nc/mod:39 m: 9]the data is: 28
[CONSUMER nc/mod:40 m: 0]the data is: 29
[CONSUMER nc/mod:41 m: 1]the data is: 13
[CONSUMER nc/mod:42 m: 2]the data is: 14
[CONSUMER nc/mod:43 m: 3]the data is: 15
[CONSUMER nc/mod:44 m: 4]the data is: 16
[CONSUMER nc/mod:45 m: 5]the data is: 17
[CONSUMER nc/mod:46 m: 6]the data is: 18
[CONSUMER nc/mod:47 m: 7]the data is: 19
[CONSUMER nc/mod:48 m: 8]the data is: 30
[CONSUMER nc/mod:49 m: 9]the data is: 20
[CONSUMER nc/mod:50 m: 0]the data is: 21
[CONSUMER nc/mod:51 m: 1]the data is: 22
[CONSUMER nc/mod:52 m: 2]the data is: 23
[CONSUMER nc/mod:53 m: 3]the data is: 24
[CONSUMER nc/mod:54 m: 4]the data is: 31
[CONSUMER nc/mod:55 m: 5]the data is: 32
[CONSUMER nc/mod:56 m: 6]the data is: 33
[CONSUMER nc/mod:57 m: 7]the data is: 34
[CONSUMER nc/mod:58 m: 8]the data is: 35
[CONSUMER nc/mod:59 m: 9]the data is: 36
[CONSUMER nc/mod:60 m: 0]the data is: 25
[CONSUMER nc/mod:61 m: 1]the data is: 37
[CONSUMER nc/mod:62 m: 2]the data is: 38
[CONSUMER nc/mod:63 m: 3]the data is: 26
[CONSUMER nc/mod:64 m: 4]the data is: 27
[CONSUMER nc/mod:65 m: 5]the data is: 28
[CONSUMER nc/mod:66 m: 6]the data is: 29
[CONSUMER nc/mod:67 m: 7]the data is: 39
[CONSUMER nc/mod:68 m: 8]the data is: 30
[CONSUMER nc/mod:69 m: 9]the data is: 31
[CONSUMER nc/mod:70 m: 0]the data is: 40
[CONSUMER nc/mod:71 m: 1]the data is: 41
[CONSUMER nc/mod:72 m: 2]the data is: 42
[CONSUMER nc/mod:73 m: 3]the data is: 43
[CONSUMER nc/mod:74 m: 4]the data is: 44
[CONSUMER nc/mod:75 m: 5]the data is: 45
[CONSUMER nc/mod:76 m: 6]the data is: 46
[CONSUMER nc/mod:77 m: 7]the data is: 47
[CONSUMER nc/mod:78 m: 8]the data is: 48
[CONSUMER nc/mod:79 m: 9]the data is: 49
[CONSUMER nc/mod:80 m: 0]the data is: 50
[CONSUMER nc/mod:81 m: 1]the data is: 51
[CONSUMER nc/mod:82 m: 2]the data is: 52
[CONSUMER nc/mod:83 m: 3]the data is: 32
[CONSUMER nc/mod:84 m: 4]the data is: 33
[CONSUMER nc/mod:85 m: 5]the data is: 34
[CONSUMER nc/mod:86 m: 6]the data is: 53
[CONSUMER nc/mod:87 m: 7]the data is: 54
[CONSUMER nc/mod:88 m: 8]the data is: 55
[CONSUMER nc/mod:89 m: 9]the data is: 35
[CONSUMER nc/mod:90 m: 0]the data is: 36
[CONSUMER nc/mod:91 m: 1]the data is: 56
[CONSUMER nc/mod:92 m: 2]the data is: 37
[CONSUMER nc/mod:93 m: 3]the data is: 38
[CONSUMER nc/mod:94 m: 4]the data is: 39
[CONSUMER nc/mod:95 m: 5]the data is: 40
[CONSUMER nc/mod:96 m: 6]the data is: 57
[CONSUMER nc/mod:97 m: 7]the data is: 58
[CONSUMER nc/mod:98 m: 8]the data is: 59
[CONSUMER nc/mod:99 m: 9]the data is: 60
[CONSUMER nc/mod:100 m: 0]the data is: 61
[CONSUMER nc/mod:101 m: 1]the data is: 62
[CONSUMER nc/mod:102 m: 2]the data is: 63
[CONSUMER nc/mod:103 m: 3]the data is: 41
[CONSUMER nc/mod:104 m: 4]the data is: 64
[CONSUMER nc/mod:105 m: 5]the data is: 65
[CONSUMER nc/mod:106 m: 6]the data is: 66
[CONSUMER nc/mod:107 m: 7]the data is: 67
[CONSUMER nc/mod:108 m: 8]the data is: 68
[CONSUMER nc/mod:109 m: 9]the data is: 69
[CONSUMER nc/mod:110 m: 0]the data is: 70
[CONSUMER nc/mod:111 m: 1]the data is: 71
[CONSUMER nc/mod:112 m: 2]the data is: 72
[CONSUMER nc/mod:113 m: 3]the data is: 73
[CONSUMER nc/mod:114 m: 4]the data is: 74
[CONSUMER nc/mod:115 m: 5]the data is: 75
[CONSUMER nc/mod:116 m: 6]the data is: 76
[CONSUMER nc/mod:117 m: 7]the data is: 77
[CONSUMER nc/mod:118 m: 8]the data is: 78
[CONSUMER nc/mod:119 m: 9]the data is: 79
[CONSUMER nc/mod:120 m: 0]the data is: 80
[CONSUMER nc/mod:121 m: 1]the data is: 81
[CONSUMER nc/mod:122 m: 2]the data is: 82
[CONSUMER nc/mod:123 m: 3]the data is: 42
[CONSUMER nc/mod:124 m: 4]the data is: 43
[CONSUMER nc/mod:125 m: 5]the data is: 44
[CONSUMER nc/mod:126 m: 6]the data is: 83
[CONSUMER nc/mod:127 m: 7]the data is: 84
[CONSUMER nc/mod:128 m: 8]the data is: 45
[CONSUMER nc/mod:129 m: 9]the data is: 46
[CONSUMER nc/mod:130 m: 0]the data is: 47
[CONSUMER nc/mod:131 m: 1]the data is: 48
[CONSUMER nc/mod:132 m: 2]the data is: 49
[CONSUMER nc/mod:133 m: 3]the data is: 50
[CONSUMER nc/mod:134 m: 4]the data is: 51
[CONSUMER nc/mod:135 m: 5]the data is: 85
[CONSUMER nc/mod:136 m: 6]the data is: 86
[CONSUMER nc/mod:137 m: 7]the data is: 87
[CONSUMER nc/mod:138 m: 8]the data is: 52
[CONSUMER nc/mod:139 m: 9]the data is: 53
[CONSUMER nc/mod:140 m: 0]the data is: 54
[CONSUMER nc/mod:141 m: 1]the data is: 55
[CONSUMER nc/mod:142 m: 2]the data is: 56
[CONSUMER nc/mod:143 m: 3]the data is: 88
[CONSUMER nc/mod:144 m: 4]the data is: 89
[CONSUMER nc/mod:145 m: 5]the data is: 90
[CONSUMER nc/mod:146 m: 6]the data is: 57
[CONSUMER nc/mod:147 m: 7]the data is: 58
[CONSUMER nc/mod:148 m: 8]the data is: 59
[CONSUMER nc/mod:149 m: 9]the data is: 60
[CONSUMER nc/mod:150 m: 0]the data is: 61
[CONSUMER nc/mod:151 m: 1]the data is: 91
[CONSUMER nc/mod:152 m: 2]the data is: 92
[CONSUMER nc/mod:153 m: 3]the data is: 62
[CONSUMER nc/mod:154 m: 4]the data is: 63
[CONSUMER nc/mod:155 m: 5]the data is: 93
[CONSUMER nc/mod:156 m: 6]the data is: 94
[CONSUMER nc/mod:157 m: 7]the data is: 95
[CONSUMER nc/mod:158 m: 8]the data is: 64
[CONSUMER nc/mod:159 m: 9]the data is: 65
[CONSUMER nc/mod:160 m: 0]the data is: 66
[CONSUMER nc/mod:161 m: 1]the data is: 67
[CONSUMER nc/mod:162 m: 2]the data is: 68
[CONSUMER nc/mod:163 m: 3]the data is: 69
[CONSUMER nc/mod:164 m: 4]the data is: 96
[CONSUMER nc/mod:165 m: 5]the data is: 97
[CONSUMER nc/mod:166 m: 6]the data is: 70
[CONSUMER nc/mod:167 m: 7]the data is: 71
[CONSUMER nc/mod:168 m: 8]the data is: 72
[CONSUMER nc/mod:169 m: 9]the data is: 73
[CONSUMER nc/mod:170 m: 0]the data is: 74
[CONSUMER nc/mod:171 m: 1]the data is: 98
[CONSUMER nc/mod:172 m: 2]the data is: 99
[CONSUMER nc/mod:173 m: 3]the data is: 100
producer exiting
[CONSUMER nc/mod:174 m: 4]the data is: 75

[CONSUMER nc/mod:175 m: 5]the data is: 76
[CONSUMER nc/mod:176 m: 6]the data is: 77
[CONSUMER nc/mod:177 m: 7]the data is: 78
[CONSUMER nc/mod:178 m: 8]the data is: 79
[CONSUMER nc/mod:179 m: 9]the data is: 80
[CONSUMER nc/mod:180 m: 0]the data is: 81
[CONSUMER nc/mod:181 m: 1]the data is: 82
[CONSUMER nc/mod:182 m: 2]the data is: 83
consumer exiting
[CONSUMER nc/mod:183 m: 3]the data is: 84
[CONSUMER nc/mod:184 m: 4]the data is: 85
[CONSUMER nc/mod:185 m: 5]the data is: 86
[CONSUMER nc/mod:186 m: 6]the data is: 87
[CONSUMER nc/mod:187 m: 7]the data is: 88
[CONSUMER nc/mod:188 m: 8]the data is: 89
[CONSUMER nc/mod:189 m: 9]the data is: 90
[CONSUMER nc/mod:190 m: 0]the data is: 91
[CONSUMER nc/mod:191 m: 1]the data is: 92
[CONSUMER nc/mod:192 m: 2]the data is: 93
[CONSUMER nc/mod:193 m: 3]the data is: 94
[CONSUMER nc/mod:194 m: 4]the data is: 95
[CONSUMER nc/mod:195 m: 5]the data is: 96
[CONSUMER nc/mod:196 m: 6]the data is: 97
[CONSUMER nc/mod:197 m: 7]the data is: 98
[CONSUMER nc/mod:198 m: 8]the data is: 99
producer exiting
[CONSUMER nc/mod:199 m: 9]the data is: 100
consumer exiting


Thanks in advance.


Replies To: Threads, multiple producer/consumer

#2 eker676

Posted 26 October 2013 - 06:24 PM

I don't feel like booting into Linux right now, so I can't test this, but everything looks good to me.

What are you concerned about?

You have two consumers and two producers, and nothing is guaranteed about the order in which the threads run, except that the flag makes producers and consumers take turns. It is therefore completely plausible for one producer to send 1-50 and then block, for the other producer to send 1-50, for the first producer to then send 51-100, and so on.

All of the output looks plausible for the scenario. (The output you've bolded could have occurred due to a situation similar to the one I described above.)
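
As a rough illustration of that scenario, the sketch below runs two producers that each count through their own private loop variable, as in the original post. The names independent_producer, produced_log and run_independent_producers are made up for this example, and a plain vector stands in for the ring buffer. Every value from 1 to 100 ends up recorded twice, and how the two streams interleave is left to the scheduler.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

// Illustrative names only; none of these appear in the posted program.
static const int PER_PRODUCER = 100;          // each thread counts 1..100 on its own
static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
static std::vector<int> produced_log;         // stands in for the shared buffer

static void *independent_producer(void *) {
    for (int i = 0; i < PER_PRODUCER; ++i) {  // i is private to this thread
        pthread_mutex_lock(&log_lock);
        produced_log.push_back(i + 1);        // same expression as ary[...] = i+1
        pthread_mutex_unlock(&log_lock);
    }
    return NULL;
}

// Runs two producers and returns everything they recorded, in arrival order.
std::vector<int> run_independent_producers() {
    produced_log.clear();
    pthread_t a, b;
    int rc = pthread_create(&a, NULL, independent_producer, NULL);
    assert(rc == 0);
    rc = pthread_create(&b, NULL, independent_producer, NULL);
    assert(rc == 0);
    (void)rc;
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return produced_log;
}
```

The mutex keeps the vector consistent, but it does nothing to coordinate which value comes next, which is why 200 items show up instead of 100.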

This post has been edited by eker676: 26 October 2013 - 06:30 PM


#3 NecroWinter

Posted 26 October 2013 - 06:42 PM

The consumer needs to output everything in order. I need to have things "synchronized" without restricting parallelism.

#4 eker676

Posted 26 October 2013 - 06:52 PM

Ahh, okay.

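for (i=0; i<=99;i++){   // i is local to each producer thread
  pthread_mutex_lock(&lockit);
  while (flag !=READ){
    pthread_cond_wait(&signalit,&lockit);
  }

  if ((ary[nextProduce%10])==-1){
     ary[nextProduce%10]=i+1;   // writes this thread's own count
     nextProduce++;
  }
  // ... rest of the loop body as in the original post
}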


You don't have a single synchronized data stream here. You have two distinct parallel tasks, because each producer counts through its own local i. If i were shared between the threads, the output would be as you expect.
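
A minimal sketch of what sharing the counter could look like, assuming the counter is only ever read and advanced while the mutex is held. The names next_value, shared_counter_producer and run_two_producers are hypothetical, and a vector again stands in for the buffer so the example stays short.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

// Illustrative names only; the thread itself just says "share the counter".
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static int next_value = 1;                 // shared by all producers
static const int LAST_VALUE = 100;
static std::vector<int> produced;          // stands in for the ring buffer

static void *shared_counter_producer(void *) {
    for (;;) {
        pthread_mutex_lock(&counter_lock);
        if (next_value > LAST_VALUE) {     // nothing left to hand out
            pthread_mutex_unlock(&counter_lock);
            break;
        }
        // Claim the next value and publish it under the same lock,
        // so no other producer can slip in between the two steps.
        produced.push_back(next_value++);
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

// Runs two producers and returns the values in the order they were produced.
std::vector<int> run_two_producers() {
    next_value = 1;
    produced.clear();
    pthread_t p1, p2;
    int rc = pthread_create(&p1, NULL, shared_counter_producer, NULL);
    assert(rc == 0);
    rc = pthread_create(&p2, NULL, shared_counter_producer, NULL);
    assert(rc == 0);
    (void)rc;
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    return produced;
}
```

Note that the termination test also happens under the lock. Checking the counter outside the lock, as a loop condition, would let both threads pass the check and then produce one value too many.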

This post has been edited by eker676: 26 October 2013 - 06:53 PM


#5 NecroWinter

Posted 26 October 2013 - 07:09 PM

Okay, so I think I may have figured it out. I replaced "i" in my code with a new global variable called value. Before, the code used the variable "i" from the for loop, which messed things up badly.

Is that what you were getting at? If not, then it's possible I have more bugs than I realize. But as it stands right now, I'm producing the correct output.

#6 NecroWinter

Posted 26 October 2013 - 08:47 PM

So I just realized I had another error. It's not present in the old code posted in the OP, but it played a big role in why my output was correct. In short, I only allowed the loop in each function to do half the work, so one thread always picked up where the other left off. Right now, I'm trying to at least allow for the possibility that one thread does everything, if that's how the OS schedules it. You'll see I commented out some for loop code; that's what I'm talking about. It made for some strange errors in which it would start to read "101" when 100 is the last entry.

Here's my current code. I can't seem to get the second consumer to terminate. I'm pretty new to threads in general, so if I'm just flat out doing something wrong that I haven't brought up, please tell me.



#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <iostream>
#include <string>

using namespace std;

#define READ 1
#define WRITTEN 2

//int data;
int ary[10];        // shared buffer; -1 marks an empty slot

int flag=READ;      // READ: producers may run, WRITTEN: consumers may run

int nextConsume = 0;
int nextProduce = 0;
int value = 0;      // last value produced, shared by both producers

pthread_mutex_t lockit=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t signalit=PTHREAD_COND_INITIALIZER;

void *produce(void *thread_number){
   int i;
   printf("I am the producer and my thread number is %ld \n",
          (long) thread_number);
   //for (i=0; i<=99;i++)
   while (value<99){
      pthread_mutex_lock(&lockit);

      // the last slot holds 100: hand over to the consumers and stop
      if (ary[9]==100){
         cout << "ary9" << endl;
         flag = WRITTEN;
         pthread_cond_broadcast(&signalit);
         pthread_mutex_unlock(&lockit);
         break;
      }
      while (flag != READ){
         pthread_cond_wait(&signalit, &lockit);
      }

      if ((ary[nextProduce%10]) == -1){
         cout << "_____producer  " << (long) thread_number << " producing in : " << nextProduce%10 << " producing: " << value+1 << endl;
         ary[nextProduce%10] = value+1;
         nextProduce++;
         value++;
      }
      flag = WRITTEN;
      pthread_cond_broadcast(&signalit);
      pthread_mutex_unlock(&lockit);
   } /* end while */

   cout << "producer " << (long) thread_number << " exiting " << endl;
   return(NULL);
} // end of producer

void *consume(void *thread_number){
   int i;
   printf("I am the consumer and my thread number is %ld \n",
          (long) thread_number);

   for (i=0; i<=99; i++){
      pthread_mutex_lock(&lockit);

      while (flag != WRITTEN){
         pthread_cond_wait(&signalit, &lockit);
      }
      // the last slot holds 100: hand back to the producers and stop
      if (ary[9]==100){
         flag = READ;
         pthread_cond_broadcast(&signalit);
         pthread_mutex_unlock(&lockit);
         break;
      }

      if (ary[nextConsume%10] != -1){
         cout << "[CONSUMER " << (long) thread_number << " nc/mod:" << nextConsume << " m: " << nextConsume%10 << "]the data is: " << ary[nextConsume%10] << "[/CONSUMER]" << endl;
         ary[nextConsume%10] = -1;
         nextConsume++;
      }

      flag = READ;
      pthread_cond_broadcast(&signalit);
      pthread_mutex_unlock(&lockit);
   } /* end for */

   cout << "consumer " << (long) thread_number << " exiting " << endl;
   return(NULL);
} // end consumer

void errorOutput(string name, int var){
   if (var<0){
      cout << name << " (error) output: " << var << endl;
      exit(var);
   }
}

int main(){
   pthread_t p1, p2, c1, c2;
   int iret1, iret2, iret3, iret4;

   // mark every slot as empty
   for (int i=0; i<10; i++){
      ary[i] = -1;
      //cout << i << " = -1" << endl;
   }

   iret1 = pthread_create(&p1, NULL, produce, (void*) 0);
   errorOutput("iret1", iret1);
   iret2 = pthread_create(&p2, NULL, produce, (void*) 1);
   errorOutput("iret2", iret2);
   iret3 = pthread_create(&c1, NULL, consume, (void*) 2);
   errorOutput("iret3", iret3);
   iret4 = pthread_create(&c2, NULL, consume, (void*) 3);
   errorOutput("iret4", iret4);

   pthread_join(p1, NULL);
   pthread_join(p2, NULL);
   pthread_join(c1, NULL);
   pthread_join(c2, NULL);

   return 0;
}



#7 eker676

Posted 26 October 2013 - 09:50 PM

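Hmm, the second consumer thread is deadlocking on the wait condition. If both producers have already exited while flag is READ, nothing will ever set it back to WRITTEN, so a consumer still sitting in pthread_cond_wait is never released. If you change the consumer loop to use the producer-style loop (while ( value < 99 )), all the threads will terminate, but you may not consume the last value.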

You might need some special exit condition handling so that only one consumer handles the last value (i.e., break out of the loop, double-check the buffer for the last value, and terminate).
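
One possible shape for that exit handling, offered as a sketch under assumptions and not as a patch to the posted program: count the items consumed, and wake every waiting thread once the count shows there is nothing more to do. The sketch also swaps the single flag for the usual bounded-buffer arrangement with two condition variables, and records values in a vector instead of printing them. All names (ring, filled, consumed, not_full, not_empty, run_bounded_buffer) are illustrative.

```cpp
#include <cassert>
#include <pthread.h>
#include <vector>

// All names below are illustrative, not taken from the posted program.
static const int SLOTS = 10;            // buffer size, as in the thread
static const int LAST_VALUE = 100;      // last value to produce and consume
static int ring[SLOTS];
static int head = 0, tail = 0, filled = 0;
static int next_value = 1;              // shared by both producers
static int consumed = 0;                // shared by both consumers
static std::vector<int> consumed_log;   // stands in for printing

static pthread_mutex_t buf_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

static void *producer(void *) {
    for (;;) {
        pthread_mutex_lock(&buf_lock);
        // Wait for a free slot, unless there is nothing left to produce.
        while (filled == SLOTS && next_value <= LAST_VALUE)
            pthread_cond_wait(&not_full, &buf_lock);
        if (next_value > LAST_VALUE) {
            pthread_mutex_unlock(&buf_lock);
            break;
        }
        ring[tail] = next_value++;
        tail = (tail + 1) % SLOTS;
        ++filled;
        if (next_value > LAST_VALUE)
            pthread_cond_broadcast(&not_full);   // release a producer still waiting
        pthread_cond_signal(&not_empty);
        pthread_mutex_unlock(&buf_lock);
    }
    return NULL;
}

static void *consumer(void *) {
    for (;;) {
        pthread_mutex_lock(&buf_lock);
        // Wait for an item, unless every item has already been consumed.
        while (filled == 0 && consumed < LAST_VALUE)
            pthread_cond_wait(&not_empty, &buf_lock);
        if (consumed == LAST_VALUE) {
            pthread_mutex_unlock(&buf_lock);
            break;
        }
        consumed_log.push_back(ring[head]);
        head = (head + 1) % SLOTS;
        --filled;
        ++consumed;
        if (consumed == LAST_VALUE)
            pthread_cond_broadcast(&not_empty);  // release a consumer still waiting
        pthread_cond_signal(&not_full);
        pthread_mutex_unlock(&buf_lock);
    }
    return NULL;
}

// Runs two producers and two consumers; returns values in consumption order.
std::vector<int> run_bounded_buffer() {
    head = tail = filled = consumed = 0;
    next_value = 1;
    consumed_log.clear();
    pthread_t t[4];
    void *(*entry[4])(void *) = {producer, producer, consumer, consumer};
    for (int k = 0; k < 4; ++k) {
        int rc = pthread_create(&t[k], NULL, entry[k], NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (int k = 0; k < 4; ++k)
        pthread_join(t[k], NULL);
    return consumed_log;
}
```

Whichever consumer takes the last value is the one that broadcasts, so the other consumer wakes up, sees that consumed has reached LAST_VALUE, and exits instead of waiting forever. The wait loops test the exit condition as well as the buffer state, which is what keeps a late waker from going back to sleep.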