并发容器精讲

面试杀手锏

ConcurrentHashMap、CopyOnWriteArrayList、阻塞队列

并发容器

并发容器概览

ConcurrentHashMap:线程安全的HashMap

CopyOnWriteArrayList:线程安全的List

BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道

ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList

ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查询

古老和过时的同步容器

Vector和Hashtable

Vector

是jdk早期的一部分,目标也是提供一个线程安全的集合类,但是随着jdk的发展已经不能满足需求了,性能不够好

1
2
3
4
5
6
7
8
9
public class VectorDemo {

public static void main(String[] args) {
Vector<String> vector = new Vector<>();
vector.add("test");
System.out.println(vector.get(0));
}

}

输出结果

1
test

Vector中的很多方法都是由synchronized修饰的,在多线程下线程性能不够好

Hashtable

1
2
3
4
5
6
7
8
public class HashtableDemo {
public static void main(String[] args) {
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("学完以后跳槽涨薪幅度", "80%");
System.out.println(hashtable.get("学完以后跳槽涨薪幅度"));
}

}

输出结果

1
80%

同样,Hashtable中的很多方法都是由synchronized修饰的,在多线程下线程性能不够好

HashMap和ArrayList

虽然这两个类不是线程安全的,但是可以用Collections.synchronizedList(new ArrayList\<E>())Collections.synchronizedMap(new HashMap<K,V>)使之变成线程安全的

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 描述:演示Collections.synchronizedList(new ArrayList<E>())
*/
public class SynList {
public static void main(String[] args) {
List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
list.add(5);
System.out.println(list.get(0));

Map<Object, Object> objectObjectMap = Collections.synchronizedMap(new HashMap<>());
}
}

同样使用synchronized来修饰,与上面的方法差不多

ConcurrentHashMap和CopyOnWriteArrayList

取代同步的HashMap和同步的ArrayList

绝大多数并发情况下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好

ConcurrentHashMap

重点、面试常考

Map简介

HashMap、Hashtabe、LinkedHashMap、TreeMap

HashMap

HashMap会根据键的HashCode来存储,由于可以直接算出HashCode的值,所以可以直接定位到需要找到的位置,它的访问速度是非常快的,允许键(key)为null来写入的,值也可以是null,但是是线程不安全的

Hashtable

是历史遗留类,很多功能与HashMap是一致的,但是线程安全的,任何时刻只有一个线程能对它进行操作,不建议使用

LinkedHashMap

HashMap的一个子类,保存了记录的插入顺序,在遍历的时候有用,遍历的顺序与插入的顺序一致

TreeMap

由于实现了SortedMap接口,所以可以根据键来排序,默认是升序,也可以自定义排序条件

以上一种实现都要求key是不可变对象,即key在创建后它的hash值不会改变

Map的基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 描述: 演示Map的基本用法
*/
public class MapDemo {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
System.out.println(map.isEmpty());
map.put("东哥", 38);
map.put("西哥", 28);
System.out.println(map.keySet());
System.out.println(map.get("西哥"));
System.out.println(map.size());
System.out.println(map.containsKey("东哥"));
map.remove("东哥");
System.out.println(map.containsKey("东哥"));

}
}

输出结果

1
2
3
4
5
6
true
[东哥, 西哥]
28
2
true
false

为什么需要ConcurrentHashMap

为什么不用Collections.synchronizedMap()

它是通过锁来保证线程的安全访问的,但是由于synchronized对于并发量高的时候并不理想,所以不采用这样的方法

为什么HashMap是线程不安全的

注意:HashMap本来就不是用于并发的

同时put碰撞导致数据丢失

如果多个线程同时put,如果计算出来的hash值是一样的话,那么这几个key会放到同一个位置,但是多个线程放在同一个位置,只能保留一个线程的数据,其他线程的数据都会丢失的,所以会丢失一部分数据

同时put扩容导致数据丢失

如果多个线程同时put,并且发现需要同时扩容,那么扩容之后的数组,也只有一个会保留下来,所以也会造成某些数据的丢失

死循环造成CPU100%

HashMap在高并发下的死循环(仅在JDK7及以前存在)

原因:在多个线程同时扩容的时候会造成链表的死循环,这样一来就没有尽头了

HashMap的分析

image-20200223222538740

image-20200223222640457

红黑树

对二叉查找树BST的一种平衡策略,会自动平衡,防止极端不平衡从而影响查找效率的情况发生

特点

每个节点要么是红色的,要么是黑色的,但根节点永远是黑色的

红色节点不能连续,即红色节点的孩子和父亲都不能是红色

从任意节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点

所有的叶结点都是黑色的

HashMap关于并发的特点

  1. 非线程安全
  2. 迭代时不允许改变内容
  3. 只读的并发是安全的
  4. 如果一定要把HashMap用在并发环境,用Collections.synchronizedMap(new HashMap())

JDK1.7的ConcurrentHashMap实现和分析

