生产者-消费者问题可以说是线程中最基础,最经典的场景了。它把并发编程中涉及到的一些常见概念都披露了出来,可以说是线程入门绕不开的场景。什么是’生产者-消费者问题’呢,通俗的定义就是:在指定容量的容器中,同时存在两种对象对容器进行生产或者消费的动作,由于容器的容量有限,使得”生产”不能太多(太多没有意义,容器装不下),“消费”不能无限(容器中不一定含有那么多消费量)。 具体详情可以参见生产者消费者问题 。
在Java的线程模型中,我总结了对于这一问题的3种处理模式,可以分别比较一下。
1. 普通模式 synchronized+notify+await 这种模式应该最为普遍,不需要了解JDK1.5以后的相关线程类工具,直接使用内置关键字synchronized保证线程访问的同步性,同时使用继承至Object对象的wait,notify方法可以根据业务需求控制线程的实际访问权限。详情如下,这里业务场景是:一个盘子一次只能装一个鸡蛋,分别有放鸡蛋的线程和取鸡蛋的线程对盘子进行存取操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public class Plate { private List<Egg> eggs = new ArrayList<Egg>(); public synchronized void getEgg () { while (eggs.size() == 0 ) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Egg e = eggs.get(0 ); eggs.clear(); System.out.println(">>>>>>>>>>get egg:" + e.getName()); notify(); } public synchronized void putEgg (Egg egg) { while (eggs.size() != 0 ) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } eggs.add(egg); System.out.println(">>>>>>>>>>put egg:" + egg.getName()); notify(); } static class PutThread implements Runnable { private Plate plate; PutThread(Plate plate) { this .plate = plate; } @Override public void run () { plate.putEgg(new Egg("egg[" + RandomUtils.nextInt(10 ) + "]" )); } } static class GetThread implements Runnable { private Plate plate; GetThread(Plate plate) { this .plate = plate; } @Override public void run () { plate.getEgg(); } } static class Egg { private String name; String getName () { return name; } void setName (String name) { this .name = name; } Egg(String name) { this .name = name; } } public static void main (String [] args) { Plate p = new Plate(); while (true ) { new Thread(new PutThread(p)).start(); new Thread(new GetThread(p)).start(); } } }
2. 巧妙模式 Semaphore Semaphore 是JDK5推出线程工具类之一,JDK5推出的一系列线程工具类大大简化了并发编程,覆盖了一些常见的业务场景,后面我会有篇文章单独讲讲这些工具类。 下面看看如何用Semaphore 进行生产者消费者问题的解决思路。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class NewPlate { private Semaphore fullSema = new Semaphore(10 ); private Semaphore emptySema = new Semaphore(0 ); private Semaphore mutex = new Semaphore(1 ); private ArrayList<Object> list = new ArrayList<Object>(); public void set (Object data) { try { fullSema.acquire(); mutex.acquire(); System.out.printf("=====before set , current size:%d\n" , list.size()); list.add(data); TimeUnit.SECONDS.sleep(1 ); System.out.printf("=====after set , current size:%d\n" , list.size()); mutex.release(); emptySema.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public Object get () { Object ret = null ; try { emptySema.acquire(); mutex.acquire(); System.out.printf(">>>>>>before get , current size:%d\n" , list.size()); ret = list.remove(0 ); TimeUnit.SECONDS.sleep(4 ); System.out.printf(">>>>>>after get, current size:%d\n" , list.size()); mutex.release(); fullSema.release(); } catch (InterruptedException e) { e.printStackTrace(); } return ret; } public static void main (String[] args) { final NewPlate newPlate = new NewPlate(); Thread setThread = new Thread(new Runnable() { @Override public void run () { while (true ) { newPlate.set(new Object()); } } }); Thread getThead = new Thread(new Runnable() { @Override public void run () { while (true ) { newPlate.get(); } } }); setThread.start(); getThead.start(); } }
3. 高阶模式 Lock + Condition Lock 提供了与synchronized相似的语义,但是功能更为强大,我个人认为这种模式从语义上更好理解,更类似人类的语言逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public class BoundedBuffer { private int maxSize; private LinkedList<Object> buffer; private Lock lock; private Condition notFull; private Condition notEmpty; BoundedBuffer() { maxSize = 5 ; buffer = new LinkedList<Object>(); lock = new ReentrantLock(); notFull = lock.newCondition(); notEmpty = lock.newCondition(); } public void set (Object data) { lock.lock(); try { while (buffer.size() == maxSize) { notFull.await(); } buffer.offer(data); TimeUnit.SECONDS.sleep(2 ); System.out.printf("set %s, size:%d\n" , Thread.currentThread().getName(), buffer.size()); notEmpty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public Object get () { lock.lock(); Object ret = null ; try { while (buffer.size() == 0 ) { notEmpty.await(); } ret = buffer.poll(); TimeUnit.SECONDS.sleep(5 ); System.out.printf("get %s,get one, current size:%d\n" , Thread.currentThread().getName(), buffer.size()); notFull.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return ret; } static class Setter implements Runnable { private BoundedBuffer boundedBuffer; Setter(BoundedBuffer boundedBuffer) { this .boundedBuffer = boundedBuffer; } @Override public void run () { boundedBuffer.set(new Object()); } } static class Getter implements Runnable { private BoundedBuffer boundedBuffer; Getter(BoundedBuffer boundedBuffer) { this .boundedBuffer = boundedBuffer; } @Override public void run () { boundedBuffer.get(); } } public static void main (String[] args) { BoundedBuffer boundedBuffer = new BoundedBuffer(); Setter setter = new Setter(boundedBuffer); Getter getter = new Getter(boundedBuffer); for (int i = 0 ; i < 10 ; i++) { Thread setThread = new Thread(setter); Thread getThread = new Thread(getter); setThread.start(); getThread.start(); } }