多线程高并发编程 - 并发容器(二)
本篇主要总结了:线程安全的单例模式和并发容器。其中并发容器包含:ConcurrentHashMap、ConcurrentSkipListMap、CopyOnWriteArrayList和队列相关的内部加锁的并发队列ConcurrentLinkedQueue 以及阻塞队列BlockingQueue (LinkedBlockingQueue、ArrayBlockingQueue 、DelayQueue 、TransferQueue、SynchronizedQueue )
线程安全的单例模式
不使用同步锁
package c_023;
public class Singleton1 {
//初识话一个示例对象
private static Singleton1 singleton1 = new Singleton1();
//private 类型的构造函数,保证其他对象不能直接new一个该对象示例
private Singleton1() {
}
/**
* 该类唯一的一个public方法
* @return
*/
public static Singleton1 getInstance(){
return singleton1;
}
}
缺点: 该类加载时会直接new一个静态对象,当一个系统中这样的类较多时,会使得启动速度变慢。
现在流行的设计都是将“延迟加载”,这样我们可以在第一次使用时才初始化该类对象。
所以这种方式只适合在小系统使用。
使用同步锁
package c_023;
public class Singleton2 {
private static Singleton2 singleton2;
/**
* private 修饰构造方法
*/
private Singleton2() {
}
/**
* 使用 synchronized 修饰该方法,保证只有一个线程能够构建一个实例对象
* @return
*/
public static synchronized Singleton2 getInstance(){
if(singleton2 == null){
singleton2 = new Singleton2();
}
return singleton2;
}
}
缺点: 一次锁住了一个方法, 这个粒度有点大
改进: 只锁住其中的new语句就OK。就是所谓的“双重锁”机制。
使用双重同步锁
package c_023;
public class Singleton3 {
private static Singleton3 singleton3;
private Singleton3() {
}
/**
* 对获取示例的方法进行同步
* @return
*/
public static Singleton3 getInstance(){
if(singleton3 == null){
synchronized (Singleton3.class){
if(singleton3 == null){
singleton3 = new Singleton3();
}
}
}
return singleton3;
}
}
内部类实现
package c_023;
public class Singleton {
private Singleton() {
}
private static class Inner {
private static Singleton s = new Singleton();
}
public static Singleton getInstance() {
return Inner.s;
}
}
更好的是采用这种方式,既不用加锁,也能实现懒加载
示例代码 c_023部分
并发容器
map/set相关
不加锁
HashMap
TreeMap
LinkedHashMap
加锁且并发性不是特别高
collect.synchronizedXXX()
提供一系列方法给容器添加锁
package c_025;
public class CollectionsSychronizedXX {
public static void main(String[] args) {
Map map = new HashMap(); //创建一个容器
Map synchronizedMap = Collections.synchronizedMap(map); //调用方法给map加锁
}
}
Hashtable
所有实现都是带锁的
加锁且并发性比较高
ConcurrentHashMap
使用分段锁,多线程情况下效率比Hashtable高
容器被分为16段。多个线程可以同时并发的往里面插入数据,所以在多线程情况下,效率会比HashTable
高
加锁且并发性比较高要求排序
ConcurrentSkipListMap(并发跳表容器)
高并发,并且需要排序的情况下
效率比较代码:
package c_025;
/**
* skipMap: https://blog.csdn.net/sunxianghuang/article/details/52221913
*/
public class T01_ConcurrentMap {
public static void main(String[] args) {
//Map<String, String> map = new HashMap<>();
//Map<String, String> map = new Hashtable<>(); // 423 每次加锁,都锁一个对象
//Map<String, String> map = new ConcurrentHashMap<>(); // 309,加的是分段所,将容器分为16段,每段都有一个锁 segment; 1.8以后 使用 Node + synchronized+CAS
Map<String, String> map = new ConcurrentSkipListMap<>(); // 317 并发且排序,插入效率较低,但是读取很快
Random r = new Random();
Thread[] ths = new Thread[100];
CountDownLatch latch = new CountDownLatch(ths.length); // 启动了一个门闩,每有一个线程退出,门闩就减1,直到所有线程结束,门闩打开,主线程结束
long start = System.currentTimeMillis();
// 创建100个线程,每个线程添加10000个元素到map,并启动这些线程
for (int i = 0; i < ths.length; i++) {
ths[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
map.put("a" + r.nextInt(10000), "a" + r.nextInt(100000));
}
latch.countDown();
}, "t" + i);
}
Arrays.asList(ths).forEach(Thread::start);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(map.size());
}
}
队列相关
同步队列
ConcurrentLinkedQueue
LinkedQueue 无界队列,
offer(): 插入值时并不会抛出异常,会通过boolean类型的返回值告诉你是否插入成功
poll(): 将头值取出来并删除
peek(): 将第一个值取出来并且不删除
package c_025;
/**
* 同步队列,ConcurrentQueue
*/
public class T04_ConcurrentQueue {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>(); // LinkedQueue,无界队列
for (int i = 0; i < 10; i++) {
queue.offer("a" + i); // 有返回值,返回false代表没有加入成功,true 代表成功,并且此方法不会阻塞
}
System.out.println(queue);
System.out.println(queue.size());
System.out.println(queue.poll()); // 取出队头
System.out.println(queue.size());
System.out.println(queue.peek()); // 取出队头,但是不删除队头
System.out.println(queue.size());
}
}
结果:
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0 //去除内容并且删除
9
a1 //只是取出内容
9
BlockingQueue(阻塞式队列)
put(): 如果满了,线程就会等待
take(): 如果空了,线程就会等待
linkedBlokingQueue
无界队列
示例代码:
package c_025;
public class T05_LinkedBlockingQueue {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 启动生产者线程生产
new Thread(() -> {
for (int j = 0; j < 100; j++) {
try {
queue.put("aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 启用消费者线程消费
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
ArrayBlockingQueue
有界队列
示例代码:
package c_025;
/**
* 使用阻塞有界同步队列 ArrayBlockingQueue 完成生产者消费者模式
*/
public class T06_ArrayBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue<>(10);//初识10个容量队列
for (int i = 0; i < 10; i++) {
queue.put("a" + i);
}
//queue.put("a11"); // 会阻塞
//queue.add("a11"); // 会抛出异常
//System.out.println(queue.offer("a11")); // 会返回false
System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true
}
}
DelayQueue(延迟队列)
是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中
出队有个时间限制, 每个元素有一个等待时间, 可以按照等待时间排序元素DelayQueue元素必须为 Delayed类型的,即必须设置元素的等待时间
可以用来做定时任务:
package c_025;
public class T07_DelayQueue {
/**
* 定时类
*/
static class MyTask implements Delayed {
private long runningTime;
public MyTask(long runTime) {
this.runningTime = runTime;
}
// 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 这是排序规律, 执行等待时间最短的排在上面
@Override
public int compareTo(Delayed o) {
return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return runningTime + "";
}
}
public static void main(String[] args) throws InterruptedException {
long timestamp = System.currentTimeMillis();
MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行
MyTask myTask2 = new MyTask(timestamp + 2000);
MyTask myTask3 = new MyTask(timestamp + 1500);
MyTask myTask4 = new MyTask(timestamp + 2500);
MyTask myTask5 = new MyTask(timestamp + 500);
DelayQueue<MyTask> tasks = new DelayQueue<>();
tasks.put(myTask1);
tasks.put(myTask2);
tasks.put(myTask3);
tasks.put(myTask4);
tasks.put(myTask5);
System.out.println(tasks); // 确实按照我们拍的顺序执行的
for (int i = 0; i < tasks.size(); i++) {
System.out.println(tasks.take());
}
}
}
结果:
[1565690284606, 1565690284106, 1565690283606, 1565690283106, 1565690282606]
1565690284606 //按插入顺序取
1565690284106
1565690283606
TransferQueue
拥有transfer方法,传输,当transfer一个元素时,如果有take方法阻塞等待获取元素,则不向队列中保存,直接给等待的take方法
使用情景:如果将元素放入队列,再拿给消费者线程,太慢了,如果需要的效率更高,可以使用TransferQueue来解决更高的并发
transfer():
package c_025;
public class T08_TransferQueue {
public static void main(String[] args) {
TransferQueue mq = new LinkedTransferQueue();
// 先让消费者线程等待
new Thread(() -> {
try {
System.out.println(mq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 再让生产者线程生产
try {
mq.transfer("aaa"); // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的
} catch (InterruptedException e) {
e.printStackTrace();
}
/*new Thread(() -> {
try {
System.out.println(mq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/
}
}
SynchronousQueue
容量为0的队列,一种特殊的TransferQueue
无法使用add() /put() 方法向容器中添加元素
package c_025;
public class T09_SynchronousQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new SynchronousQueue();
new Thread(() -> {
try {
System.out.println(queue.take()); // 取不到就阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//queue.add("aaa"); // IllegalStateException: Queue full 抛出异常,因为没有容量
queue.put("aaa"); // 会阻塞等待消费者线程获取,内部是transfer
System.out.println(queue.size()); // 长度为0
}
}
CopyOnWriteList(写时复制容器)
当发生写操作(添加、删除、修改)时,容器就会复制原有容器一份然后对新操作进行写操作,然后再将引用转向新的容器
package c_025;
public class T02_CopyOnWriteList {
public static void main(String[] args) {
List<String> list =
//new ArrayList<>(); //会出现并发问题
//new Vector<>();
new CopyOnWriteArrayList<>(); // 写速极慢,读取快
Random r = new Random();
Thread[] ths = new Thread[100];
for (int i = 0; i < ths.length; i++) {
Runnable task = () -> {
for (int j = 0; j < 1000; j++) {
list.add("a" + r.nextInt(100));
}
};
ths[i] = new Thread(task);
}
runAndComputeTime(ths);
System.out.println(list.size());
}
static void runAndComputeTime(Thread[] ths) {
long start = System.currentTimeMillis();
Arrays.asList(ths).forEach(Thread::start);
Arrays.asList(ths).forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
好处:保证读操作不需要锁也能正常访问,是一种读写分离的实现方式
缺点:写的效率极低,特定场景下才会使用到
总结
对于map/set的选择使用
-
不需要使用多线程的情况下
HashMap
TreeMap
LinkedHashMap
-
并发量比较小的情况下
Hashtable
Collections.sychronizedXXX()
-
高并发情况下:
ConcurrentHashMap
同时要求排序
ConcurrentSkipListMap
对于队列的选择使用
-
不需要同步的队列
ArrayList
LinkedList
-
需要同步的队列,且并发量比较低
Collections.synchronizedXXX()
Vector
-
需要同步的队列,且并发量比较高
ConcurrentLinkedQueue
阻塞式队列
LinkedBlockingQueue(无界队列)
ArrayBlockingQueue(有界队列)
TransferQueue(直接将内容交给消费者执行)
SynchronusQueue(特殊的TransferQueue,容量为0)
-
执行定时任务队列
DelayQueue
-
写的时候非常少,读的时候非常多
CopyOnWriteList