image-20200223230321759
  1. Java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法

  2. 每个segment设置了独立的ReentrantLock锁,每个segment之间互不影响,提高了并发效率

  3. ConcurrentHashMap默认有16个Segments,所以最多可以同时支持16个线程并发的写(操作分别分布在不同的Segment),这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的

JDK1.8的ConcurrentHashMap实现和分析

image-20200224095604376

putVal()流程

  1. 判断key、value不为空
  2. 计算hash值
  3. 根据对应位置节点的类型来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
  4. 检查满足阈值就红黑树化
  5. 返回oldVal

get()流程

  1. 计算hash值
  2. 找到对应的位置,根据情况进行:直接取值、或红黑树里找值、或遍历链表取值
  3. 返回找到的结果

为什么要把1.7结构改成1.8的结构

数据结构

Hash碰撞

保证并发安全

为什么超过8要转为红黑树

默认不是红黑树节点,默认是链表的形式,这是因为它所占用的内存更少

想要达到冲突为8是很难的,如果达到8就会转为红黑树,依然能提高效率

组合操作

ConcurrentHashMap为什么也不是线程安全的

replace操作

putIfAbsent

if(!map.containsKey(key)){

​ return map.put(key,value);

}else{

​ return map.get(key);

}

实际生产案例

线程安全问题需要时刻注意

CopyOnWriteArrayList

诞生的历史和原因

在JDK5引入,是List中最主要的并发工具,用来代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因是一样的

Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑

CopyOnWrite并发容器还包括CopyOnWriteArraySet,用来替代同步Set

适用场景

读操作尽可能快,而写即使慢也没有太大关系

读多写少:黑名单、每日更新、监听器,这些的迭代操作远多于修改操作

读写原则

回顾读写锁,读读共享,其他都互斥(写写互斥,读写互斥,写读互斥)

读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是写入也不会阻塞读取操作,只用写入和写入之间要进行同步等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {

public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
//CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");

Iterator<String> iterator = list.iterator();

while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);

