并发编程-进阶

锁机制

乐观锁与悲观锁

悲观锁

悲观锁指对数据被外界修改持保守态度,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制,数据库中实现是对数据记录操作前给记录加排它锁。如果获取锁失败,则说明数据正在被其它线程修改,则等待或者抛出异常。如果加锁成功,则获取记录,对其修改,然后事务提交后释放排它锁。

使用悲观锁的一个常用的例子: select * from 表 where .. for update;

乐观锁

它认为数据一般情况下不会造成冲突,所以在访问记录前不会加排它锁,而是在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测。具体说是根据 update 返回的行数让用户决定如何去做。

update 表 set comment='***',status='operator',version=version+1 where version = 1 and id = 1;

乐观锁并不会使用数据库提供的锁机制,一般在表添加 version 字段或者使用业务状态来做。乐观锁直到提交的时候才去锁定,所以不会产生任何锁和死锁。

公平锁与非公平锁

根据线程获取锁的抢占机制锁可以分为公平锁和非公平锁,公平锁表示线程获取锁的顺序是按照线程请求锁的时间长短来分决定的的,也就是最早获取锁的线程将最早获取到锁,也就是先来先得的 FIFO 顺序。而非公平锁则运行时候闯入,也就是先来不一定先得。

ReentrantLock 提供了公平和非公平锁的实现:

  • 公平锁:ReentrantLock pairLock = new ReentrantLock(true);
  • 非公平锁:ReentrantLock pairLock = new ReentrantLock(false);

如果构造函数不传递参数,则默认是非公平锁。

具体来说假设线程 A 已经持有了锁,这时候线程 B 请求该锁将会被挂起,当线程 A 释放锁后,假如当前有线程 C 也需要获取该锁,如果采用非公平锁方式,则根据线程调度策略线程 B 和 C 两者之一可能获取锁,这时候不需要任何其它干涉,如果使用公平锁则需要把 C 挂起,让 B 获取当前锁。

在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销。

独占锁与共享锁

根据锁只能被单个线程持有还是能被多个线程共同持有,锁分为独占锁和共享锁。

独占锁保证任何时候都只有一个线程能得到锁,ReentrantLock 就是以独占方式实现的。共享锁则同时有多个线程可以持有,例如 ReadWriteLock 读写锁,它允许一个资源可以被多线程同时进行读操作。

独占锁是一种悲观锁,每次访问资源都先加上互斥锁,这限制了并发性,因为读操作并不会影响数据一致性,而独占锁只允许同时一个线程读取数据,其它线程必须等待当前线程释放锁才能进行读取。

共享锁则是一种乐观锁,它放宽了加锁的条件,允许多个线程同时进行读操作。

可重入锁

当一个线程要获取一个被其它线程持有的独占锁时候,该线程会被阻塞,那么当一个线程再次获取它自己已经获取的锁时候是否会被阻塞那?如果不被阻塞,那么我们说该锁是可重入的,也就是只要该线程获取了该锁,那么可以无限制次数(高级篇我们会知道严格来说是有限次数)进入被该锁锁住的代码。

实际上 synchronized 内部锁是可重入锁,可重入锁的原理是在锁内部维护了一个线程标示,用来标示该锁目前被那个线程占用,然后关联一个计数器。一开始计数器值为0,说明该锁没有被任何线程占用,当一个线程获取了该锁,计数器会变成1,其它线程在获取该锁时候发现锁的所有者不是自己就会被阻塞挂起。

但是当获取该锁的线程再次获取锁时候发现锁拥有者是自己,就会把计数器值+1, 当释放锁后计数器会-1,当计数器为0时候,锁里面的线程标示重置为 null,这时候阻塞的线程会获取被唤醒来竞争获取该锁。

锁消除和锁粗化

  • 锁消除:指虚拟机即时编译器在运行时,对一些代码上要求同步,但被检测到不可能存在共享数据竞争的锁进行消除。主要根据逃逸分析。

