Shawn's Blog

蒸汽兔

多线程高并发编程 - 并发容器(二)

字数:2734 字 阅读时长:约 15 分钟 阅读

本篇主要总结了:线程安全的单例模式和并发容器。其中并发容器包含:ConcurrentHashMap、ConcurrentSkipListMap、CopyOnWriteArrayList和队列相关的内部加锁的并发队列ConcurrentLinkedQueue 以及阻塞队列BlockingQueue (LinkedBlockingQueue、ArrayBlockingQueue 、DelayQueue 、TransferQueue、SynchronizedQueue )

线程安全的单例模式

不使用同步锁

package c_023;
public class Singleton1 {

    //初识话一个示例对象
    private static Singleton1 singleton1 = new Singleton1();

    //private 类型的构造函数,保证其他对象不能直接new一个该对象示例
    private Singleton1() {

    }

    /**
     * 该类唯一的一个public方法
     * @return
     */
    public static Singleton1 getInstance(){
        return singleton1;
    }
}

缺点: 该类加载时会直接new一个静态对象,当一个系统中这样的类较多时,会使得启动速度变慢。

现在流行的设计都是将“延迟加载”,这样我们可以在第一次使用时才初始化该类对象。

所以这种方式只适合在小系统使用。

使用同步锁

package c_023;
public class Singleton2 {
    private static Singleton2 singleton2;

    /**
     * private 修饰构造方法
     */
    private  Singleton2() {

    }

    /**
     * 使用 synchronized 修饰该方法,保证只有一个线程能够构建一个实例对象
     * @return
     */
    public static synchronized Singleton2 getInstance(){
        if(singleton2 == null){
            singleton2 = new Singleton2();
        }
        return singleton2;
    }
}

缺点: 一次锁住了一个方法, 这个粒度有点大

改进: 只锁住其中的new语句就OK。就是所谓的“双重锁”机制。

使用双重同步锁

package c_023;
public class Singleton3 {
    private static Singleton3 singleton3;

    private Singleton3() {
    }

    /**
     * 对获取示例的方法进行同步
     * @return
     */
    public static Singleton3 getInstance(){
        if(singleton3 == null){
            synchronized (Singleton3.class){
                if(singleton3 == null){
                    singleton3 = new Singleton3();
                }
            }
        }
        return singleton3;
    }
}

内部类实现

package c_023;

public class Singleton {
    
    private Singleton() {
    }
    
    private static class Inner {
        private static Singleton s = new Singleton();
    }

    public static Singleton getInstance() {
        return Inner.s;
    }
    
}

更好的是采用这种方式,既不用加锁,也能实现懒加载

示例代码 c_023部分

并发容器

map/set相关

不加锁

HashMap

TreeMap

LinkedHashMap

加锁且并发性不是特别高

collect.synchronizedXXX()

提供一系列方法给容器添加锁

package c_025;

public class CollectionsSychronizedXX {

    public static void main(String[] args) {
        Map map = new HashMap(); //创建一个容器
        Map synchronizedMap = Collections.synchronizedMap(map); //调用方法给map加锁
    }
}

Hashtable

所有实现都是带锁的

加锁且并发性比较高

ConcurrentHashMap

使用分段锁,多线程情况下效率比Hashtable高

容器被分为16段。多个线程可以同时并发的往里面插入数据,所以在多线程情况下,效率会比HashTable

加锁且并发性比较高要求排序

ConcurrentSkipListMap(并发跳表容器)

高并发,并且需要排序的情况下

效率比较代码:

package c_025;
/**
 * skipMap: https://blog.csdn.net/sunxianghuang/article/details/52221913
 */
public class T01_ConcurrentMap {

