Table of contents
1.
Introduction
2.
What is the Producer-Consumer Problem?
3.
Solution using Producer and Consumer Threads and Issue with Synchronization
4.
Introducing Synchronization in the Message Queue Class
5.
Producer with Multiple Consumers
6.
Solution using Java Concurrency's BlockingQueue Class
7.
Frequently Asked Questions
7.1.
What happens if multiple producers try to add messages to a full queue simultaneously?
7.2.
Can the Producer-Consumer problem be solved without using synchronization?
7.3.
What are the advantages of using BlockingQueue over the synchronized approach?
8.
Conclusion
Last Updated: Nov 16, 2024
Easy

Producer Consumer Problem in Java

Author Riya Singh
0 upvote
Career growth poll
Do you think IIT Guwahati certified course can help you in your career?

Introduction

In programming, there are various challenges that developers face when working with multiple threads. One of the most common problems is the Producer-Consumer problem, which occurs when there is a need to coordinate the production and consumption of data between threads. In Java, this problem can be solved using different approaches, like using synchronized methods or the BlockingQueue class provided by the Java Concurrency package. 

Producer Consumer Problem in Java

In this article, we will discuss the Producer-Consumer problem in detail, understand its challenges, and learn how to implement effective solutions using Java.

What is the Producer-Consumer Problem?

The Producer-Consumer problem is a classic synchronization problem in concurrent programming. It involves two types of threads: producers & consumers. The producers generate data & store it in a shared buffer, while the consumers retrieve the data from the buffer & process it. The main challenge lies in coordinating the actions of the producers & consumers to ensure that the following conditions are met:

1. Producers should not attempt to add data to the buffer if it is already full.
 

2. Consumers should not attempt to retrieve data from the buffer if it is empty.
 

3. Multiple producers & consumers should be able to work concurrently without causing data inconsistency or corruption.


Let's consider a simple example to explain the problem more clearly. Imagine a messaging application where users can send and receive messages. The producers, in this case, are the users sending messages, and the consumers are the users receiving those messages. The shared buffer is the message queue that stores the messages temporarily until they are consumed by the recipients. The challenge is to ensure that the message queue is accessed and modified in a thread-safe manner, preventing any data loss or inconsistency.

Solution using Producer and Consumer Threads and Issue with Synchronization

Let's start by implementing a basic solution to the Producer-Consumer problem using producer and consumer threads. We'll create a simple Message class to represent the data being produced & consumed:

class Message {
    private String content;

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }
}


Next, we'll create a MessageQueue class that will serve as the shared buffer between the producers & consumers:

class MessageQueue {
    private Queue<Message> queue = new LinkedList<>();
    public void addMessage(Message message) {
        queue.add(message);
    }
    public Message removeMessage() {
        return queue.remove();
    }
}


Now, let's implement the producer & consumer threads:

class Producer implements Runnable {
    private MessageQueue messageQueue;


    public Producer(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

   @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            Message message = new Message("Message " + i);
            messageQueue.addMessage(message);
            System.out.println("Produced: " + message.getContent());
        }
    }
}
class Consumer implements Runnable {
    private MessageQueue messageQueue;

    public Consumer(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            Message message = messageQueue.removeMessage();
            System.out.println("Consumed: " + message.getContent());
        }
    }
}


However, this solution has a synchronization issue. If multiple producers and consumers access the message queue concurrently, it can lead to race conditions and inconsistent behavior. For example, if a consumer tries to remove a message from an empty queue, it will throw a NoSuchElementException.

Introducing Synchronization in the Message Queue Class

To address the synchronization issue & ensure thread safety, we can modify the MessageQueue class to include synchronization mechanisms. We'll use the synchronized keyword to control access to the shared message queue.

Let’s see the updated MessageQueue class with synchronization:

class MessageQueue {
    private Queue<Message> queue = new LinkedList<>();
    private int capacity;
    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }
    public synchronized void addMessage(Message message) throws InterruptedException {
        while (queue.size() == capacity) {
            wait();
        }
        queue.add(message);
        notify();
    }

    public synchronized Message removeMessage() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        Message message = queue.remove();
        notify();
        return message;
    }
}


Let's discuss the changes we made:

1. We added a capacity variable to specify the maximum number of messages the queue can hold.
 

2. The addMessage() method is now synchronized. It checks if the queue is full using a while loop. If the queue is full, the producer thread waits by calling wait(). When notified, it adds the message to the queue and notifies any waiting consumer threads using notify().
 

3. The removeMessage() method is also synchronized. It checks if the queue is empty using a while loop. If the queue is empty, the consumer thread waits by calling wait(). When notified, it removes and returns the message from the queue and notifies any waiting producer threads using notify().
 

With these changes, producers and consumers can safely access the shared message queue without causing race conditions or inconsistencies. The wait() and notify() methods coordinate the actions of the threads and ensure that producers wait when the queue is full and consumers wait when the queue is empty.

Now, let's update the main class to use the synchronized MessageQueue:

public class Main {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(5);
        Thread producerThread = new Thread(new Producer(messageQueue));
        Thread consumerThread = new Thread(new Consumer(messageQueue));

        producerThread.start();
        consumerThread.start();
    }
}


In this example, we create a MessageQueue with a capacity of 5. We then create and start producer and consumer threads. The producer thread will produce messages and add them to the queue, while the consumer thread will consume messages from the queue. The synchronization mechanisms in the MessageQueue class ensure that the threads coordinate their actions and avoid conflicts.

Producer with Multiple Consumers