程序员怎么会在明知道不存在数据竞争的情况下使用同步呢?很多不是程序员自己加入的。

  • 锁粗化:原则上,同步块的作用范围要尽量小。但是如果一系列的连续操作都对同一个对象反复加锁和解锁,甚至加锁操作在循环体内,频繁地进行互斥同步操作也会导致不必要的性能损耗。

锁粗化就是增大锁的作用域。

线程安全集合

ConcurrentHashMap 在 1.8 中的实现,相比于 1.7 的版本变动很大。

  • 取消了 Segment 分段锁的数据结构,取而代之的是数组 + 链表 + 红黑树的结构。而对于锁的粒度,调整为对每个组元素加锁即对每个链表的头节点( Node)用 Synchronized 加锁,即如果要写入链表,必须获得这个链表的头节点的对象锁。然后是定位节点的 Hash 算法被简化了,这样带来的弊端是 Hash 冲突会加剧。因此在链表节点数量大于 8 时,会将链表转化为红黑树进行存储。这样一来,查询的时间复杂度就会由原先的 O(n) 变为 O(logN)。
  • 在第一次添加元素的时候,默认初期长度为 16,当往 map 中继续添加元素的时候,通过 hash 值跟数组长度进行与操作来决定放在数组的哪个位置。如果出现放在同一个位置的时候,优先以链表的形式存放,在同一个位置的个数又达到了 8 个以上,如果数组的长度还小于 64 的时候,则会扩容数组。如果数组的长度大于等于 64 了的话,在会将该节点的链表转换成树。通过扩容数组的方式来把这些节点给分散开。
  • 然后将这些元素复制到扩容后的新的数组中,同一个链表中的元素通过 Hash 值的数组长度位来区分,是还是放在原来的位置还是放到扩容的长度的相同位置去 。在扩容完成之后,如果某个节点的是树,同时现在该节点的个数又小于等于 6 个了,则会将该树转为链表。记住,不是少于 8 个就转为链表哦

ConcurrentHashMap 数据存储的过程。其保证线程安全主要通过 Node + CAS + Synchronized

CAS

CAS 即 Compare And Swap,是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新操作的原子性,JDK 里面的 Unsafe 类提供了一些列的 compareAndSwap 方法,下面以 compareAndSwapLong 为例进行简单介绍。

  • boolean compareAndSwapLong(Object obj,long valueOffset,long expect, long update)方法。

compareAndSwap 的意思也就是比较并交换,CAS 有四个操作数分别为:对象内存位置,对象中的变量的偏移量,变量预期值 expect,新的值 update。

操作含义是如果对象 obj 中内存偏移量为 valueOffset 位置的变量值为 expect 则使用新的值 update 替换旧的值 expect。这个是处理器提供的一个原子性指令。

一般CAS具有三个操作数,内存位置 V预期值 A新值 B。如果在执行过程中,发现内存中的值 V 与预期值 A 相匹配,那么他会将 V 更新为新值 A。如果预期值 A 和内存中的值 V 不相匹配,那么处理器就不会执行任何操作。

这是一个非阻塞方法 (Non-blocking Algorithms),虽然在处理器层面其是阻塞的,但是由于处理器层面的阻塞时间极其的短,在代码层面不会有体现。

CAS存在的三大问题