    public static void main(String[] args) {

        //Map<String, String> map = new HashMap<>(); 
        //Map<String, String> map = new Hashtable<>(); // 423  每次加锁,都锁一个对象
        //Map<String, String> map = new ConcurrentHashMap<>(); // 309,加的是分段所,将容器分为16段,每段都有一个锁 segment; 1.8以后 使用 Node + synchronized+CAS
        Map<String, String> map = new ConcurrentSkipListMap<>(); // 317  并发且排序,插入效率较低,但是读取很快
        Random r = new Random();
        Thread[] ths = new Thread[100];
        CountDownLatch latch = new CountDownLatch(ths.length); // 启动了一个门闩,每有一个线程退出,门闩就减1,直到所有线程结束,门闩打开,主线程结束
        
        long start = System.currentTimeMillis();
        // 创建100个线程,每个线程添加10000个元素到map,并启动这些线程
        for (int i = 0; i < ths.length; i++) {
            ths[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    map.put("a" + r.nextInt(10000), "a" + r.nextInt(100000));
                }
                latch.countDown();
            }, "t" + i);
        }
        Arrays.asList(ths).forEach(Thread::start);

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long end = System.currentTimeMillis();
        System.out.println(end - start);
        System.out.println(map.size());
    }
    
}

队列相关

同步队列

ConcurrentLinkedQueue

LinkedQueue 无界队列,

offer(): 插入值时并不会抛出异常,会通过boolean类型的返回值告诉你是否插入成功

poll(): 将头值取出来并删除

peek(): 将第一个值取出来并且不删除

package c_025;

/**
 * 同步队列,ConcurrentQueue
 */
public class T04_ConcurrentQueue {


    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(); // LinkedQueue,无界队列

        for (int i = 0; i < 10; i++) {
            queue.offer("a" + i); // 有返回值,返回false代表没有加入成功,true 代表成功,并且此方法不会阻塞
        }

        System.out.println(queue);
        System.out.println(queue.size());

        System.out.println(queue.poll()); // 取出队头
        System.out.println(queue.size());

        System.out.println(queue.peek()); // 取出队头,但是不删除队头
        System.out.println(queue.size());
    }
    
}

结果:

