-
Notifications
You must be signed in to change notification settings - Fork 0
/
prodconsmultiple.cpp
157 lines (131 loc) · 5.08 KB
/
prodconsmultiple.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <mpi.h>
#include <iostream>
#include <thread>
#include <random>
#include <chrono>
using namespace std;
using namespace std::this_thread ;
using namespace std::chrono ;
const int id_buffer = 4,
num_consumidores = 5,
num_productores = 4,
num_procesos = 10,
num_items = 20,
tam_vector = 10,
tag_prod = 0,
tag_cons = 1,
prod_ind = num_items / num_productores,
cons_ind = num_items / num_consumidores;
template< int min, int max > int aleatorio()
{
static default_random_engine generador( (random_device())() );
static uniform_int_distribution<int> distribucion_uniforme( min, max ) ;
return distribucion_uniforme( generador );
}
int producir(int id_prod)
{
static int contador = 0 ;
int valor = id_prod * prod_ind +contador;
sleep_for( milliseconds( aleatorio<10,100>()) );
contador++ ;
cout << "Productor " << id_prod << " ha producido valor " << valor << endl << flush;
return valor ;
}
void funcion_productor(int id_prod)
{
for ( unsigned int i= 0 ; i < prod_ind ; i++ )
{
// producir valor
int valor_prod = producir(id_prod);
// enviar valor
cout << "Productor " << id_prod <<" va a enviar valor " << valor_prod << endl << flush;
MPI_Send( &valor_prod, 1, MPI_INT, id_buffer, tag_prod, MPI_COMM_WORLD );
}
}
///////////////////////////////////////////////////////////////////////////////
void funcion_buffer()
{
int buffer[tam_vector], // buffer con celdas ocupadas y vacías
valor, // valor recibido o enviado
primera_libre = 0, // índice de primera celda libre
primera_ocupada = 0, // índice de primera celda ocupada
num_celdas_ocupadas = 0, // número de celdas ocupadas
tag_emisor_aceptable ; // identificador de emisor aceptable
MPI_Status estado ; // metadatos del mensaje recibido
for( unsigned int i=0 ; i < num_items*2 ; i++ )
{
// 1. determinar si puede enviar solo prod., solo cons, o todos
if ( num_celdas_ocupadas == 0 ) // si buffer vacío
tag_emisor_aceptable = tag_prod; // $~~~$ solo prod.
else if ( num_celdas_ocupadas == tam_vector ) // si buffer lleno
tag_emisor_aceptable = tag_cons; // $~~~$ solo cons.
else // si no vacío ni lleno
tag_emisor_aceptable = MPI_ANY_TAG; // $~~~$ cualquiera
// 2. recibir un mensaje del emisor o emisores aceptables
MPI_Recv( &valor, 1, MPI_INT, MPI_ANY_SOURCE, tag_emisor_aceptable, MPI_COMM_WORLD, &estado );
// 3. procesar el mensaje recibido
switch( estado.MPI_TAG) // leer emisor del mensaje en metadatos
{
case tag_prod: // si ha sido el productor: insertar en buffer
buffer[primera_libre] = valor ;
primera_libre = (primera_libre+1) % tam_vector ;
num_celdas_ocupadas++ ;
cout << "Buffer ha recibido valor " << valor << endl ;
break;
case tag_cons: // si ha sido el consumidor: extraer y enviarle
int id_cons = estado.MPI_SOURCE;
valor = buffer[primera_ocupada] ;
primera_ocupada = (primera_ocupada+1) % tam_vector ;
num_celdas_ocupadas-- ;
cout << "Buffer va a enviar valor " << valor << endl ;
MPI_Send( &valor, 1, MPI_INT, id_cons,0, MPI_COMM_WORLD);
break;
}
}
}
///////////////////////////////////////////////////////////////////////////////
void consumir( int valor_cons, int id_cons)
{
// espera bloqueada
sleep_for( milliseconds( aleatorio<110,200>()) );
cout << "Consumidor " << id_cons << " ha consumido valor " << valor_cons << endl << flush ;
}
void funcion_consumidor(int id_cons)
{
int peticion,
valor_rec = 1 ;
MPI_Status estado ;
for( unsigned int i=0 ; i < cons_ind; i++ )
{
MPI_Send( &peticion, 1, MPI_INT, id_buffer, tag_cons, MPI_COMM_WORLD);
MPI_Recv ( &valor_rec, 1, MPI_INT, id_buffer, 0, MPI_COMM_WORLD,&estado );
cout << "Consumidor " << id_cons << " ha recibido valor " << valor_rec << endl << flush ;
consumir( valor_rec, id_cons);
}
}
int main(int argc, char *argv[]){
int id_actual, num_proceso_actual;
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &id_actual );
MPI_Comm_size( MPI_COMM_WORLD, &num_proceso_actual );
if (num_proceso_actual == num_procesos){
if (id_actual < num_productores){
funcion_productor(id_actual);
}
else if (id_actual == num_productores){
funcion_buffer();
}
else{
funcion_consumidor(id_actual);
}
}
else{
if (id_actual == 0){
cout << "Número de procesos introducidos: " << num_proceso_actual << endl;
cout << "Número de procesos esperado: " << num_procesos << endl;
cout << "(programa abortado)" << endl;
}
}
MPI_Finalize();
return(0);
}