Shawn's Blog

蒸汽兔

多线程高并发编程 - 同步器(一)

字数:6022 字 阅读时长:约 33 分钟 阅读

本篇主要总结同步器的相关例子:包括synchronized、volatile、原子变量类(AtomicXxx)、CountDownLatch、ReentrantLock和ThreadLocal。还涉及到wait和notify/notifyAll。

synchronized 关键字

对某个对象进行加锁,保证操作原子性

运行下面的代码,查看添加 synchronized前后的区别

public class T implements Runnable{

    private int count = 10;
    
    @Override
    public /*synchronized*/ void run() {
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }

    public static void main(String[] args) {
        T t = new T();
        for (int i = 0; i < 5; i++) {
            new Thread(t).start();
        }
    }
}

不添加synchronized某次运行结果:

Thread-0 count = 7
Thread-4 count = 5
Thread-3 count = 6
Thread-2 count = 7
Thread-1 count = 7

出现上面这种结果原因: 线程重入的问题(线程执行过程中,被其他线程打断),因为 count– + sout(count) 不是原子操作

解决: 加上 synchronized 关键字,保证操作原子性 运行结果:

Thread-0 count = 9
Thread-2 count = 8
Thread-4 count = 7
Thread-3 count = 6
Thread-1 count = 5

使用场景

在java代码中使用synchronized可是使用在代码块和方法中,根据Synchronized用的位置可以有这些使用场景:

placeholder

具体查看示例代码 c_001 - c_005部分

知识点

  1. synchronized 锁定的不是代码块,而是 this 对象;
  2. 锁信息记录在堆内存对象中的,不是在栈引用中;
  3. synchronized 是互斥锁
  4. 关键字写在方法非静态上,锁的对象是当前对象this
public synchronized void m() { // 等同于 synchronized (this) { 
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
}

等同于

public void m() {
    synchronized (this) { // 任何线程要执行下面的代码,必须先拿到this锁
        // synchronized 锁定的不是代码块,而是 this 对象
        // 如果当前对象已经被锁定,其他线程再进入时,就会进行阻塞等待
        // 所以 synchronized 是互斥锁
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }
    // 当代码块执行完毕后,锁就会被释放,然后被其他线程获取
}
  1. 同步方法(加锁)和非同步方法(不加锁)是否可以同时调用?

    答:肯定可以

package c_007;

public class T {
    
    //同步方法
    public synchronized void m1() {
        System.out.println(Thread.currentThread().getName() + " m1 start");
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m1 end");
    }
    
    //非同步方法
    public void m2() {
        System.out.println(Thread.currentThread().getName() + " m2 start");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m2 end");
    }

    public static void main(String[] args) {
        T t = new T();
        new Thread(t::m1).start(); //jdk8 Lambda表达式
        new Thread(t::m2).start();
    }
}

/**
运行结果:
Thread-0 m1 start
Thread-1 m2 start
Thread-1 m2 end
Thread-0 m1 end
*/

示例代码 c_007部分

  1. 脏读现象 对业务写方法加锁,而对业务读方法不加锁,容易出现脏读问题,所以视业务情况如果允许可以给读写都加锁

    示例代码 c_008部分

  2. synchronized 是可重入锁 即一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请时仍然会得到该对象的锁,同样的,子类调用父类的同步方法,也是可冲入的。

public class T {

    synchronized void m1() {
        System.out.println("m1 start ");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //调用同样被锁的m2方法
        m2();
    }

    synchronized void m2() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(" m2"); // 这句话会打印,调用m2时,不会发生死锁
    }
    
    public static void main(String[] args) {
        T t = new T();
        new Thread(()->t.m1()).start();
    }
}

示例代码 c_009-C_010部分

  1. synchronized 代码块中,如果发生异常,锁会被释放