[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0 //去除内容并且删除
9
a1 //只是取出内容
9
BlockingQueue(阻塞式队列)

put(): 如果满了,线程就会等待

take(): 如果空了,线程就会等待

linkedBlokingQueue

无界队列

示例代码:

package c_025;

public class T05_LinkedBlockingQueue {

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // 启动生产者线程生产
        new Thread(() -> {
            for (int j = 0; j < 100; j++) {
                try {
                    queue.put("aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "p").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 启用消费者线程消费
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();
        }

    }

}
ArrayBlockingQueue

有界队列

示例代码:

package c_025;
/**
 * 使用阻塞有界同步队列 ArrayBlockingQueue 完成生产者消费者模式
 */
public class T06_ArrayBlockingQueue {
    public static void main(String[] args) throws InterruptedException {

        BlockingQueue queue = new ArrayBlockingQueue<>(10);//初识10个容量队列

        for (int i = 0; i < 10; i++) {
            queue.put("a" + i);
        }

        //queue.put("a11"); // 会阻塞
        //queue.add("a11"); // 会抛出异常
        //System.out.println(queue.offer("a11")); // 会返回false
        System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true
    }
}

DelayQueue(延迟队列)

是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中

出队有个时间限制, 每个元素有一个等待时间, 可以按照等待时间排序元素DelayQueue元素必须为 Delayed类型的,即必须设置元素的等待时间

可以用来做定时任务:

package c_025;

public class T07_DelayQueue {

    /**
    * 定时类
    */
    static class MyTask implements Delayed {
        private long runningTime;

        public MyTask(long runTime) {
            this.runningTime = runTime;
        }
        
        // 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 这是排序规律, 执行等待时间最短的排在上面
        @Override
        public int compareTo(Delayed o) {
            return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS));
        }
        
        @Override
        public String toString() {
            return runningTime + "";
        }
    }

    
    public static void main(String[] args) throws InterruptedException {
        long timestamp = System.currentTimeMillis();
        MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行
        MyTask myTask2 = new MyTask(timestamp + 2000);
        MyTask myTask3 = new MyTask(timestamp + 1500);
        MyTask myTask4 = new MyTask(timestamp + 2500);
        MyTask myTask5 = new MyTask(timestamp + 500);

        DelayQueue<MyTask> tasks = new DelayQueue<>();
        tasks.put(myTask1);
        tasks.put(myTask2);
        tasks.put(myTask3);
        tasks.put(myTask4);
        tasks.put(myTask5);

        System.out.println(tasks);  // 确实按照我们拍的顺序执行的

        for (int i = 0; i < tasks.size(); i++) {
            System.out.println(tasks.take());
        }
    }
}

结果:

[1565690284606, 1565690284106, 1565690283606, 1565690283106, 1565690282606]
1565690284606 //按插入顺序取 
1565690284106
1565690283606
TransferQueue

拥有transfer方法,传输,当transfer一个元素时,如果有take方法阻塞等待获取元素,则不向队列中保存,直接给等待的take方法

使用情景:如果将元素放入队列,再拿给消费者线程,太慢了,如果需要的效率更高,可以使用TransferQueue来解决更高的并发

transfer():

package c_025;
public class T08_TransferQueue {
    public static void main(String[] args) {       
        TransferQueue mq = new LinkedTransferQueue();       
        // 先让消费者线程等待
        new Thread(() -> {
            try {
                System.out.println(mq.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 再让生产者线程生产
        try {
            mq.transfer("aaa");  // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /*new Thread(() -> {
            try {
                System.out.println(mq.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();*/
    }

}

SynchronousQueue

容量为0的队列,一种特殊的TransferQueue

无法使用add() /put() 方法向容器中添加元素

package c_025;

public class T09_SynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        
        BlockingQueue queue = new SynchronousQueue();
        
        new Thread(() -> {
            try {
                System.out.println(queue.take()); // 取不到就阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        //queue.add("aaa"); // IllegalStateException: Queue full  抛出异常,因为没有容量
        queue.put("aaa");  // 会阻塞等待消费者线程获取,内部是transfer
        
        System.out.println(queue.size()); // 长度为0 
    }
}

CopyOnWriteList(写时复制容器)

当发生写操作(添加、删除、修改)时,容器就会复制原有容器一份然后对新操作进行写操作,然后再将引用转向新的容器

package c_025;
public class T02_CopyOnWriteList {
    public static void main(String[] args) {
        List<String> list =
                //new ArrayList<>(); //会出现并发问题
                //new Vector<>(); 
                new CopyOnWriteArrayList<>();  // 写速极慢,读取快

        Random r = new Random();
        Thread[] ths = new Thread[100];

        for (int i = 0; i < ths.length; i++) {
            Runnable task = () -> {
                for (int j = 0; j < 1000; j++) {
                    list.add("a" + r.nextInt(100));
                }
            };
            ths[i] = new Thread(task);

        }
        runAndComputeTime(ths);
        System.out.println(list.size());
        
    }

    static void runAndComputeTime(Thread[] ths) {
        long start = System.currentTimeMillis();
        Arrays.asList(ths).forEach(Thread::start);
        Arrays.asList(ths).forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }
    
}

好处:保证读操作不需要锁也能正常访问,是一种读写分离的实现方式

缺点:写的效率极低,特定场景下才会使用到

总结

对于map/set的选择使用

  1. 不需要使用多线程的情况下

    HashMap

    TreeMap

    LinkedHashMap

  2. 并发量比较小的情况下

    Hashtable

    Collections.sychronizedXXX()

  3. 高并发情况下:

    ConcurrentHashMap

    同时要求排序

    ConcurrentSkipListMap

对于队列的选择使用

  1. 不需要同步的队列

    ArrayList

    LinkedList

  2. 需要同步的队列,且并发量比较低

    Collections.synchronizedXXX()

    Vector

  3. 需要同步的队列,且并发量比较高

    ConcurrentLinkedQueue

    阻塞式队列

    LinkedBlockingQueue(无界队列)

    ArrayBlockingQueue(有界队列)

    TransferQueue(直接将内容交给消费者执行)

    SynchronusQueue(特殊的TransferQueue,容量为0)

  4. 执行定时任务队列

    DelayQueue

  5. 写的时候非常少,读的时候非常多

    CopyOnWriteList

相关资料

源码来源

学习视频


© Shawn Jim. All rights reserved. 本站总访问量 次, 访客数 人次.