多线程高并发编程 - 同步器(一)
本篇主要总结同步器的相关例子:包括synchronized、volatile、原子变量类(AtomicXxx)、CountDownLatch、ReentrantLock和ThreadLocal。还涉及到wait和notify/notifyAll。
synchronized 关键字
对某个对象进行加锁,保证操作原子性
运行下面的代码,查看添加 synchronized
前后的区别
public class T implements Runnable{
private int count = 10;
@Override
public /*synchronized*/ void run() {
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
public static void main(String[] args) {
T t = new T();
for (int i = 0; i < 5; i++) {
new Thread(t).start();
}
}
}
不添加synchronized
某次运行结果:
Thread-0 count = 7
Thread-4 count = 5
Thread-3 count = 6
Thread-2 count = 7
Thread-1 count = 7
出现上面这种结果原因: 线程重入的问题(线程执行过程中,被其他线程打断),因为 count– + sout(count) 不是原子操作
解决: 加上 synchronized 关键字,保证操作原子性 运行结果:
Thread-0 count = 9
Thread-2 count = 8
Thread-4 count = 7
Thread-3 count = 6
Thread-1 count = 5
使用场景
在java代码中使用synchronized
可是使用在代码块和方法中,根据Synchronized
用的位置可以有这些使用场景:
具体查看示例代码 c_001 - c_005部分
知识点
synchronized
锁定的不是代码块,而是 this 对象;- 锁信息记录在堆内存对象中的,不是在栈引用中;
synchronized
是互斥锁- 关键字写在方法非静态上,锁的对象是当前对象
this
;
public synchronized void m() { // 等同于 synchronized (this) {
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
等同于
public void m() {
synchronized (this) { // 任何线程要执行下面的代码,必须先拿到this锁
// synchronized 锁定的不是代码块,而是 this 对象
// 如果当前对象已经被锁定,其他线程再进入时,就会进行阻塞等待
// 所以 synchronized 是互斥锁
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
}
// 当代码块执行完毕后,锁就会被释放,然后被其他线程获取
}
-
同步方法(加锁)和非同步方法(不加锁)是否可以同时调用?
答:肯定可以
package c_007;
public class T {
//同步方法
public synchronized void m1() {
System.out.println(Thread.currentThread().getName() + " m1 start");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m1 end");
}
//非同步方法
public void m2() {
System.out.println(Thread.currentThread().getName() + " m2 start");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m2 end");
}
public static void main(String[] args) {
T t = new T();
new Thread(t::m1).start(); //jdk8 Lambda表达式
new Thread(t::m2).start();
}
}
/**
运行结果:
Thread-0 m1 start
Thread-1 m2 start
Thread-1 m2 end
Thread-0 m1 end
*/
示例代码 c_007部分
-
脏读现象 对业务写方法加锁,而对业务读方法不加锁,容易出现脏读问题,所以视业务情况如果允许可以给读写都加锁
示例代码 c_008部分
-
synchronized 是可重入锁 即一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请时仍然会得到该对象的锁,同样的,子类调用父类的同步方法,也是可冲入的。
public class T {
synchronized void m1() {
System.out.println("m1 start ");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//调用同样被锁的m2方法
m2();
}
synchronized void m2() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" m2"); // 这句话会打印,调用m2时,不会发生死锁
}
public static void main(String[] args) {
T t = new T();
new Thread(()->t.m1()).start();
}
}
示例代码 c_009-C_010部分
synchronized
代码块中,如果发生异常,锁会被释放
在并发处理过程中,有异常要多加小心,不然可能发生数据不一致的情况。 比如,在一个web app处理过程中,多个servlet线程共同访问同一资源,这时如果异常处理不合适,第一个线程抛出异常,其他线程就会进入同步代码区,有可能访问到异常产生的数据。因此要非常小心处理同步业务员逻辑中的异常。
package c_011;
public class T {
int count = 0;
synchronized void m() {
System.out.println(Thread.currentThread().getName() + " start");
while (true) {
count++;
System.out.println(Thread.currentThread().getName() + " count=" + count);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count == 5) { // 当count == 5 时,synchronized代码块会抛出异常
int i = 1 / 0;
}
}
}
public static void main(String[] args) {
T t = new T();
Runnable r = new Runnable() {
@Override
public void run() {
t.m();
}
};
new Thread(r, "t1").start(); // 执行到第5秒时,抛出 ArithmeticException
// 如果抛出异常后,t2 会继续执行,就代表t2拿到了锁,即t1在抛出异常后释放了锁
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(r, "t2").start();
}
}
示例代码 c_011部分
synchronized 优化
- 同步代码块中的语句越少越好
package c_016;
public class T {
int count = 0;
synchronized void m1() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 业务逻辑中,只有下面这句代码需要 sync, 这时不应该给整个方法上锁
count++;
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void m2() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 业务逻辑中,只有下面这句需要 sync,这时不应该给整个方法上锁
// 采用细粒度的锁,可以使线程争用时间变短,从而提高效率
synchronized (this) {
count++;
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
示例代码 c_016部分
- 锁对象通常要设置为 final类型,保证引用不可以变。
锁定某个对象o,如果o属性发生变化,不影响锁的使用,但是如果o编程另一个对象,则锁定的对象发生变化。
package c_017;
public class T {
/*final*/ Object o = new Object();//锁对象
void m() {
synchronized (o) {
while (true) {
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
T t = new T();
new Thread(t::m, "线程1").start(); //启用线程1
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread2 = new Thread(t::m, "线程2");//启用线程2
t.o = new Object(); // 改变锁引用, 线程2也有机会运行,否则一直都是线程1 运行
thread2.start();
}
}
示例代码 c_017部分
- 不建议使用字符串常量作为锁定对象
在下面的例子中, m1和m2其实是锁定的同一对象 这种情况下,还会可能与其他类库发生死锁,比如某类库中也锁定了字符串 “Hello” 但是无法确认源码的具体位置,所以两个 “Hello” 将会造成死锁 因为你的程序和你用的类库无意间使用了同意把锁
package c_018;
public class T {
String s1 = "Hello";
String s2 = "Hello";
void m1() {
synchronized (s1) {
}
}
void m2() {
synchronized (s2) {
}
}
}
volatile 关键字
关键字,使一个变量在多个线程间可见
* cn: 透明的,临时的
*
* JMM(Java Memory Model):
* 在JMM中,所有对象以及信息都存放在主内存中(包含堆、栈)
* 而每个线程都有自己的独立空间,存储了需要用到的变量的副本,
* 线程对共享变量的操作,都会在自己的工作内存中进行,然后同步给主内存
*
运行下面代码,对比有无volatile的情况下,整个程序运行结果的区别
下面的代码中,running 是位于堆内存中的 t 对象的
当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都会去读取堆内存,
这样,当主线程修改running的值之后,t1线程感知不到,所以不会停止运行
使用
volatile
,将会强制所有线程都去堆内存中读取running的值
package c_012;
public class T {
/*volatile*/ boolean running = true;
void m() {
System.out.println(" m start ");
while (running) { // 直到主线程将running设置为false,T线程才会退出
// 在while中加入一些语句,可见性问题可能就会消失,这是因为加入语句后,CPU可能就会出现空闲,然后就会同步主内存中的内容到工作内存
// 所以,可见性问题可能会消失
/*try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
System.out.println(" m end ");
}
public static void main(String[] args) {
T t = new T();
new Thread(t::m, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.running = true;
}
}
示例代码 c_012部分
知识点
-
volatile
只能保证可见性,不能保证原子性volatile
并不能保证多个线程共同修改running变量所带来的不一致的问题,也就是说volatile
不能替代synchronized
,AtomicXXX
类也是可以保持原子性操作的,详细下面会讲到。
package c_013;
public class T {
volatile int count = 0;
/*AtomicInteger count = new AtomicInteger(0);*/
/*synchronized*/ void m() {
for (int i = 0; i < 10000; i++) {
count++;
/*count.incrementAndGet();*/
}
}
public static void main(String[] args) {
// 创建一个10个线程的list,执行任务皆是 m方法
T t = new T();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(t::m, "t-" + i));
}
// 启动这10个线程
threads.forEach(Thread::start);
// join 到主线程,防止主线程先行结束
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 10个线程,每个线程执行10000次,结果应为 100000
System.out.println(t.count); // 所得结果并不为 100000,说明volatile 不保证原子性
}
}
volatile与synchronize的区别
保持可见性 | 保持原子性 | 效率 | |
---|---|---|---|
synchronize | √ | √ | 低 |
valatile | √ | × | 高 |
AtomicXXX 类
AtomicXXX
代表此类中的所有方法都是原子操作,并且可以保证可见性
AtomicInteger举例:
package c_013;
public class T {
AtomicInteger count = new AtomicInteger(0);
void m() {
for (int i = 0; i < 10000; i++)
count.incrementAndGet();
}
}
public static void main(String[] args) {
// 创建一个10个线程的list,执行任务皆是 m方法
T t = new T();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(t::m, "t-" + i));
}
// 启动这10个线程
threads.forEach(Thread::start);
// join 到主线程,防止主线程先行结束
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 10个线程,每个线程执行10000次,结果应为 100000
System.out.println(t.count);
}
}
/**
* 运行结果: 100000
*/
CountDownLatch
介绍
CountDownLatch是一个计数(构造函数中指定此数值)的锁,当通过countDown方法将此计数值减为0时会唤醒之前调用await的线程。一般用于当某些任务执行完后,在执行其他任务的场景中。
实现原理
计数器通过使用锁(共享锁、排它锁)实现,
CountDownLatch是通过一个计数器来实现的,计数器的初始值为等待线程数量。
CountDownLatch是一个同步的辅助类,它能够使一个线程等待其他线程完成各自的工作后再执行。
CountDownLatch是基于AbstractQueuedSynchronizer(AQS)实现的,其通过state作为计数器。构造CountDownLatch时初始化一个state,以后每调用countDown方法一次,state减1;当state=0时,唤醒在await上被挂起的线程。
CountDownLatch的计数器state不能被重置,如果需要一种能重置count的版本,可以考虑使用CyclicBarrier。
具体代码使用可以参考题目测试中的代码实现
ReentrantLock
ReentrantLock可以用于替代synchronized
ReentrantLock使用完毕后,必须调用unlock()手动释放锁
代码示例:
public class ReentrantLock1 {
ReentrantLock lock = new ReentrantLock();
/*synchronized*/ void m1() {
/*for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}*/
lock.lock(); // 相当于 synchronized
try {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
} finally {
lock.unlock(); // 使用完毕后,必须手动释放锁
// 不同于synchronized,抛出异常后,不会自动释放锁,需要我们在finally中释放此锁
}
}
/*synchronized*/ void m2() {
/*System.out.println("m2...");*/
lock.lock(); // 相当于 synchronized
try {
System.out.println("m2...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLock1 r1 = new ReentrantLock1();
new Thread(r1::m1, "t1").start(); // m1 已经执行,被t1占有锁this
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(r1::m2, "t2").start(); // 锁已经被其他线程占用,m1执行完毕后,不会执行
}
}
ReentrantLock 和 synchronized 的区别
ReentrantLock 可以完成 synchronized 的任何功能,并且ReentrantLock比 synchronized 更灵活
-
ReentrantLock 可以进行尝试锁定
使用 tryLock() 如果无法锁定、或者在指定时间内无法锁定,线程可以决定是否继续等待。
package c_020;
public class ReentrantLock3 {
ReentrantLock lock = new ReentrantLock();
void m1() {
lock.lock(); // 相当于 synchronized
try {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
}
} finally {
lock.unlock(); // 使用完毕后,必须手动释放锁
// 不同于synchronized,抛出异常后,不会自动释放锁,需要我们在finally中释放此锁
}
}
void m2() {
//沉睡13秒看拿到锁情况
/*try {
TimeUnit.SECONDS.sleep(13);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
// 尝试获取锁,返回true拿到了
if (lock.tryLock()) {
// lock.tryLock(5, TimeUnit.SECONDS) // 等5s内还没拿到就返回false
try {
System.out.println("m2...");
}finally {
System.out.println("t2释放锁...");
lock.unlock();
}
} else {
System.out.println(" m2 没拿到锁");
}
}
public static void main(String[] args) {
ReentrantLock3 r1 = new ReentrantLock3();
new Thread(r1::m1, "t1").start(); // m1 已经执行,被t1占有锁this
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(r1::m2, "t2").start(); // 锁已经被其他线程占用,m1执行完毕后,不会执行
}
}
结果:
0
m2 没拿到锁
1
...
10
- ReentrantLock 可以调用 lockInterruptibly方法,可以对线程interrupt方法做出响应,中断线程等待
package c_020;
public class ReentrantLock4 {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); // 线程一直占用锁
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
Thread t2 = new Thread(() -> {
try {
lock.lockInterruptibly(); // t2 尝试获取锁
System.out.println("t2 start");
} catch (InterruptedException e) {
System.out.println("t2 等待中被打断");
} finally {
lock.unlock(); // 没有锁定进行unlock就会抛出 IllegalMonitorStateException
}
}, "t2");
t2.start();
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 打断线程2的等待
t2.interrupt();
}
}
- ReentrantLock 可以指定为公平锁,synchronized 是不公平锁
公平锁: 先获取锁的人,在锁被释放时,优先获得锁
不公平锁,无论先后,线程调度器将会随机给某个线程锁,不用计算线程时序,效率较高
package c_020;
public class ReentrantLock5 extends Thread {
private static ReentrantLock lock = new ReentrantLock(true);// 指定锁为公平锁
@Override
public void run() {
for (int i = 0; i < 100; i++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取锁");
} finally {
lock.unlock(); // 公平锁 t1 unlock 后,等待时间长的一定是 t2 所以下次一定是 t2 执行
}
}
}
public static void main(String[] args) {
ReentrantLock5 t1 = new ReentrantLock5();
ReentrantLock5 t2 = new ReentrantLock5();
t1.start();
t2.start();
}
}
运行结果:
Thread-0获取锁
Thread-1获取锁
Thread-0获取锁
Thread-1获取锁
Thread-0获取锁
Thread-1获取锁
。。。。
示例代码 c_020部分
ThreadLocal
线程局部变量
特点:
ThreadLocal:使用空间换时间 效率更高 线程同步:使用时间换空间
ThreadLocal可能会导致内存泄漏
使用示例:
package c_022;
public class ThreadLocal2 {
static ThreadLocal<Person> p = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(p.get()); // 2. 虽然threadLocal时共享变量,但是取不到其他线程放入的值,所以此处为null
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
p.set(new Person()); // 1. 往线程局部变量放入一个person
}).start();
}
static class Person {
String name = "zhangsan";
}
}
题目测试1
题目
实现一个容器,提供两个方法,add,size 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到达5时,线程2给出提示并结束
容器实现
容器实现很简单,我们就使用一个集合就搞定了,然后提供add,size方法,容器类如下:
package c_019_m;
public class MyContainer {
private List<Object> list = new ArrayList<>();
public void add(Object ele) {
list.add(ele);
}
public int size() {
return list.size();
}
}
线程实现
线程的方案就很多了
volatile 关键字
使用volatile
关键字使容器保持可见性,使list发生变化时,主动通知其他线程,更新工作空间
package c_019_m;
public class MyContainer {
//容器添加volatile关键字保持可见性
private volatile List<Object> list = new ArrayList<>();
//后续省略。。。。
}
public static void main(String[] args) {
MyContainer container = new MyContainer();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
container.add(new Object());
System.out.println("add " + i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
new Thread(() -> {
while (true) {
if (container.size() == 5) {
break;
}
}
System.out.println("监测到容器长度为5,线程2立即退出");
}, "t2").start();
}
结果:
add 0
add 1
add 2
add 3
add 4
监测到容器长度为5,线程2立即退出
这个方案虽说可以实现,但也是有一些问题存在的:
- 不够精确,当container.size == 5 还未执行break时,有可能被其他线程抢占;
- 损耗性能,t2 线程,一直在走while循环,很浪费性能
wait() 与 notify()
wait() 与 notify() 方法的调用必须在同步代码块中
wait(): 会释放当前的锁,然后让出CPU,进入等待状态。
notify/notifyAll() : 会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。
思路: 由于wait() 会释放锁,所以我们可以先启动监控线程t2,调用wait()释放锁,再起线程 t1,当满足条件时notify()唤醒线程t1.
代码实现:
public static void main(String[] args) {
MyContainer3 container = new MyContainer3();
final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (container.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("监测到容器长度为5,线程2立即退出");
lock.notify();
}
}, "t2").start();
// 先启动t2线程,让t2线程进入等待状态
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
container.add(new Object());
System.out.println("add " + i);
// 当长度为5时,通知 t2 进行退出
if (container.size() == 5) {
lock.notify(); // notify 不会释放锁,即便通知t2,t2也获取不到锁
// 可以在wait一下,将锁释放,再让t2通知t1继续执行
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
问题解决,但是呢,这样看起来代码是不是有点小复杂,有点绕了。。。so,继续。
CountDownLatch
使用Latch (门闩) 替代 wait notify来进行通信好处是,通信简单,同时也可以指定等待时间
使用await和countDown 方法替代 wait 和 notify
CountDownLatch不涉及锁定,当count值为0时,当前线程继续运行;
当不涉及同步,只涉及线程通信的时候,用synchronized + wait + notify 就显得太重了
public static void main(String[] args) {
MyContainer5 container = new MyContainer5();
// Count down 往下数 Latch 门闩
// 门闩不能保证可见性,不是一种同步方式,只是一种线程通信方式,保证不了可见性
// 门闩的等待,不会持有任何锁
CountDownLatch latch = new CountDownLatch(1);//创建门闩
new Thread(() -> {
System.out.println("t2 启动");
if (container.size() != 5) {
try {
latch.await();
// 指定等待时间
//latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("监测到容器长度为5,线程2立即退出");
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
container.add(new Object());
System.out.println("add " + i);
// 当长度为5时,撤掉一个门闩,此时门闩为0,门会打开,即t2会执行
if (container.size() == 5) {
latch.countDown();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
}
题目测试2
题目
写一个固定容量的容器,拥有put和get方法,以及getCount方法* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
容器实现
注意点:生产者消费者模式
如果调用 get方法时,容器为空,get方法就需要阻塞等待
如果调用 put方法时,容器满了,put方法就需要阻塞等待
wait/notify
package c_021_m;
public class MyContainer1<T> {
private final LinkedList<T> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
public synchronized void put(T t) {
while (MAX == count) { // 如果容量最大,释放锁等待 ///思考? 【这里为什么使用while,而不是使用if???】
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 否则 put
list.add(t);
++count;
this.notifyAll(); // 通知消费者线程,可以消费了
//思考? 【这里为什么调用 notifyAll 而不是 notify ?】
}
public synchronized T get() {
while (list.size() == 0) { // 如果容量为空,释放锁等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 否则获取
T t = list.removeFirst();
count--;
this.notifyAll(); // 通知生产者线程生产
return t;
}
}
思考解答:
为什么使用while 而不是使用 if ???
在与wait()的配合中,百分之99的程序都是与while而不是if结合使用。
上述代码中,在容器已满的情况下,put方法会wait等待,当容器中的元素被消费者消费了一部分,就会唤醒所有put方法,
put方法会继续向下执行,直接执行list.add(t),那么多个生产者线程执行list.add() 就有可能出现数据一致性的问题。
如果使用while则会循环判断,就避免了这些问题。
不是有锁吗?为什么会需要循环判断?
wait之后,锁就会失去,再次被唤醒时,并且得到锁之后,是从list.add()开始执行的,会无判断直接加入到容器中。
为什么调用 notifyAll 而不是 notify ?
因为notify有可能再次叫醒一个生产者线程
使用Lock&Condition
使用Lock和Condition实现,可以精确唤醒某些线程,提高执行效率
package c_021_m;
public class MyContainer2<T> {
private final LinkedList<T> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
lock.lock();
try {
while (MAX == count) {
producer.await();
}
list.add(t);
++count;
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
lock.lock();
try {
while (list.size() == 0) {
producer.signalAll();
consumer.await();
}
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
T t = list.removeFirst();
count--;
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
// 启动消费者线程
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 5; j++) {
System.out.println("容器内所剩值:"+c.count+" ; c: "+c.list.toString());
System.out.println(c.get());
}
}, "c_" + i ).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 2; i++) {
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 2; j++) {
System.out.println("插入值: "+Thread.currentThread().getName() + " " + j);
c.put(Thread.currentThread().getName() + " " + j);
}
}, "p_" + i).start();
}
}
}
结果:
容器内所剩值:0 ; c: []
插入值: p_1 0
插入值: p_1 1
容器内所剩值:2 ; c: [p_1 0, p_1 1]
p_1 0
容器内所剩值:1 ; c: [p_1 1]
p_1 1
容器内所剩值:0 ; c: []
容器内所剩值:0 ; c: []
...
插入值: p_0 0
插入值: p_0 1
p_0 0
容器内所剩值:1 ; c: [p_0 1]
p_0 1
容器内所剩值:0 ; c: []