在并发处理过程中,有异常要多加小心,不然可能发生数据不一致的情况。 比如,在一个web app处理过程中,多个servlet线程共同访问同一资源,这时如果异常处理不合适,第一个线程抛出异常,其他线程就会进入同步代码区,有可能访问到异常产生的数据。因此要非常小心处理同步业务员逻辑中的异常。

package c_011;

public class T {

    int count = 0;
    
    synchronized void m() {
        System.out.println(Thread.currentThread().getName() + " start");
        while (true) {
            count++;
            System.out.println(Thread.currentThread().getName() + " count=" + count);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (count == 5) {  // 当count == 5 时,synchronized代码块会抛出异常
                int i = 1 / 0; 
            }
        }
    }

    public static void main(String[] args) {
        T t = new T();
        Runnable r = new Runnable() {
            @Override
            public void run() {
                t.m();
            }
        };
        new Thread(r, "t1").start(); // 执行到第5秒时,抛出 ArithmeticException 
        // 如果抛出异常后,t2 会继续执行,就代表t2拿到了锁,即t1在抛出异常后释放了锁
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(r, "t2").start();
    }

}

示例代码 c_011部分

synchronized 优化

  1. 同步代码块中的语句越少越好
package c_016;

public class T {

    int count = 0;
    
    synchronized void m1() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // 业务逻辑中,只有下面这句代码需要 sync, 这时不应该给整个方法上锁
        count++;

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    void m2() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 业务逻辑中,只有下面这句需要 sync,这时不应该给整个方法上锁
        // 采用细粒度的锁,可以使线程争用时间变短,从而提高效率
        synchronized (this) {
            count++;
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

示例代码 c_016部分

  1. 锁对象通常要设置为 final类型,保证引用不可以变。

锁定某个对象o,如果o属性发生变化,不影响锁的使用,但是如果o编程另一个对象,则锁定的对象发生变化。

package c_017;

public class T {

    /*final*/ Object o = new Object();//锁对象
    
    void m() {
        synchronized (o) {
            while (true) {
                System.out.println(Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        T t = new T();
        new Thread(t::m, "线程1").start(); //启用线程1
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Thread thread2 = new Thread(t::m, "线程2");//启用线程2
        t.o = new Object(); // 改变锁引用, 线程2也有机会运行,否则一直都是线程1 运行      
        thread2.start();
    }

}

示例代码 c_017部分

  1. 不建议使用字符串常量作为锁定对象

在下面的例子中, m1和m2其实是锁定的同一对象 这种情况下,还会可能与其他类库发生死锁,比如某类库中也锁定了字符串 “Hello” 但是无法确认源码的具体位置,所以两个 “Hello” 将会造成死锁 因为你的程序和你用的类库无意间使用了同意把锁

package c_018;

public class T {

    String s1 = "Hello";
    String s2 = "Hello";
    
    void m1() {
        synchronized (s1) {
            
        }
    }

    void m2() {
        synchronized (s2) {
            
        }
    }
}


volatile 关键字

关键字,使一个变量在多个线程间可见

 * cn: 透明的临时的
 * 
 * JMM(Java Memory Model) 
 * 在JMM中所有对象以及信息都存放在主内存中包含堆
 * 而每个线程都有自己的独立空间存储了需要用到的变量的副本
 * 线程对共享变量的操作都会在自己的工作内存中进行然后同步给主内存
 * 

运行下面代码,对比有无volatile的情况下,整个程序运行结果的区别

下面的代码中,running 是位于堆内存中的 t 对象的

当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都会去读取堆内存,

这样,当主线程修改running的值之后,t1线程感知不到,所以不会停止运行

使用volatile,将会强制所有线程都去堆内存中读取running的值

package c_012;

public class T {

    /*volatile*/ boolean running = true;   

    void m() {
        System.out.println(" m start ");
        while (running) { // 直到主线程将running设置为false,T线程才会退出
            // 在while中加入一些语句,可见性问题可能就会消失,这是因为加入语句后,CPU可能就会出现空闲,然后就会同步主内存中的内容到工作内存
            // 所以,可见性问题可能会消失
            /*try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/
        }
        System.out.println(" m end ");
    }

    public static void main(String[] args) {
        T t = new T();
        new Thread(t::m, "t1").start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t.running = true;
    }

}

示例代码 c_012部分

知识点

  1. volatile只能保证可见性,不能保证原子性

    volatile并不能保证多个线程共同修改running变量所带来的不一致的问题,也就是说volatile不能替代synchronized,AtomicXXX类也是可以保持原子性操作的,详细下面会讲到。

package c_013;

public class T {

    volatile int count = 0;
    /*AtomicInteger count = new AtomicInteger(0);*/
    
    /*synchronized*/ void m() {
        for (int i = 0; i < 10000; i++) {
            count++;
            /*count.incrementAndGet();*/
        }
    }

    public static void main(String[] args) {
        // 创建一个10个线程的list,执行任务皆是 m方法
        T t = new T();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(t::m, "t-" + i));
        }
        
        // 启动这10个线程
        threads.forEach(Thread::start);
        
        // join 到主线程,防止主线程先行结束
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 10个线程,每个线程执行10000次,结果应为 100000
        System.out.println(t.count);  // 所得结果并不为 100000,说明volatile 不保证原子性
    }

}

volatile与synchronize的区别

  保持可见性 保持原子性 效率
synchronize
valatile ×

AtomicXXX 类

AtomicXXX 代表此类中的所有方法都是原子操作,并且可以保证可见性

AtomicInteger举例:

package c_013;

public class T {

    AtomicInteger count = new AtomicInteger(0);
    
    void m() {
        for (int i = 0; i < 10000; i++) 
            count.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        // 创建一个10个线程的list,执行任务皆是 m方法
        T t = new T();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(t::m, "t-" + i));
        }
        
        // 启动这10个线程
        threads.forEach(Thread::start);
        
        // join 到主线程,防止主线程先行结束
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 10个线程,每个线程执行10000次,结果应为 100000
        System.out.println(t.count);  
    }

}

/**
* 运行结果: 100000
*/

CountDownLatch

介绍

CountDownLatch是一个计数(构造函数中指定此数值)的锁,当通过countDown方法将此计数值减为0时会唤醒之前调用await的线程。一般用于当某些任务执行完后,在执行其他任务的场景中。

实现原理

计数器通过使用锁(共享锁、排它锁)实现,

CountDownLatch是通过一个计数器来实现的,计数器的初始值为等待线程数量。

CountDownLatch是一个同步的辅助类,它能够使一个线程等待其他线程完成各自的工作后再执行。

CountDownLatch是基于AbstractQueuedSynchronizer(AQS)实现的,其通过state作为计数器。构造CountDownLatch时初始化一个state,以后每调用countDown方法一次,state减1;当state=0时,唤醒在await上被挂起的线程。

CountDownLatch的计数器state不能被重置,如果需要一种能重置count的版本,可以考虑使用CyclicBarrier。

具体代码使用可以参考题目测试中的代码实现

ReentrantLock

ReentrantLock可以用于替代synchronized

ReentrantLock使用完毕后,必须调用unlock()手动释放锁

代码示例:

public class ReentrantLock1 {
    
    ReentrantLock lock = new ReentrantLock();
    
    /*synchronized*/ void m1() {
        /*for (int i = 0; i < 10; i++) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(i);
        }*/
        lock.lock(); // 相当于 synchronized
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
        } finally {
            lock.unlock(); // 使用完毕后,必须手动释放锁
            // 不同于synchronized,抛出异常后,不会自动释放锁,需要我们在finally中释放此锁
        }
    }
    
    /*synchronized*/ void m2() {
        /*System.out.println("m2...");*/
        
        lock.lock(); // 相当于 synchronized
        try {
            System.out.println("m2...");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock1 r1 = new ReentrantLock1();
        new Thread(r1::m1, "t1").start(); // m1 已经执行,被t1占有锁this
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(r1::m2, "t2").start(); // 锁已经被其他线程占用,m1执行完毕后,不会执行
    }

}


ReentrantLock 和 synchronized 的区别

ReentrantLock 可以完成 synchronized 的任何功能,并且ReentrantLock比 synchronized 更灵活

  1. ReentrantLock 可以进行尝试锁定

    使用 tryLock() 如果无法锁定、或者在指定时间内无法锁定,线程可以决定是否继续等待。

package c_020;

public class ReentrantLock3 {
    ReentrantLock lock = new ReentrantLock();
    void m1() {
        lock.lock(); // 相当于 synchronized
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
        } finally {
            lock.unlock(); // 使用完毕后,必须手动释放锁
            // 不同于synchronized,抛出异常后,不会自动释放锁,需要我们在finally中释放此锁
        }
    }
    void m2() {
        //沉睡13秒看拿到锁情况
        /*try {
            TimeUnit.SECONDS.sleep(13);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        // 尝试获取锁,返回true拿到了
        if (lock.tryLock()) {
            // lock.tryLock(5, TimeUnit.SECONDS) // 等5s内还没拿到就返回false
            try {
                System.out.println("m2...");
            }finally {
                System.out.println("t2释放锁...");
                lock.unlock();
            }
        } else {
            System.out.println(" m2 没拿到锁");
        }
    }
    
    public static void main(String[] args) {
        ReentrantLock3 r1 = new ReentrantLock3();
        new Thread(r1::m1, "t1").start(); // m1 已经执行,被t1占有锁this
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(r1::m2, "t2").start(); // 锁已经被其他线程占用,m1执行完毕后,不会执行
    }
}

结果:

0
m2 没拿到锁
1
...
10

  1. ReentrantLock 可以调用 lockInterruptibly方法,可以对线程interrupt方法做出响应,中断线程等待
package c_020;

public class ReentrantLock4 {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("t1 start");
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);  // 线程一直占用锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            
        }, "t1").start();

        Thread t2 = new Thread(() -> {

            try {
                lock.lockInterruptibly(); // t2 尝试获取锁
                System.out.println("t2 start");
            } catch (InterruptedException e) {
                System.out.println("t2 等待中被打断");
            } finally {
                lock.unlock(); // 没有锁定进行unlock就会抛出 IllegalMonitorStateException 
            }
        }, "t2");
        t2.start();

        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 打断线程2的等待
        t2.interrupt();
        
    }

}

  1. ReentrantLock 可以指定为公平锁,synchronized 是不公平锁

公平锁: 先获取锁的人,在锁被释放时,优先获得锁

不公平锁,无论先后,线程调度器将会随机给某个线程锁,不用计算线程时序,效率较高

package c_020;
public class ReentrantLock5 extends Thread {

    private static ReentrantLock lock = new ReentrantLock(true);// 指定锁为公平锁

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "获取锁");
            } finally {
                lock.unlock(); // 公平锁 t1 unlock 后,等待时间长的一定是 t2 所以下次一定是 t2 执行
            }
        }
    }

    public static void main(String[] args) {
        ReentrantLock5 t1 = new ReentrantLock5();
        ReentrantLock5 t2 = new ReentrantLock5();
        t1.start();
        t2.start();
    }
}

运行结果:

Thread-0获取锁
Thread-1获取锁
Thread-0获取锁
Thread-1获取锁
Thread-0获取锁
Thread-1获取锁
。。。。

示例代码 c_020部分

ThreadLocal

线程局部变量

特点:

ThreadLocal:使用空间换时间 效率更高 线程同步:使用时间换空间

ThreadLocal可能会导致内存泄漏

使用示例:

package c_022;

public class ThreadLocal2 {

    static ThreadLocal<Person> p = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(p.get()); // 2. 虽然threadLocal时共享变量,但是取不到其他线程放入的值,所以此处为null
        }).start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            p.set(new Person()); // 1. 往线程局部变量放入一个person
        }).start();
    }
    
    static class Person {
        String name = "zhangsan";
    }
}

题目测试1

题目

实现一个容器,提供两个方法,add,size 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到达5时,线程2给出提示并结束

容器实现

容器实现很简单,我们就使用一个集合就搞定了,然后提供add,size方法,容器类如下:

package c_019_m;

public class MyContainer {

    private List<Object> list = new ArrayList<>();

    public void add(Object ele) {
        list.add(ele);
    }

    public int size() {
        return list.size();
    }

}


线程实现

线程的方案就很多了

volatile 关键字

使用volatile 关键字使容器保持可见性,使list发生变化时,主动通知其他线程,更新工作空间

package c_019_m;

public class MyContainer {

    //容器添加volatile关键字保持可见性
    private volatile List<Object> list = new ArrayList<>();
    
    //后续省略。。。。
}


public static void main(String[] args) {
        MyContainer container = new MyContainer();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                container.add(new Object());
                System.out.println("add " + i);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
            }
        }, "t1").start();

        new Thread(() -> {
            while (true) {
                if (container.size() == 5) {
                    break;
                }
            }
            System.out.println("监测到容器长度为5,线程2立即退出");
        }, "t2").start();

}

结果:

add 0
add 1
add 2
add 3
add 4
监测到容器长度为5线程2立即退出

这个方案虽说可以实现,但也是有一些问题存在的:

  1. 不够精确,当container.size == 5 还未执行break时,有可能被其他线程抢占;
  2. 损耗性能,t2 线程,一直在走while循环,很浪费性能

wait() 与 notify()

wait() 与 notify() 方法的调用必须在同步代码块中

wait(): 会释放当前的锁,然后让出CPU,进入等待状态。

notify/notifyAll() : 会唤醒一个或多个正处于等待状态的线程,然后继续往下执行,直到执行完synchronized 代码块的代码或是中途遇到wait() ,再次释放锁。

思路: 由于wait() 会释放锁,所以我们可以先启动监控线程t2,调用wait()释放锁,再起线程 t1,当满足条件时notify()唤醒线程t1.

代码实现:

public static void main(String[] args) {

        MyContainer3 container = new MyContainer3();

        final Object lock = new Object();

        new Thread(() -> {
            synchronized (lock) {
                System.out.println("t2 启动");
                if (container.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("监测到容器长度为5,线程2立即退出");
                lock.notify();
            }
        }, "t2").start();

        // 先启动t2线程,让t2线程进入等待状态
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(() -> {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    container.add(new Object());
                    System.out.println("add " + i);
                    // 当长度为5时,通知 t2 进行退出
                    if (container.size() == 5) {
                        lock.notify(); // notify 不会释放锁,即便通知t2,t2也获取不到锁
                        // 可以在wait一下,将锁释放,再让t2通知t1继续执行
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
}

问题解决,但是呢,这样看起来代码是不是有点小复杂,有点绕了。。。so,继续。

CountDownLatch

使用Latch (门闩) 替代 wait notify来进行通信好处是,通信简单,同时也可以指定等待时间

使用await和countDown 方法替代 wait 和 notify

CountDownLatch不涉及锁定,当count值为0时,当前线程继续运行;

当不涉及同步,只涉及线程通信的时候,用synchronized + wait + notify 就显得太重了

public static void main(String[] args) {

        MyContainer5 container = new MyContainer5();

        // Count down 往下数  Latch 门闩
        // 门闩不能保证可见性,不是一种同步方式,只是一种线程通信方式,保证不了可见性
        // 门闩的等待,不会持有任何锁
        CountDownLatch latch = new CountDownLatch(1);//创建门闩

        new Thread(() -> {
            System.out.println("t2 启动");
            if (container.size() != 5) {
                try {
                    latch.await();
                    // 指定等待时间
                    //latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("监测到容器长度为5,线程2立即退出");
        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1 启动");
            for (int i = 0; i < 10; i++) {
                container.add(new Object());
                System.out.println("add " + i);
                // 当长度为5时,撤掉一个门闩,此时门闩为0,门会打开,即t2会执行
                if (container.size() == 5) {
                    latch.countDown();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();
}

题目测试2

题目

写一个固定容量的容器,拥有put和get方法,以及getCount方法* 能够支持2个生产者线程以及10个消费者线程的阻塞调用

容器实现

注意点:生产者消费者模式

如果调用 get方法时,容器为空,get方法就需要阻塞等待

如果调用 put方法时,容器满了,put方法就需要阻塞等待

wait/notify

package c_021_m;

public class MyContainer1<T> {
    
    private final LinkedList<T> list = new LinkedList<>();
    private final int MAX = 10;
    private int count = 0;

    public synchronized void put(T t) {
        while (MAX == count) { // 如果容量最大,释放锁等待    ///思考? 【这里为什么使用while,而不是使用if???】
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 否则 put 
        list.add(t);
        ++count;
        this.notifyAll(); // 通知消费者线程,可以消费了
        //思考? 【这里为什么调用 notifyAll 而不是 notify ?】
    }

    public synchronized T get() {
        while (list.size() == 0) { // 如果容量为空,释放锁等待  
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 否则获取
        T t = list.removeFirst();
        count--;
        this.notifyAll(); // 通知生产者线程生产
        return t;
    }
}

思考解答:

为什么使用while 而不是使用 if ???
在与wait()的配合中,百分之99的程序都是与while而不是if结合使用。
上述代码中,在容器已满的情况下,put方法会wait等待,当容器中的元素被消费者消费了一部分,就会唤醒所有put方法,
put方法会继续向下执行,直接执行list.add(t),那么多个生产者线程执行list.add() 就有可能出现数据一致性的问题。
如果使用while则会循环判断,就避免了这些问题。

不是有锁吗?为什么会需要循环判断?
wait之后,锁就会失去,再次被唤醒时,并且得到锁之后,是从list.add()开始执行的,会无判断直接加入到容器中。


为什么调用 notifyAll 而不是 notify ?
因为notify有可能再次叫醒一个生产者线程

使用Lock&Condition

使用Lock和Condition实现,可以精确唤醒某些线程,提高执行效率

package c_021_m;
public class MyContainer2<T> {

    private final LinkedList<T> list = new LinkedList<>();
    private final int MAX = 10;
    private int count = 0;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();


    public  void put(T t) {
        lock.lock();
        try {
            while (MAX == count) {
                producer.await();
            }
            list.add(t);
            ++count;
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public  T get() {
        lock.lock();
        try {
            while (list.size() == 0) {
                producer.signalAll();
                consumer.await();
            }
            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        T t = list.removeFirst();
        count--;
        return t;
    }

    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        // 启动消费者线程
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 5; j++) {
                    System.out.println("容器内所剩值:"+c.count+" ; c: "+c.list.toString());
                    System.out.println(c.get());
                }
            }, "c_" + i ).start();
        }

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 2; j++) {
                    System.out.println("插入值: "+Thread.currentThread().getName() + " " + j);
                    c.put(Thread.currentThread().getName() + " " + j);
                }
            }, "p_" + i).start();
        }
    }
}


结果:

容器内所剩值0 ; c: []
插入值: p_1 0
插入值: p_1 1
容器内所剩值2 ; c: [p_1 0, p_1 1]
p_1 0
容器内所剩值1 ; c: [p_1 1]
p_1 1
容器内所剩值0 ; c: []
容器内所剩值0 ; c: []
...
插入值: p_0 0
插入值: p_0 1
p_0 0
容器内所剩值1 ; c: [p_0 1]
p_0 1
容器内所剩值0 ; c: []

相关资料

CountDownLatch相关资料

源码来源

学习视频


© Shawn Jim. All rights reserved. 本站总访问量 次, 访客数 人次.