We have seen an example with a single producer and a single consumer until now. However, in real-world situations, it's common to have multiple consumers processing messages from a single producer. Let's modify our code to handle multiple consumers.

First, let's update the main class to create multiple consumer threads:

public class Main {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(5);

        Thread producerThread = new Thread(new Producer(messageQueue));
        Thread consumerThread1 = new Thread(new Consumer(messageQueue), "Consumer 1");
        Thread consumerThread2 = new Thread(new Consumer(messageQueue), "Consumer 2");
        Thread consumerThread3 = new Thread(new Consumer(messageQueue), "Consumer 3");

        producerThread.start();
        consumerThread1.start();
        consumerThread2.start();
        consumerThread3.start();
    }
}


In this updated code, we create three consumer threads, each with a unique name. All the consumer threads share the same MessageQueue instance.

Next, let's modify the Consumer class to handle multiple consumers:

class Consumer implements Runnable {
    private MessageQueue messageQueue;
    public Consumer(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Message message = messageQueue.removeMessage();
                System.out.println(Thread.currentThread().getName() + " consumed: " + message.getContent());
                Thread.sleep(1000); // Simulate message processing time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


In the updated Consumer class, we use a while loop to continuously consume messages from the message queue. We retrieve the current thread's name using Thread.currentThread().getName() & include it in the output to identify which consumer thread consumed each message.

We also introduce a short delay of 1 second using Thread.sleep(1000) to simulate message processing time. This delay allows us to observe the behavior of multiple consumers more easily.

When you run the modified code, you will see an output like this:

Produced: Message 1
Consumer 1 consumed: Message 1
Produced: Message 2
Consumer 2 consumed: Message 2
Produced: Message 3
Consumer 3 consumed: Message 3
Produced: Message 4
Consumer 1 consumed: Message 4
Produced: Message 5
Consumer 2 consumed: Message 5


As you can see, messages are produced by the single producer thread and consumed by the multiple consumer threads in a round-robin fashion. The synchronization mechanisms in the MessageQueue class ensure that the consumers coordinate their access to the shared message queue and avoid conflicts.

Solution using Java Concurrency's BlockingQueue Class

While the synchronized approach we discussed earlier works well for the Producer-Consumer problem, Java provides a more convenient and efficient solution using the BlockingQueue class from java.util.concurrent package.

The BlockingQueue interface represents a queue that supports operations that wait for the queue to become non-empty when retrieving an element and wait for space to become available in the queue when storing an element. It provides built-in methods for adding and removing elements, which automatically handle the synchronization and coordination between producers and consumers.

Let's see how we can use BlockingQueue to solve the Producer-Consumer problem:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable {
    private BlockingQueue<Message> queue;
    public Producer(BlockingQueue<Message> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            Message message = new Message("Message " + i);
            try {
                queue.put(message);
                System.out.println("Produced: " + message.getContent());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
class Consumer implements Runnable {
    private BlockingQueue<Message> queue;
    public Consumer(BlockingQueue<Message> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        while (true) {
            try {
                Message message = queue.take();
                System.out.println(Thread.currentThread().getName() + " consumed: " + message.getContent());
                Thread.sleep(1000); // Simulate message processing time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread1 = new Thread(new Consumer(queue), "Consumer 1");
        Thread consumerThread2 = new Thread(new Consumer(queue), "Consumer 2");
        Thread consumerThread3 = new Thread(new Consumer(queue), "Consumer 3");
        producerThread.start();
        consumerThread1.start();
        consumerThread2.start();
        consumerThread3.start();
    }
}
You can also try this code with Online Java Compiler
Run Code


Output

Produced: Message 1
Consumer 1 consumed: Message 1
Produced: Message 2
Consumer 2 consumed: Message 2
Produced: Message 3
Consumer 3 consumed: Message 3
Produced: Message 4
Consumer 1 consumed: Message 4
Produced: Message 5
Consumer 2 consumed: Message 5


In this modified code:

1. We import the BlockingQueue interface & the LinkedBlockingQueue class, which is an implementation of BlockingQueue.
 

2. In the Producer class, we use the put() method to add messages to the queue. The put() method automatically handles the synchronization & blocks the producer if the queue is full.
 

3. In the Consumer class, we use the take() method to retrieve messages from the queue. The take() method automatically handles the synchronization & blocks the consumer if the queue is empty.
 

4. In the main method, we create a LinkedBlockingQueue with a capacity of 5 and pass it to the producer and consumer threads.


Note: The BlockingQueue simplifies the synchronization process and provides a more concise and readable solution than the synchronized approach. It handles the coordination between producers and consumers internally, making the code easier to understand and maintain.

Frequently Asked Questions

What happens if multiple producers try to add messages to a full queue simultaneously?

The BlockingQueue automatically handles the synchronization. If the queue is full, the producers will be blocked until space becomes available in the queue.

Can the Producer-Consumer problem be solved without using synchronization?

No, synchronization is necessary to ensure thread safety & avoid race conditions when multiple threads access the shared queue concurrently.

What are the advantages of using BlockingQueue over the synchronized approach?

BlockingQueue provides a more convenient and efficient solution by encapsulating the synchronization logic internally. It offers methods like put() and take() that handle the coordination between producers and consumers, resulting in more concise and readable code.

Conclusion

In this article, we discussed the Producer-Consumer problem in Java and discussed various solutions to address it. We started with a basic approach using synchronized methods and then introduced the BlockingQueue class from the Java Concurrency package. BlockingQueue simplifies the synchronization process and provides a more efficient and maintainable solution. 

You can also check out our other blogs on Code360.

Live masterclass