An example implementation could look like this:
import java.util.Queue;
import java.util.UUID;

public class SynchronizedProducer {
    private final Queue<String> queue;

    public SynchronizedProducer(Queue<String> queue) {
        this.queue = queue;
    }

    // Produces a random message roughly every millisecond, forever.
    public void produce() throws InterruptedException {
        while (true) {
            putOnQueue(UUID.randomUUID().toString());
            Thread.sleep(1);
        }
    }

    private void putOnQueue(String message) {
        // The queue itself is not thread safe, so guard it with its own monitor.
        synchronized (queue) {
            queue.add(message);
        }
    }
}
Let's take a look at a consumer:
import java.util.Queue;

public class SynchronizedConsumer {
    private final Queue<String> queue;

    public SynchronizedConsumer(Queue<String> queue) {
        this.queue = queue;
    }

    // Polls the queue roughly every millisecond, forever.
    public void consume() throws InterruptedException {
        while (true) {
            takeFromQueue();
            Thread.sleep(1);
        }
    }

    private void takeFromQueue() {
        synchronized (queue) {
            // remove() throws NoSuchElementException on an empty queue, so check first.
            if (!queue.isEmpty()) {
                System.out.println(queue.remove());
            }
        }
    }
}
A class that glues them together:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executors;

public class SynchronizedProducerConsumerApp {
    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new LinkedList<String>();
        final SynchronizedConsumer synchronizedConsumer = new SynchronizedConsumer(queue);
        final SynchronizedProducer synchronizedProducer = new SynchronizedProducer(queue);

        // Run the producer on its own thread.
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronizedProducer.produce();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception.
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Run the consumer on a second thread.
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronizedConsumer.consume();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
This implementation has at least three major drawbacks.
First, every operation acquires the queue's monitor, so access to the queue is fully serialized and the producer and the consumer can never work on it concurrently.
Second, the consumer has to check that the queue is not empty before taking from it.
Last but not least, the consumer sleeps for a fixed time between polls (sleeping when you're supposed to be working is not the best thing to do), so messages wait longer than necessary and performance suffers.
The remedy for these problems is BlockingQueue.
The new producer is similar to the previous one, except that it operates on a BlockingQueue and doesn't synchronize on it, because BlockingQueue implementations are thread safe.
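import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueProducer {
    private final BlockingQueue<String> queue;

    public BlockingQueueProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Produces a random message roughly every millisecond, forever.
    public void produce() throws InterruptedException {
        while (true) {
            putOnQueue(UUID.randomUUID().toString());
            Thread.sleep(1);
        }
    }

    private void putOnQueue(String message) {
        // No synchronized block needed. Note that add() throws IllegalStateException
        // if a bounded queue is full; put() would wait for space instead.
        queue.add(message);
    }
}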
import java.util.concurrent.BlockingQueue;

public class BlockingQueueConsumer {
    private final BlockingQueue<String> queue;

    public BlockingQueueConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // No sleep and no emptiness check: take() does the waiting.
    public void consume() throws InterruptedException {
        while (true) {
            takeFromQueue();
        }
    }

    private void takeFromQueue() throws InterruptedException {
        // take() blocks until an element becomes available.
        System.out.println(queue.take());
    }
}
A class to run it:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueConsumerProducerApp {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final BlockingQueueConsumer consumer = new BlockingQueueConsumer(queue);
        final BlockingQueueProducer producer = new BlockingQueueProducer(queue);

        // Run the producer on its own thread.
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    producer.produce();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing the exception.
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Run the consumer on a second thread.
        Executors.newSingleThreadExecutor().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.consume();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
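To see the blocking behaviour in isolation, here is a minimal sketch that terminates on its own. It is not part of the classes above: the class name BlockingHandOffDemo, the DONE marker (a so-called poison pill) and the queue capacity of 2 are all assumptions made for illustration. It uses put() and take() on a bounded ArrayBlockingQueue, so the producer waits when the queue is full and the consumer waits when it is empty, with no sleeping or emptiness checks on either side.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingHandOffDemo {

    // Hypothetical marker that tells the consumer to stop (a "poison pill").
    private static final String DONE = "DONE";

    // Sends `count` messages through a small bounded queue and
    // returns what the consumer thread received, in order.
    static List<String> transfer(int count) throws InterruptedException {
        // Capacity 2 is an arbitrary choice for this sketch.
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        final List<String> received = new ArrayList<String>();

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        String message = queue.take(); // blocks while the queue is empty
                        if (DONE.equals(message)) {
                            return;
                        }
                        received.add(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            queue.put("message-" + i); // blocks while the queue is full
        }
        queue.put(DONE);

        // join() also makes the consumer's writes to `received` visible here.
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5));
        // prints [message-0, message-1, message-2, message-3, message-4]
    }
}
```

With a single producer and a single consumer the FIFO order of the queue is preserved, which is why the output order is deterministic here.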
There are other implementations of BlockingQueue that may be helpful for our needs, e.g. PriorityBlockingQueue, which hands out elements in priority order, or DelayQueue, whose elements can only be taken after their delay has expired.
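As a rough illustration of the priority and delay variants, the sketch below uses the JDK's PriorityBlockingQueue and DelayQueue. The class OtherBlockingQueuesDemo, the DelayedMessage element type and the concrete delays are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OtherBlockingQueuesDemo {

    // Hypothetical element type: becomes available once `delayMillis` have passed.
    static class DelayedMessage implements Delayed {
        final String text;
        private final long readyAtNanos;

        DelayedMessage(String text, long delayMillis) {
            this.text = text;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            long diff = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
    }

    // Elements come out in natural order (smallest first), not insertion order.
    static List<Integer> takeByPriority() throws InterruptedException {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>();
        queue.put(3);
        queue.put(1);
        queue.put(2);
        List<Integer> taken = new ArrayList<Integer>();
        while (!queue.isEmpty()) {
            taken.add(queue.take());
        }
        return taken;
    }

    // take() blocks until the element with the shortest remaining delay has expired.
    static List<String> takeByDelay() throws InterruptedException {
        DelayQueue<DelayedMessage> queue = new DelayQueue<DelayedMessage>();
        queue.put(new DelayedMessage("late", 200));
        queue.put(new DelayedMessage("early", 50));
        List<String> taken = new ArrayList<String>();
        taken.add(queue.take().text);
        taken.add(queue.take().text);
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(takeByPriority()); // prints [1, 2, 3]
        System.out.println(takeByDelay());    // prints [early, late]
    }
}
```

Both queues are unbounded, so put() never blocks on them; only the consumer side waits.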