这也是乐观锁中存在的问题。

  • ABA 问题:因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A - 2B -3A。
  • 循环时间长开销大:自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline), 使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突( memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。
  • 只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

AQS框架

AQS(AbstractQueuedSynchronizer 类)是一个用来构建锁和同步器的框架,各种 Lock 包中的锁(常用的有 ReentrantLock、ReadWriteLock),以及其他如 Semaphore、CountDownLatch,甚至是早期的 FutureTask 等,都是基于 AQS 来构建。

  • AQS 在内部定义了一个 volatile int state 变量,表示同步状态:当线程调用 lock 方法时 ,如果 state=0,说明没有任何线程占有共享资源的锁,可以获得锁并将 state=1;如果 state=1,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。
  • AQS 通过 Node 内部类构成的一个双向链表结构的同步队列,来完成线程获取锁的排队工作,当有线程获取锁失败后,就被添加到队列末尾。
    • Node 类是对要访问同步代码的线程的封装,包含了线程本身及其状态叫 waitStatus(有五种不同 取值,分别表示是否被阻塞,是否等待唤醒,是否已经被取消等),每个 Node 结点关联其 prev 结点和 next 结点,方便线程释放锁后快速唤醒下一个在等待的线程,是一个 FIFO 的过程。
    • Node 类有两个常量,SHARED 和 EXCLUSIVE,分别代表共享模式和独占模式。所谓共享模式是一个锁允许多条线程同时操作(信号量 Semaphore 就是基于 AQS 的共享模式实现的),独占模式是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待(如 ReentranLock)。
  • AQS 通过内部类 ConditionObject 构建等待队列(可有多个),当 Condition 调用 wait() 方法后,线程将会加入等待队列中,而当 Condition 调用 signal() 方法后,线程将从等待队列转移动同步队列中进行锁竞争。
  • AQS 和 Condition 各自维护了不同的队列,在使用 Lock 和 Condition 的时候,其实就是两个队列的互相移动。

Synchronized 和 ReentrantLock 的异同

ReentrantLock 是 Lock 的实现类,是一个互斥的同步锁。

从功能角度,ReentrantLock 比 Synchronized 的同步操作更精细(因为可以像普通对象一样使用),甚至实现 Synchronized 没有的高级功能,如:

  • 等待可中断:当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,对处理执行时间非常长的同步块很有用。
  • 带超时的获取锁尝试:在指定的时间范围内获取锁,如果时间到了仍然无法获取则返回。
  • 可以判断是否有线程在排队等待获取锁。
  • 可以响应中断请求:与 Synchronized 不同,当获取到锁的线程被中断时,能够响应中断,中断异常将会被抛出,同时锁会被释放。
  • 可以实现公平锁。

从锁释放角度,Synchronized 在 JVM 层面上实现的,不但可以通过一些监控工具监控 Synchronized 的锁定,而且在代码执行出现异常时,JVM 会自动释放锁定;但是使用 Lock 则不行,Lock 是通过代码实现的,要保证锁定一定会被释放,就必须将 unLock() 放到 finally{} 中。

从性能角度,Synchronized 早期实现比较低效,对比 ReentrantLock,大多数场景性能都相差较大。但是在 Java 6 中对其进行了非常多的改进,在竞争不激烈时,Synchronized 的性能要优于 ReetrantLock;在高竞争情况下,Synchronized 的性能会下降几十倍,但是 ReetrantLock 的性能能维持常态。

线程池

newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这种类型的线程池特点是:

  • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为 Interger. MAX_VALUE)这样可,灵活的往线程池中添加线程。
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 在使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

newFixedThreadPool

创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

newSingleThreadExecutor

创建一个单线程化的 Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO、LIFO、优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。

线程池设计

  • 当任务队列没有满时,如果线程池中线程个数小于核心线程数,那么新加任务进来时,创建新的线程,并把新任务设为 worker 的 firstTask,由新创建的线程直接执行。线程池的 Worker 重写了 run 方法,当线程调用 run 方法时,执行 runWorker 方法,此时线程会进去一个 while 循环不断 getTask,意味着如果有新任务添加进来都会被在 while 里面循环的线程执行。

  • 如果线程达到核心线程数,且任务队列没有满,新添加进来的任务会被池里任意空闲的线程执行。如果工作队列满了,说明线程池的工作线程已经达到了核心数目,那么就需要创建新的线程,新任务为新线程的 firstTask。runTask 函数中,while 循环 getTask 时,每个线程从任务队列中 takae 任务时是独占 lock 锁的,哪个线程能 take 成功,就由哪个线程执行。

0%