Java使用BlockingQueue实现生产者消费者模式


生产者和消费者模式通过执行工作的分离解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。 是一种非常好的解耦开发模式。

Java实现的经典的方法是使用wait和notify方法来协调生产者消费者的同步合作,实现生产者消费者模式最方便的方法是使用juc中的阻塞队列。juc中的阻塞队列(BlockingQueue)结构更简单,更容易编程控制。 我们只需要编写业务代码,同步的问题,扔给了阻塞队列来管理。 当队列满的时候生产者调用的put()方法将阻塞,同理当队列为空的时候消费者take()方法将阻塞,等待生产者生产。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumerPattern {

public static void main(String args[]) {
BlockingQueue bufQueue = new LinkedBlockingQueue();
Thread prodThread = new Thread(new Producer(bufQueue));
Thread consThread = new Thread(new Consumer(bufQueue));
prodThread.start();
consThread.start();
}

}

//Producer
class Producer implements Runnable {
private final BlockingQueue<String> bufQueue;
public Producer(BlockingQueue<String> bufQueue) {
this.bufQueue = bufQueue;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println("Produced: " + i);
bufQueue.put(String.valueOf(i));
}
bufQueue.put("OVER");// 设置标志位表示结束
} catch (InterruptedException ex) {
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}

//Consumer
class Consumer implements Runnable {
private final BlockingQueue<String> bufQueue;
public Consumer(BlockingQueue<String> bufQueue) {
this.bufQueue = bufQueue;
}
@Override
public void run() {
try {
String x = null;
while (!(x = bufQueue.take()).equals("OVER")) {
System.out.println("Consumed: " + x);
}
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}

如示例一所示,这里生产者和消费者都是1个,也就是1:1。生产者设置了对应的结束标志"OVER",也就是生产者发出信号自己生产结束,消费者消费完也可以结束了,这样可以避免消费线程一直阻塞。但是,现实环境中生产与消费的速度往往是不同的,为了高效的运行,设置的生产与消费往往不是1:1的比例,也就是说生产者的终止信号不够用。如何才能使消费者得知生产者结束的信号呢?一个很好的方法是同样利用juc中的AtomicInteger对生产者数量进行记录,如下示例。此时线程都可以“友好”结束。
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class Producer implements Runnable {
private BlockingQueue<String> buf;
private BlockingQueue<String> messages;
private AtomicInteger prodCount;// 生产者计数器

public Producer(BlockingQueue<String> d, BlockingQueue<String> m, AtomicInteger call_count) {
this.buf = d;
this.messages = m;
this.prodCount = call_count;
}

public void run() {
try {
String s;
while ((s = messages.poll()) != null) {
buf.put(s);
System.out.println(Thread.currentThread().getName() + "-produce " + s);
}
} catch (InterruptedException intEx) {
System.out.println("Interrupted! ");
} finally {
// 无论发生什么情况 prodCount都要减小。否则 消费者线程得不到生产者已经结束的消息
prodCount.getAndDecrement();
}

}
}

class Consumer implements Runnable {
private BlockingQueue<String> drop;
private AtomicInteger call_count;
public Consumer(BlockingQueue<String> d, AtomicInteger call_count) {
this.drop = d;
this.call_count = call_count;
}
public void run() {
doSomething();
}
private void doSomething() {
String msg = null;
while ((msg = drop.poll()) != null) {
System.out.println(Thread.currentThread().getName() + "-consume " + msg);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (call_count.get() != 0) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
doSomething();
}
}
}

public class MultiProducerConsumer {
private static final int P_NUM = 1; // 记录生产者的数量
private static final AtomicInteger P_NOW_COUNT = new AtomicInteger(P_NUM);

public static void main(String[] args) {
BlockingQueue<String> messages = initMes();
BlockingQueue<String> drop = new ArrayBlockingQueue(100, false);

long begin = System.currentTimeMillis();
Map<String, Thread> threadMap = new HashMap<String, Thread>();
for (int i = 1; i <= 5; i++) {
if (i <= P_NUM) {
String name = "线程A" + i;
threadMap.put(name, new Thread(new Producer(drop, messages, P_NOW_COUNT), name));
} else {
String name = "线程B" + i;
threadMap.put(name, new Thread(new Consumer(drop, P_NOW_COUNT), name));
}
}
for (Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
for (Entry<String, Thread> entry : threadMap.entrySet()) {
try {
entry.getValue().join();
} catch (InterruptedException e) {
}
}
long end = System.currentTimeMillis();

System.out.println("耗费时间: " + (end - begin));
}

public static BlockingQueue<String> initMes() {
List<String> messages = Arrays.asList("cat ", "dog ", "pig", "fish", "sheep", "cattle",
"duck", "chicken", "goose", "bird", "tiger", "lion", "elephant", "wolf", "mouse",
"leopard");
BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
queue.addAll(messages);
return queue;
}
}


智能推荐

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号  

赞助商广告