if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
*/
public class CopyOnWriteArrayListDemo1 {

public static void main(String[] args) {
//ArrayList<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");

Iterator<String> iterator = list.iterator();

while (iterator.hasNext()) {
System.out.println("list is" + list);
String next = iterator.next();
System.out.println(next);

if (next.equals("2")) {
list.remove("5");
}
if (next.equals("3")) {
list.add("3 found");
}
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
list is[1, 2, 3, 4, 5]
1
list is[1, 2, 3, 4, 5]
2
list is[1, 2, 3, 4]
3
list is[1, 2, 3, 4, 3 found]
4
list is[1, 2, 3, 4, 3 found]
5

实现原理

CopyOnWrite的含义

创建新副本,读写分离

不可变原理

迭代时候

缺点

数据一致性问题

CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性,所以如果希望写入的数据马上能读到,就不要使用CopyOnWrite容器

内存占用问题

因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时保存两个对象的内存

源码分析

数据结构

get

add

并发队列Queue

阻塞队列、非阻塞队列

为什么要使用队列

用队列可以在线程间传递数据:生产者消费者模式、银行转账

考虑锁等线程安全问题的重任从用户转移到了队列上

并发队列简介

Queue与BlockingQueue是JDK5中新增的,Queue只用来保存一组等待处理的数据,它会有很多种实现,底层是LinkedList,Queue只能从头或者尾来获取数据,不能从中间获取数据

BlockingQueue增加了可阻塞的插入和获取操作,若队列为空,取的操作会一直阻塞,直到里面有了数据,如果队列满了,插入也插入不进去,也会一直阻塞,直到有用户把其中的数据取出来,这体现了生产者消费者模式

各并发队列关系图

image-20200224174734549

阻塞队列

BlockingQueue

什么是阻塞队列

阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能

通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用,阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的

image-20200224185451780

阻塞功能:最有特色的两个带有阻塞功能的方法是take()方法和put()方法

take()方法:获取并移除队列的头结点,一旦在执行take的时候,队列里无数据,则阻塞,直到队列里有数据

image-20200224191055479

put()方法:插入元素,但是如果队列已满,那么就无法继续插入,并且阻塞,直到队列里有了空闲空间

image-20200224192035440

是否有界(容量有多大)

这是一个非常重要的属性,无界队列意味着里面可以容纳非常多的数据,Integer.MAX_VALUE,约为2的31次方,可以近似认为是无限容量

线程池与阻塞队列的关系:阻塞队列是线程池的重要组成部分

BlockingQueue的主要方法

put、take(会阻塞住)

add、remove、element(会抛出异常)

offer(添加元素)、poll(取出元素)、peek

主要方法介绍

ArrayBlockingQueue

有界

指定容量

公平:可以指定是否需要保证公平,如果要保证公平的话,那么等待了最长时间的线程就会被优先处理,不过这同时会带来一定的性能损耗

使用案例

有10个面试者,但只有1个面试官,大厅里有三个位置休息,每个人面试时间是10秒,模拟所有人面试的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 描述: TODO
*/
public class ArrayBlockingQueueDemo {


public static void main(String[] args) {

ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);

Interviewer r1 = new Interviewer(queue);
Consumer r2 = new Consumer(queue);
new Thread(r1).start();
new Thread(r2).start();
}
}

class Interviewer implements Runnable {

BlockingQueue<String> queue;

public Interviewer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
System.out.println("10个候选人都来啦");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + i;
try {
queue.put(candidate);
System.out.println("安排好了" + candidate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("stop");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer implements Runnable {

BlockingQueue<String> queue;

public Consumer(BlockingQueue queue) {

this.queue = queue;
}

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg;
try {
while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + "到了");
Thread.sleep(10000);
}
System.out.println("所有候选人都结束了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
10个候选人都来啦
安排好了Candidate0
安排好了Candidate1
安排好了Candidate2
安排好了Candidate3
Candidate0到了
Candidate1到了
安排好了Candidate4
Candidate2到了
安排好了Candidate5
Candidate3到了
安排好了Candidate6
Candidate4到了
安排好了Candidate7
Candidate5到了
安排好了Candidate8
Candidate6到了
安排好了Candidate9
Candidate7到了
Candidate8到了
Candidate9到了
所有候选人都结束了

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

无界,容量能达到Integer.MAX_VALUE

内部结构:Node、两把锁

put()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

PriorityBlockingQueue

支持优先级

自然排序,而不是先进先出

无界队列

是PriorityQueue的线程安全版本

SynchronousQueue

容量为0,值得注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要持有元素,它所做的就是直接传递(direct handoff),因此效率很高

image-20200225082849924

SynchronousQueue没有peek函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量为0,所以连头结点都没有,也就没有peek方法,同理,没有iterate相关方法

因此它是一个极好的用来直接传递的并发数据结构

SynchronousQueue是线程池Executors.newCachedThreadPool()使用的阻塞队列

非阻塞队列

ConcurrentLinkedQueue

并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景,用的相对比较少

源码中的offer体现了CAS的思想,内有p.casNext方法,用了Unsafe.compareAndSwapObject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

如何选择适合自己的队列

边界

空间

吞吐量

各并发容器总结

Java.util.concurrent包提供的容器分为3类,Concurrent*、CopyOnWrite*、Blocking*

Concurrent*的特点是大部分通过CAS实现并发的

CopyOnWrite*则是通过复制一份原数据来实现的

Blocking*通过AQS实现的

文章目录
  1. 1. 并发容器精讲
    1. 1.1. 并发容器
      1. 1.1.1. 并发容器概览
    2. 1.2. 古老和过时的同步容器
      1. 1.2.1. Vector和Hashtable
      2. 1.2.2. HashMap和ArrayList
      3. 1.2.3. ConcurrentHashMap和CopyOnWriteArrayList
    3. 1.3. ConcurrentHashMap
      1. 1.3.1. Map简介
        1. 1.3.1.1. HashMap
        2. 1.3.1.2. Hashtable
        3. 1.3.1.3. LinkedHashMap
      2. 1.3.2. TreeMap
      3. 1.3.3. Map的基本用法
      4. 1.3.4. 为什么需要ConcurrentHashMap
        1. 1.3.4.1. 为什么不用Collections.synchronizedMap()
        2. 1.3.4.2. 为什么HashMap是线程不安全的
      5. 1.3.5. HashMap的分析
      6. 1.3.6. JDK1.7的ConcurrentHashMap实现和分析
      7. 1.3.7. JDK1.8的ConcurrentHashMap实现和分析
      8. 1.3.8. 为什么要把1.7结构改成1.8的结构
      9. 1.3.9. 组合操作
      10. 1.3.10. 实际生产案例
    4. 1.4. CopyOnWriteArrayList
      1. 1.4.1. 诞生的历史和原因
      2. 1.4.2. 适用场景
      3. 1.4.3. 读写原则
      4. 1.4.4. 实现原理
        1. 1.4.4.1. CopyOnWrite的含义
      5. 1.4.5. 缺点
      6. 1.4.6. 源码分析
    5. 1.5. 并发队列Queue
      1. 1.5.1. 为什么要使用队列
      2. 1.5.2. 并发队列简介
      3. 1.5.3. 各并发队列关系图
      4. 1.5.4. 阻塞队列
        1. 1.5.4.1. 什么是阻塞队列
        2. 1.5.4.2. BlockingQueue的主要方法
        3. 1.5.4.3. 主要方法介绍
        4. 1.5.4.4. ArrayBlockingQueue
        5. 1.5.4.5. LinkedBlockingQueue
        6. 1.5.4.6. PriorityBlockingQueue
        7. 1.5.4.7. SynchronousQueue
      5. 1.5.5. 非阻塞队列
      6. 1.5.6. 如何选择适合自己的队列
    6. 1.6. 各并发容器总结
|