生产者消费者

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
//使用volatile标志是否需要可见性,方便提供给stop()方法
private volatile boolean needProduce=true;
//定义缓冲区
private final Vector sharequeue;
//定义容量
private final int Size;
//static修饰,既能计数,又能保证是线程安全的
private static AtomicInteger count=new AtomicInteger();
public Producer(Vector vector,int Size){
this.Size=Size;
this.sharequeue=vector;
}
@Override
public void run() {
int data;
Random random=new Random();
System.out.println("start product id= "+Thread.currentThread().getId());
try {
while (needProduce){
//模拟延迟
Thread.sleep(random.nextInt(1000));
//当缓冲区大小达到最大容量,便阻塞
while (sharequeue.size()==Size){
synchronized (sharequeue){
System.out.println("Queue is full,produer "+Thread.currentThread().getId()+"is waiting "+sharequeue.size());
sharequeue.wait();
}
}
synchronized (sharequeue){
data=count.incrementAndGet();
sharequeue.add(data);
System.out.println("create data: "+data+",size: "+sharequeue.size());
sharequeue.notifyAll();
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
needProduce=false;
}
}

使用while循环是为了避免虚假唤醒。

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Random;
import java.util.Vector;

public class Consumer implements Runnable {
private final Vector sharequeue;

public Consumer(Vector sharequeue) {
this.sharequeue = sharequeue;
}

@Override
public void run() {
Random random = new Random();
System.out.println("start consumer id= " + Thread.currentThread().getId());
try {
while (true) {
Thread.sleep(random.nextInt(1000));
while (sharequeue.isEmpty()) {
synchronized (sharequeue) {
System.out.println("Queue is empty: ,consumer" + Thread.currentThread().getId() + "is running,size=" + sharequeue.size());
sharequeue.wait();
}
}
synchronized (sharequeue) {
System.out.println("consumer consume data: " + sharequeue.remove(0) + ",size: " + sharequeue.size());
sharequeue.notifyAll();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

主线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
public static void main(String[] args) throws InterruptedException{
Vector vector=new Vector();
int size=4;
ExecutorService executorService= Executors.newCachedThreadPool();
Producer producer1=new Producer(vector,size);
Producer producer2=new Producer(vector,size);
Producer producer3=new Producer(vector,size);
Consumer consumer1=new Consumer(vector);
Consumer consumer2=new Consumer(vector);
Consumer consumer3=new Consumer(vector);

executorService.execute(producer1);
executorService.execute(producer2);
executorService.execute(producer3);
executorService.execute(consumer1);
executorService.execute(consumer2);
executorService.execute(consumer3);

Thread.sleep(1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
executorService.shutdown();
}
}

输出示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
start product id= 11
start product id= 12
start product id= 13
start consumer id= 14
start consumer id= 15
start consumer id= 16
Queue is empty: ,consumer 14 is running,size=0
create data:1,size:1
consumer consume data: 1,size: 0
Queue is empty: ,consumer 14 is running,size=0
create data:2,size:1
consumer consume data: 2,size: 0
create data:3,size:1
create data:4,size:2
consumer consume data: 3,size: 1
create data:5,size:2
consumer consume data: 4,size: 1
consumer consume data: 5,size: 0
Queue is empty: ,consumer 15 is running,size=0
Queue is empty: ,consumer 14 is running,size=0
Queue is empty: ,consumer 16 is running,size=0
create data:6,size:1
consumer consume data: 6,size: 0
Queue is empty: ,consumer 14 is running,size=0
Queue is empty: ,consumer 15 is running,size=0
create data:7,size:1
consumer consume data: 7,size: 0
Queue is empty: ,consumer 14 is running,size=0
Queue is empty: ,consumer 15 is running,size=0
create data:8,size:1
consumer consume data: 8,size: 0
Queue is empty: ,consumer 14 is running,size=0
Queue is empty: ,consumer 15 is running,size=0
Queue is empty: ,consumer 16 is running,size=0
0%