线程中的生产者消费者场景

生产者-消费者问题可以说是线程中最基础,最经典的场景了。它把并发编程中涉及到的一些常见概念都披露了出来,可以说是线程入门绕不开的场景。什么是’生产者-消费者问题’呢,通俗的定义就是:
在指定容量的容器中,同时存在两种对象对容器进行生产或者消费的动作,由于容器的容量有限,使得”生产”不能太多(太多没有意义,容器装不下),“消费”不能无限(容器中不一定含有那么多消费量)。具体详情可以参见生产者消费者问题

在Java的线程模型中,我总结了对于这一问题的3种处理模式,可以分别比较一下。

1. 普通模式 synchronized+notify+await

这种模式应该最为普遍,不需要了解JDK1.5以后的相关线程类工具,直接使用内置关键字synchronized保证线程访问的同步性,同时使用继承至Object对象的wait,notify方法可以根据业务需求控制线程的实际访问权限。详情如下,这里业务场景是:一个盘子一次只能装一个鸡蛋,分别有放鸡蛋的线程和取鸡蛋的线程对盘子进行存取操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class Plate {


// 容器
private List<Egg> eggs = new ArrayList<Egg>();


// 取鸡蛋的业务逻辑
public synchronized void getEgg() {
while (eggs.size() == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Egg e = eggs.get(0);
eggs.clear();
System.out.println(">>>>>>>>>>get egg:" + e.getName());
notify();
}


// 放鸡蛋的业务逻辑
public synchronized void putEgg(Egg egg) {
while (eggs.size() != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
eggs.add(egg);
System.out.println(">>>>>>>>>>put egg:" + egg.getName());
notify();
}

// 生产者线程
static class PutThread implements Runnable{

private Plate plate;

PutThread(Plate plate) {
this.plate = plate;
}

@Override
public void run() {
plate.putEgg(new Egg("egg[" + RandomUtils.nextInt(10) + "]"));
}
}

//消费者线程
static class GetThread implements Runnable{
private Plate plate;

GetThread(Plate plate) {
this.plate = plate;
}

@Override
public void run() {
plate.getEgg();
}
}


static class Egg {
private String name;
String getName() {
return name;
}
void setName(String name) {
this.name = name;
}

Egg(String name) {
this.name = name;
}
}


public static void main(String [] args){
Plate p = new Plate();
while(true) {
new Thread(new PutThread(p)).start();
new Thread(new GetThread(p)).start();
}

}

}

2. 巧妙模式 Semaphore

Semaphore 是JDK5推出线程工具类之一,JDK5推出的一系列线程工具类大大简化了并发编程,覆盖了一些常见的业务场景,后面我会有篇文章单独讲讲这些工具类。
下面看看如何用Semaphore 进行生产者消费者问题的解决思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class NewPlate {

private Semaphore fullSema = new Semaphore(10); // 定义容器的最大容量条件
private Semaphore emptySema = new Semaphore(0);// 定义容器的最小容量条件
private Semaphore mutex = new Semaphore(1);// 这个非常重要,用来控制 消费/生产逻辑一次只有一个线程来访问,说白了就是模拟Synchronized的语义。

private ArrayList<Object> list = new ArrayList<Object>();

public void set(Object data) {
try {
// 先判断是否满了,语义就是wait()
fullSema.acquire();

// 保证一次只有一个线程访问,语义就是synchronized
mutex.acquire();
System.out.printf("=====before set , current size:%d\n", list.size());
list.add(data);
TimeUnit.SECONDS.sleep(1);
System.out.printf("=====after set , current size:%d\n", list.size());
mutex.release();
// 为空条件释放一个,语义就是notify()
emptySema.release();

} catch (InterruptedException e) {
e.printStackTrace();
}
}

public Object get() {
Object ret = null;
try {
emptySema.acquire();
mutex.acquire();
System.out.printf(">>>>>>before get , current size:%d\n", list.size());
ret = list.remove(0);
TimeUnit.SECONDS.sleep(4);
System.out.printf(">>>>>>after get, current size:%d\n", list.size());
mutex.release();
fullSema.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
return ret;
}

public static void main(String[] args) {

final NewPlate newPlate = new NewPlate();
Thread setThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
newPlate.set(new Object());
}
}
});
Thread getThead = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
newPlate.get();
}
}
});

setThread.start();
getThead.start();


}

}

3. 高阶模式 Lock + Condition

Lock 提供了与synchronized相似的语义,但是功能更为强大,我个人认为这种模式从语义上更好理解,更类似人类的语言逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class BoundedBuffer {


private int maxSize;// 容器的容量
private LinkedList<Object> buffer; // 容器

private Lock lock;// 锁,用来加锁 生产/消费逻辑,保证一次只有一个线程访问
private Condition notFull;// 非满条件,在容量已满的情况下,控制生产者继续生产
private Condition notEmpty;// 非空条件,在容量是空的情况下,控制消费线程继续消费

BoundedBuffer() {
maxSize = 5;
buffer = new LinkedList<Object>();
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}

public void set(Object data) {
lock.lock();
try {
// 容量已满,则生产者等待
while (buffer.size() == maxSize) {
notFull.await();
}
buffer.offer(data);
TimeUnit.SECONDS.sleep(2);
System.out.printf("set %s, size:%d\n", Thread.currentThread().getName(), buffer.size());

// 生产完毕,提醒所有消费者可以消费了
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public Object get() {
lock.lock();
Object ret = null;
try {
// 容量为空,则消费者等待
while (buffer.size() == 0) {
notEmpty.await();
}
ret = buffer.poll();
TimeUnit.SECONDS.sleep(5);
System.out.printf("get %s,get one, current size:%d\n", Thread.currentThread().getName(), buffer.size());

// 消费完毕,提醒所有生产者者可以继续生产
notFull.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return ret;
}

static class Setter implements Runnable {

private BoundedBuffer boundedBuffer;

Setter(BoundedBuffer boundedBuffer) {
this.boundedBuffer = boundedBuffer;
}

@Override
public void run() {
boundedBuffer.set(new Object());
}
}

static class Getter implements Runnable {
private BoundedBuffer boundedBuffer;

Getter(BoundedBuffer boundedBuffer) {
this.boundedBuffer = boundedBuffer;
}

@Override
public void run() {
boundedBuffer.get();
}
}

public static void main(String[] args) {

BoundedBuffer boundedBuffer = new BoundedBuffer();
Setter setter = new Setter(boundedBuffer);
Getter getter = new Getter(boundedBuffer);
for (int i = 0; i < 10; i++) {
Thread setThread = new Thread(setter);
Thread getThread = new Thread(getter);
setThread.start();
getThread.start();
}
}