Part 5: Complete Implementation
This section walks through the key parts of the producer-consumer implementation, explaining the design decisions and important patterns. You'll understand how all the concepts come together in working code.
Full Source
Code: The complete, runnable implementation is in the code folder. This page explains the key highlights; refer to the source files for the complete picture.
5.1 Problem Requirements
What We're Building
We're implementing a commodity price monitoring system with the following components:
- Multiple producer processes: each simulates a commodity price feed (GOLD, SILVER, etc.) by generating random prices that follow a normal distribution
- One consumer process: displays a real-time dashboard showing current prices, moving averages, and trend indicators
- Shared memory: the communication channel where producers write and the consumer reads
- Three semaphores: coordinate access to prevent race conditions and buffer overflow/underflow
System architecture: multiple producers write to shared buffer, consumer reads and displays
Supported Commodities:
ALUMINIUM, COPPER, COTTON, CRUDEOIL, GOLD, LEAD, MENTHAOIL, NATURALGAS, NICKEL, SILVER, ZINC
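A producer could reject unknown names before touching any IPC resource. The sketch below is only an illustration of that idea: the helper name is_supported_commodity is hypothetical, and whether the real producer validates names this way is an assumption.

```cpp
#include <string>

// Hypothetical helper: true if `name` is one of the supported commodities.
// The comparison is case-sensitive, so "gold" is rejected.
bool is_supported_commodity(const std::string& name) {
    static const char* const supported[] = {
        "ALUMINIUM", "COPPER", "COTTON", "CRUDEOIL", "GOLD", "LEAD",
        "MENTHAOIL", "NATURALGAS", "NICKEL", "SILVER", "ZINC"
    };
    for (const char* candidate : supported) {
        if (name == candidate) {
            return true;
        }
    }
    return false;
}
```

Every name in the list fits in 10 characters, which is why MAX_COMMODITY_NAME is 11 in the data structures below.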
Command Line Interface:
# Consumer (must start first - creates IPC resources)
./consumer <BUFFER_SIZE>
./consumer 8 # Create buffer with 8 slots
# Producer (connects to existing resources)
./producer <NAME> <MEAN> <STDDEV> <SLEEP_MS> <BUFFER_SIZE>
./producer GOLD 2000.0 5.0 500 8 # Generate GOLD prices
# Mean: $2000, StdDev: $5
# Every 500ms, buffer size 8
Why Consumer Creates Resources:
The consumer is the resource owner: it creates the shared memory and semaphores on startup and removes them on shutdown. This design ensures:
- Clear ownership: one process responsible for lifecycle
- Clean startup: consumer initializes everything before producers connect
- Clean shutdown: Ctrl+C in consumer cleans up all IPC resources
5.2 Data Structures
Design Overview
The shared memory layout uses a header + array pattern:
- Header: Contains the producer and consumer indices (for circular buffer)
- Buffer Array: Holds the actual commodity data
#define MAX_COMMODITY_NAME 11   // Max 10 chars + null terminator
#define SHARED_MEMORY_KEY 777   // Key for shmget()
#define SEMAPHORE_KEY 555       // Key for semget()

// A single item in the circular buffer
struct buffer_item {
    char commodity_name[MAX_COMMODITY_NAME];  // e.g., "GOLD"
    double commodity_price;                   // e.g., 1987.25
};

// Header stored at the start of shared memory
struct shared_memory_data {
    int producer_index;  // Next write position (0 to buffer_size-1)
    int consumer_index;  // Next read position (0 to buffer_size-1)
};

// For semctl() operations
union semaphore_union {
    int val;                // Value to set
    struct semid_ds *buf;   // Status buffer
    unsigned short *array;  // Array operations
};
Memory Layout Visualization:
The shared memory region contains the header, followed by the circular buffer array of buffer_size items.
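As a rough illustration of this layout, the sketch below computes the segment size and locates the buffer array directly after the header. The helper names segment_size and buffer_start are hypothetical (they are not taken from the project source), and the code assumes the array begins immediately after the header with no extra padding, which matches the size calculation used later in this section.

```cpp
#include <cstddef>

#define MAX_COMMODITY_NAME 11

struct buffer_item {
    char commodity_name[MAX_COMMODITY_NAME];
    double commodity_price;
};

struct shared_memory_data {
    int producer_index;
    int consumer_index;
};

// Hypothetical helper: bytes needed for the header plus buffer_size slots.
inline std::size_t segment_size(int buffer_size) {
    return sizeof(shared_memory_data)
         + static_cast<std::size_t>(buffer_size) * sizeof(buffer_item);
}

// Hypothetical helper: the first slot sits right after the header.
// `header + 1` advances the pointer by sizeof(shared_memory_data) bytes.
inline buffer_item* buffer_start(void* segment_base) {
    shared_memory_data* header = static_cast<shared_memory_data*>(segment_base);
    return reinterpret_cast<buffer_item*>(header + 1);
}
```

With this arrangement, slot i lives at buffer_start(base)[i], and both the consumer and the producers can derive the same addresses from nothing more than the attach address and the buffer size.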
5.3 Consumer Implementation (Highlights)
The consumer is more complex than the producer because it:
- Creates and initializes IPC resources
- Handles signals for graceful cleanup
- Maintains state for dashboard display (moving averages, etc.)
Note: The full source code is available in the code folder.
5.3.1 Signal Handler for Cleanup
IPC resources persist in the kernel even after your program exits. Without proper cleanup, you'd leave orphaned shared memory and semaphores. Signal handlers ensure cleanup happens on Ctrl+C:
// Flag checked by main loop - must be sig_atomic_t for signal safety
static volatile sig_atomic_t g_running = 1;

// Global IDs needed for cleanup function
static int g_shared_memory_id = -1;
static int g_semaphore_id = -1;
static void* g_shared_memory_address = (void*)-1;

// Signal handler - called when Ctrl+C is pressed
void signal_handler(int signum) {
    (void)signum;   // Suppress unused parameter warning
    g_running = 0;  // Signal main loop to exit
}

// Cleanup function - removes IPC resources
void cleanup_resources(void) {
    // Detach shared memory
    if (g_shared_memory_address != (void*)-1) {
        shmdt(g_shared_memory_address);
    }
    // Remove shared memory segment
    if (g_shared_memory_id != -1) {
        shmctl(g_shared_memory_id, IPC_RMID, NULL);
    }
    // Remove semaphore set
    if (g_semaphore_id != -1) {
        semctl(g_semaphore_id, 0, IPC_RMID);
    }
}
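The excerpt above doesn't show how the handler gets registered. One plausible way, sketched here as an assumption rather than the project's actual code, is sigaction() with SA_RESTART left off, so that an interrupted blocking call returns with EINTR and the main loop can re-check g_running. The function name install_signal_handlers and the choice to also handle SIGTERM are hypothetical.

```cpp
#include <signal.h>

static volatile sig_atomic_t g_running = 1;

// Same handler as above: only sets a flag, which is async-signal-safe.
void signal_handler(int signum) {
    (void)signum;
    g_running = 0;
}

// Hypothetical helper: register signal_handler for SIGINT and SIGTERM.
// Returns 0 on success, -1 on failure (errno is set by sigaction()).
int install_signal_handlers() {
    struct sigaction action = {};
    action.sa_handler = signal_handler;
    sigemptyset(&action.sa_mask);
    action.sa_flags = 0;  // SA_RESTART deliberately not requested

    if (sigaction(SIGINT, &action, nullptr) == -1) {
        return -1;
    }
    if (sigaction(SIGTERM, &action, nullptr) == -1) {
        return -1;
    }
    return 0;
}
```

The handler itself never calls cleanup_resources(); the main loop notices g_running == 0, exits, and then performs cleanup in normal program context.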
5.3.2 Creating Shared Memory
The consumer creates the shared memory segment and initializes the header:
// Calculate total size needed
size_t size = sizeof(shared_memory_data) + buffer_size * sizeof(buffer_item);

// Create shared memory segment
g_shared_memory_id = shmget(SHARED_MEMORY_KEY, size, 0666 | IPC_CREAT);
if (g_shared_memory_id == -1) {
    fprintf(stderr, "shmget failed: %s\n", strerror(errno));
    exit(EXIT_FAILURE);
}

// Attach to the segment
g_shared_memory_address = shmat(g_shared_memory_id, NULL, 0);
if (g_shared_memory_address == (void*)-1) {
    fprintf(stderr, "shmat failed: %s\n", strerror(errno));
    cleanup_resources();  // Remove the segment we just created
    exit(EXIT_FAILURE);
}

// Initialize the header
shared_memory_data* header = (shared_memory_data*)g_shared_memory_address;
header->producer_index = 0;
header->consumer_index = 0;
5.3.3 Creating and Initializing Semaphores
// Create semaphore set with 3 semaphores
g_semaphore_id = semget(SEMAPHORE_KEY, 3, 0666 | IPC_CREAT);
if (g_semaphore_id == -1) {
    fprintf(stderr, "semget failed: %s\n", strerror(errno));
    cleanup_resources();
    exit(EXIT_FAILURE);
}
// Initialize semaphore values
union semaphore_union arg;
// Semaphore 0: mutex = 1 (unlocked)
arg.val = 1;
semctl(g_semaphore_id, 0, SETVAL, arg);
// Semaphore 1: empty_slots = buffer_size (all slots empty)
arg.val = buffer_size;
semctl(g_semaphore_id, 1, SETVAL, arg);
// Semaphore 2: available_products = 0 (no items yet)
arg.val = 0;
semctl(g_semaphore_id, 2, SETVAL, arg);
5.3.4 Main Consumer Loop
The consumer loop waits for items, reads them, and updates the dashboard:
// Semaphore indices for clarity
const int mutual_exclusion = 0;    // mutex
const int empty_slots = 1;         // empty
const int available_products = 2;  // full

while (g_running) {
    // Step 1: Wait for a product (P(full))
    if (my_semaphore_operation(g_semaphore_id, available_products, -1) == -1) {
        if (!g_running) break;  // Clean exit on Ctrl+C
        fprintf(stderr, "[Consumer] Error waiting for product\n");
        break;
    }

    // Step 2: Enter critical section (P(mutex))
    if (my_semaphore_operation(g_semaphore_id, mutual_exclusion, -1) == -1) {
        // CRITICAL: Restore semaphore state before exiting
        my_semaphore_operation(g_semaphore_id, available_products, +1);
        if (!g_running) break;
        fprintf(stderr, "[Consumer] Error acquiring mutex\n");
        break;
    }

    // Step 3: Read from buffer (inside critical section)
    int idx = shared_data->consumer_index;
    struct buffer_item item = buffer_array[idx];
    shared_data->consumer_index = (idx + 1) % buffer_size;

    // Step 4: Exit critical section (V(mutex))
    my_semaphore_operation(g_semaphore_id, mutual_exclusion, +1);

    // Step 5: Signal that a slot is now empty (V(empty))
    my_semaphore_operation(g_semaphore_id, empty_slots, +1);

    // Step 6: Process the item (update dashboard - outside critical section)
    update_dashboard(item);
}
Critical: Semaphore State Rollback
Notice the pattern: if acquiring the mutex fails after we've already decremented available_products, we must restore it by incrementing it back. This prevents semaphore state corruption when the program is interrupted between the two operations.
Without this rollback, the semaphore would stay decremented for as long as the semaphore set exists. The count would no longer match the buffer contents, and the processes still using the set could end up blocked forever.
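The loop above relies on my_semaphore_operation, which this page doesn't show. A minimal sketch of what such a wrapper might look like follows, assuming it's a thin layer over semop() that applies a single operation to one semaphore in the set; the real implementation in the code folder may differ.

```cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

// Assumed wrapper: apply `delta` to one semaphore in the set.
//   delta = -1  ->  P / wait (blocks while the value is 0)
//   delta = +1  ->  V / signal
// Returns 0 on success, -1 on failure with errno set by semop()
// (EINTR if a signal handler interrupted a blocked wait).
int my_semaphore_operation(int semaphore_id, int semaphore_index, int delta) {
    struct sembuf operation;
    operation.sem_num = static_cast<unsigned short>(semaphore_index);
    operation.sem_op = static_cast<short>(delta);
    operation.sem_flg = 0;  // Block until the operation can proceed
    return semop(semaphore_id, &operation, 1);
}
```

Returning -1 on EINTR instead of retrying is what lets the loop notice Ctrl+C: the handler clears g_running, semop() fails, and the caller checks the flag.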
5.4 Producer Implementation (Highlights)
The producer is simpler than the consumer because it only connects to existing IPC resources: it never creates or destroys them. This keeps cleanup responsibility clear.
Note: The full source code is available in the code folder.
5.4.1 Connecting to Existing Resources
Notice the absence of the IPC_CREAT flag; producers expect the resources to already exist:
// Get existing shared memory (no IPC_CREAT = look up only, never create)
size_t size = sizeof(shared_memory_data) + buffer_size * sizeof(buffer_item);
int shmid = shmget(SHARED_MEMORY_KEY, size, 0);
if (shmid == -1) {
    fprintf(stderr, "shmget failed: %s\n", strerror(errno));
    fprintf(stderr, "Is the consumer running?\n");
    exit(EXIT_FAILURE);
}

// Get existing semaphores (again, no IPC_CREAT)
int semid = semget(SEMAPHORE_KEY, 3, 0);
if (semid == -1) {
    fprintf(stderr, "semget failed: %s\n", strerror(errno));
    exit(EXIT_FAILURE);
}

// Attach to shared memory
void* shm_addr = shmat(shmid, NULL, 0);
if (shm_addr == (void*)-1) {
    fprintf(stderr, "shmat failed: %s\n", strerror(errno));
    exit(EXIT_FAILURE);
}
Why Check "Is the consumer running?"
If shmget() fails with errno = ENOENT, the shared memory segment doesn't exist. The most likely cause: the consumer hasn't been started yet. This user-friendly error message helps with debugging.
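The hint can be made more precise by looking at errno before printing it. The sketch below is one possible refinement, not the project's code: the helper name describe_shmget_failure is hypothetical, and the EINVAL hint covers only one common cause (the requested size being larger than the existing segment).

```cpp
#include <cerrno>
#include <cstring>
#include <string>

// Hypothetical helper: turn an errno value from a failed shmget() into
// a message with a hint about the most likely cause.
std::string describe_shmget_failure(int error_number) {
    std::string message = "shmget failed: ";
    message += std::strerror(error_number);
    if (error_number == ENOENT) {
        // No segment exists for this key.
        message += " (is the consumer running?)";
    } else if (error_number == EINVAL) {
        // A segment exists but is smaller than the size we asked for.
        message += " (does BUFFER_SIZE match the consumer's?)";
    }
    return message;
}
```

A producer would call it as describe_shmget_failure(errno) immediately after shmget() returns -1, before any other call can overwrite errno.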
5.4.2 Main Producer Loop Overview
Each iteration: generate price → wait for space → write to buffer → signal consumer:
while (g_running) {
    // Step 1: Generate random price using normal distribution
    double price = price_distribution(random_generator);
    if (price < 0.01) price = 0.01;  // Floor at 1 cent

    // Step 2: Wait for empty slot (P(empty))
    if (my_semaphore_operation(semid, empty_slots, -1) == -1) {
        if (!g_running) break;  // Clean exit on Ctrl+C
        fprintf(stderr, "[Producer] Error waiting for empty slot\n");
        break;
    }

    // Step 3: Enter critical section (P(mutex))
    if (my_semaphore_operation(semid, mutual_exclusion, -1) == -1) {
        // CRITICAL: Restore semaphore state before exiting
        my_semaphore_operation(semid, empty_slots, +1);
        if (!g_running) break;
        fprintf(stderr, "[Producer] Error acquiring mutex\n");
        break;
    }

    // Step 4: Write to buffer (inside critical section)
    int idx = shared_data->producer_index;
    strncpy(buffer_array[idx].commodity_name, name, MAX_COMMODITY_NAME - 1);
    buffer_array[idx].commodity_name[MAX_COMMODITY_NAME - 1] = '\0';
    buffer_array[idx].commodity_price = price;
    shared_data->producer_index = (idx + 1) % buffer_size;

    // Step 5: Exit critical section (V(mutex))
    my_semaphore_operation(semid, mutual_exclusion, +1);

    // Step 6: Signal product available (V(full))
    my_semaphore_operation(semid, available_products, +1);

    // Step 7: Interruptible sleep (responsive to Ctrl+C)
    interruptible_sleep(sleep_ms);
}
5.4.3 Producer Loop: Step-by-Step Walkthrough
Complete workflow showing producer operations (left), shared buffer state (center), and consumer operations (right)
Let's trace through exactly what happens in one complete iteration:
Step 1: Generate a Random Price
double generated_price = price_distribution(random_generator);
C++11's <random> library uses a three-stage pipeline:
- Seed source (std::random_device): provides non-deterministic seed values, typically drawn from an operating-system entropy source
- Engine (std::mt19937): a high-quality pseudo-random number generator with period 2^19937 - 1
- Distribution (std::normal_distribution): shapes the engine's raw integers into the desired statistical distribution
This generates values from a normal (Gaussian) distribution. Most prices cluster around the mean, with occasional extremes. This simulates real market price fluctuations.
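To make the pipeline concrete, here is a small sketch that wires an engine to a distribution and draws a batch of prices. The helper name sample_prices is hypothetical, and it takes an explicit seed so the output is reproducible; the producer itself would more likely seed the engine once from std::random_device.

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical helper: draw `count` prices from a normal distribution
// with the given mean and standard deviation.
std::vector<double> sample_prices(double mean, double stddev,
                                  std::size_t count, unsigned int seed) {
    std::mt19937 random_generator(seed);                                // Engine
    std::normal_distribution<double> price_distribution(mean, stddev);  // Distribution

    std::vector<double> prices;
    prices.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        // Each call pulls raw bits from the engine and shapes them.
        prices.push_back(price_distribution(random_generator));
    }
    return prices;
}
```

For a non-reproducible run, the seed can come from the first stage of the pipeline: sample_prices(2000.0, 5.0, 10, std::random_device{}()).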
Step 2: Enforce Price Floor
if (generated_price < 0.01) generated_price = 0.01;
Normal distributions can produce negative values (e.g., if stddev is large relative to mean). Since commodity prices cannot be negative, we clamp to a 1-cent minimum.
Better Approach: Use std::max(0.01, generated_price) for idiomatic C++ that clearly expresses the intent of establishing a floor value.
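As a small illustration of that suggestion, the floor can be wrapped in a helper. The name apply_price_floor is hypothetical; the 0.01 minimum is the same 1-cent floor used above.

```cpp
#include <algorithm>

// Hypothetical helper: clamp a generated price to a 1-cent minimum.
// Both arguments to std::max must have the same type (double here).
double apply_price_floor(double generated_price) {
    return std::max(0.01, generated_price);
}
```

A draw of -3.5 becomes 0.01, while a normal draw such as 1987.25 passes through unchanged.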
Step 3: Wait for Empty Slot (P(empty))
my_semaphore_operation(semaphore_id, empty_slots, -1);
Why this is FIRST: This is critical. If the buffer is full (empty_slots = 0), the producer blocks here until the consumer frees a slot.
Deadlock prevention: If we acquired the mutex first and then waited for space, we'd be holding the lock while blocked. The consumer could never enter the critical section to free a slot, so both sides would wait forever: deadlock.
Step 4: Acquire Mutex (P(mutex))
my_semaphore_operation(semaphore_id, mutual_exclusion, -1);
Now that we know space exists, we acquire exclusive access. This prevents race conditions where two producers write to the same slot simultaneously. Critical section begins.
Step 5: Write to Buffer
int current_index = shared_data->producer_index;
strncpy(buffer_array[current_index].commodity_name, name, MAX_COMMODITY_NAME - 1);
buffer_array[current_index].commodity_name[MAX_COMMODITY_NAME - 1] = '\0';
buffer_array[current_index].commodity_price = generated_price;
Copy the commodity name (with explicit null termination for safety) and the price into the current slot.
Step 6: Advance Index (Circular Wrap)
shared_data->producer_index = (shared_data->producer_index + 1) % buffer_size;
The modulo operator implements wrap-around: index 0 → 1 → 2 → ... → 7 → 0 → 1 ... (for buffer_size=8). This is the "circular" in circular buffer.
Step 7: Release Mutex (V(mutex))
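The wrap-around is easy to check in isolation. The helper below is a hypothetical extraction of the same expression, shown only to trace the index sequence; it assumes buffer_size is positive.

```cpp
// Hypothetical helper: next position in a circular buffer.
// Assumes 0 <= current_index < buffer_size and buffer_size > 0.
int next_index(int current_index, int buffer_size) {
    return (current_index + 1) % buffer_size;
}
```

With buffer_size = 8, next_index(6, 8) is 7 and next_index(7, 8) is 0, so eight consecutive advances from any slot return to that same slot.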
my_semaphore_operation(semaphore_id, mutual_exclusion, +1);
Increment mutex (unlock). Other processes can now enter the critical section. Critical section ends.
Step 8: Signal Product Available (V(full))
my_semaphore_operation(semaphore_id, available_products, +1);
Tell the consumer: "One more item is ready." If the consumer was blocked waiting for items, this wakes it up.
Step 9: Interruptible Sleep
// Sleep in small chunks (10ms each) for responsive signal handling
int chunks = sleep_ms / 10;
for (int i = 0; i < chunks && g_running; i++) {
    usleep(10000);  // 10ms = 10,000 microseconds
}
if ((sleep_ms % 10) > 0 && g_running) {
    usleep((sleep_ms % 10) * 1000);
}
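Why chunk the sleep? A signal that arrives while usleep() is blocked makes the call return early, but a signal that arrives just before the call starts does not: the handler runs, and the full sleep still happens. With a single usleep(500000), that window could delay shutdown by up to 500ms. Breaking the sleep into 10ms chunks lets us check g_running frequently, so shutdown is delayed by at most about one chunk.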
Key Insights
- Order matters: P(empty) before P(mutex) prevents deadlock
- Circular buffer: Modulo arithmetic wraps the index automatically
- Independent iterations: Each loop generates a fresh random price
- Semaphore roles: empty/full count resources; mutex guards the critical section
- Rollback pattern: Restore the first semaphore if acquiring the second fails (signal-safe cleanup)
- Chunked sleep: Responsive shutdown without sacrificing timing accuracy
Full Source: See code/producer.cpp for the complete implementation with all error handling.
5.5 Makefile
A Makefile automates the build process. Instead of typing long compiler commands, you just run make:
# Compiler and flags
CXX = g++
CXXFLAGS = -Wall -Wextra -Wpedantic -O2 -std=c++11
LDFLAGS = -pthread

# Targets (recipe lines must start with a tab character)
all: producer consumer

producer: producer.cpp
	$(CXX) $(CXXFLAGS) $(LDFLAGS) producer.cpp -o producer

consumer: consumer.cpp
	$(CXX) $(CXXFLAGS) $(LDFLAGS) consumer.cpp -o consumer

clean:
	rm -f producer consumer

.PHONY: all clean
Understanding .PHONY
The .PHONY directive declares targets that don't produce output files. In our Makefile:
- all doesn't create a file named "all": it builds producer and consumer
- clean doesn't create a file named "clean": it removes binaries
Without .PHONY, if a file named clean existed in the directory, make clean would say "clean is up to date" and do nothing. The .PHONY directive tells Make to always run these targets regardless of whether files with those names exist.
What Each Flag Does:
| Flag | Purpose |
|---|---|
| -Wall | Enable most common warnings |
| -Wextra | Enable extra warnings beyond -Wall |
| -Wpedantic | Warn about non-standard code |
| -O2 | Optimization level 2 (good balance of speed and compile time) |
| -std=c++11 | Use C++11 standard (needed for <random>) |
| -pthread | Compile and link with POSIX threads support (not required by signal handling or System V IPC themselves) |
Usage:
cd code/ # Navigate to code directory
make # Build both producer and consumer
make clean # Remove compiled binaries
make producer # Build only producer
make consumer # Build only consumer
5.6 Running the System
Complete Walkthrough:
# Terminal 1: Build and start consumer (creates IPC resources)
cd code/
make
./consumer 8 # Create buffer with 8 slots
# Terminal 2: Start first producer
./producer GOLD 2000.0 5.0 500 8
#          ^    ^      ^   ^   ^
#          |    |      |   |   └── buffer_size (must match consumer)
#          |    |      |   └────── sleep between prices (500ms)
#          |    |      └────────── standard deviation ($5)
#          |    └───────────────── mean price ($2000)
#          └────────────────────── commodity name
# Terminal 3: Start second producer
./producer SILVER 25.0 0.5 300 8
# Terminal 4: Start third producer
./producer CRUDEOIL 75.0 2.0 400 8
# When done: Press Ctrl+C in consumer terminal
# Consumer cleans up all IPC resources
What You'll See:
The consumer displays a real-time dashboard showing:
- Current price for each commodity
- Moving average (smoothed trend)
- Price trend indicator (up, down, or stable)
- Update timestamps
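This page doesn't show how the dashboard computes its moving average. One simple possibility, sketched here purely as an assumption, is a simple moving average over the last few prices of each commodity. The class name MovingAverage and the window size are hypothetical.

```cpp
#include <cstddef>
#include <deque>

// Hypothetical helper: simple moving average over the most recent
// `window_size` prices (a window of 0 is treated as 1).
class MovingAverage {
public:
    explicit MovingAverage(std::size_t window_size)
        : window_size_(window_size == 0 ? 1 : window_size), sum_(0.0) {}

    // Record a new price and return the average of the current window.
    double add(double price) {
        prices_.push_back(price);
        sum_ += price;
        if (prices_.size() > window_size_) {
            sum_ -= prices_.front();  // Drop the oldest price
            prices_.pop_front();
        }
        return sum_ / static_cast<double>(prices_.size());
    }

private:
    std::size_t window_size_;
    double sum_;
    std::deque<double> prices_;
};
```

With a window of 3, feeding 10, 20, 30, 40 yields 10, 15, 20, 30. Comparing the newest price with the returned average is one way a trend indicator could be derived, though the actual rule used by the consumer is not stated here.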
Cleanup Verification:
After pressing Ctrl+C in the consumer, verify IPC resources were removed:
# Check for leftover resources. ipcs prints keys in hexadecimal:
# 777 = 0x309 (shared memory), 555 = 0x22b (semaphores). Expect no matching entry.
ipcs -m | grep -i 309   # Shared memory
ipcs -s | grep -i 22b   # Semaphores
# If resources remain (from a crash), remove manually:
ipcrm -M 777            # Remove shared memory by key
ipcrm -S 555            # Remove semaphores by key
Buffer Size Must Match!
All producers and the consumer must use the same buffer size. If they don't match, the processes disagree about the shared memory layout: each side wraps its index at a different point, so items can be overwritten before they are read, or read from slots that were never filled, causing garbage data or crashes.
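A producer could guard against a mismatch by comparing the size it expects with the size of the segment the consumer actually created. The sketch below uses shmctl() with IPC_STAT for that; the helper name segment_size_matches is hypothetical, and the check assumes shm_segsz reports the byte count originally passed to shmget(), as it does on Linux.

```cpp
#include <cstddef>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

// Hypothetical helper: true if the existing segment's size equals the
// size this process computed from its own BUFFER_SIZE argument.
// Returns false if the segment cannot be queried.
bool segment_size_matches(int shared_memory_id, std::size_t expected_size) {
    struct shmid_ds info;
    if (shmctl(shared_memory_id, IPC_STAT, &info) == -1) {
        return false;
    }
    return static_cast<std::size_t>(info.shm_segsz) == expected_size;
}
```

Called right after shmget() succeeds, with the same size value the producer passed to shmget(), this would let the producer exit with a clear message instead of silently corrupting the buffer.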