并发编程-基础

概念

进程与线程

进程是资源分配的基本单位,线程是独立调度的基本单位,CPU调度和分派的基本单位。

操作系统在分配资源时候是把资源分配给进程的,但是 CPU 资源就比较特殊,它是分派到线程的,因为真正要占用 CPU 运行的是线程,所以也说线程是 CPU 分配的基本单位。

同步和异步

同步: 同步方法调用一旦开始,调用者必须等待调用方法返回,才能继续后面的行为。

异步:只需要调用方法,便会立即返回,可以继续后续的操作。

并发和并行:

并发:侧重于任务的交替执行,多个任务是串行执行的。

并行:真正意义上的同时执行。

当只有一个CPU时,一个CPU一次只能执行一条指令,这种情况下多进程与多线程是并发的,而不是并行的。

只有多核CPU才会出现并行情况。

临界区

表示一种公共资源或者共享数据,可以被多个线程使用,但每次使用时只能有一个线程去使用它。

阻塞与非阻塞

形容多个线程之间的影响。

阻塞:其中假设一个线程占用了临界区资源 ,其他需要该资源的线程就需要一直等待该线程释放资源。

非阻塞:没有一个线程可以妨碍其他线程的工作,所有线程都会尝试向前执行。

死锁、饥饿、活锁

饥饿: 某一个线程因为种种原因始终无法获得需要的资源,导致一直无法执行。

比如优先级太低 或者 某一个线程始终占用着关键资源不放,导致其他线程无法执行。

活锁: 线程互相谦让,主动将资源释放给别人使用,资源便会在两个线程之间跳动,则没有一个线程拿到资源。

并发级别:

阻塞:

如果使用的和其他重入锁,得到的就是一个阻塞的队列。

无饥饿:

锁是公平的,遵循着先来先到的原则,不管新来的线程优先级多高,都要乖乖排队。

无障碍

最弱的非阻塞调度,是一种乐观的控制策略,一旦发现修改共享数据冲突,则进行回滚操作。

无锁

并行也是无障碍的,但不同的是无锁的并发保证必然有一个线程能够在有限步中完成操作离开临界区。

无等待

在无锁的基础上,要求全部线程都在有限步内完成。

并行定律

Amdahl定律: 定义了串行化系统并行化后的加速比的计算公式和理论上限。

强调当串行比例一定时,加速比有上限,不管堆叠多少个CPU,都不能突破上限。

Gustafson定律:如果可被并行化的代码比重足够多,则加速比随着CPU数量呈线性增长。

线程创建与运行

Java 中有三种线程创建方法,分别为实现 Runnable 接口的run方法、继承 Thread 类并重写 run 方法、使用 FutureTask 方式。

实现继承 Thread 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ThreadTest {

//继承Thread类并重写run方法
public static class MyThread extends Thread {

@Override
public void run() {
System.out.println("I am a child thread");
}
}

public static void main(String[] args) {
// 创建线程
MyThread thread = new MyThread();
// 启动线程
thread.start();
}
}

MyThread 类继承了 Thread 类,并重写了 run 方法,然后调用了线程的 start 方法启动了线程,当创建完 thread 对象后该线程并没有被启动执行。当调用了 start 方法后才是真正启动了线程。其实当调用了 start 方法后线程并没有马上执行而是处于就绪状态,这个就绪状态是指该线程已经获取了除 CPU 资源外的其它资源,等获取 CPU 资源后才会真正处于运行状态。

当 run 方法执行完毕,该线程就处于终止状态了。使用继承方式好处是 run 方法内获取当前线程直接使用 this 就可以,无须使用 Thread.currentThread() 方法,不好的地方是 Java 不支持多继承,如果继承了 Thread 类那么就不能再继承其它类,另外任务与代码没有分离,当多个线程执行一样的任务时候需要多份任务代码,而 Runnable 则没有这个限制。

实现 Runnable 接口的 run 方法

1
2
3
4
5
6
7
8
9
10
11
12
public static class RunableTask implements Runnable{

@Override
public void run() {
System.out.println("I am a child thread");
}
}
public static void main(String[] args) throws InterruptedException{
RunableTask task = new RunableTask();
new Thread(task).start();
new Thread(task).start();
}

如上面代码,两个线程公用一个 task 代码逻辑,需要的话 RunableTask 可以添加参数进行任务区分,另外 RunableTask 可以继承其他类,但是上面两种方法都有一个缺点就是任务没有返回值。

实现 FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//创任务类,类似Runable
public static class CallerTask implements Callable<String>{

@Override
public String call() throws Exception {

return "hello";
}

}

public static void main(String[] args) throws InterruptedException {
// 创建异步任务
FutureTask futureTask = new FutureTask<>(new CallerTask());
//启动线程
new Thread(futureTask).start();
try {
//等待任务执行完毕,并返回结果
String result = futureTask.get();
System.out.println(result);
} catch (ExecutionException e) {
e.printStackTrace();
}
}

哪些指令不能重排:(Happen-Before原则)

  • 程序顺序执行:一个线程内保证语义的串行性
  • volatile原则: volatile变量的写操作必然要早于读操作
  • 锁规则: lock要发生在unlock之前
  • 传递性: A先于B,B先于C,则A先于C
  • start()方法早于先于它的任何一个工作
  • 线程所有操作先于Thread.join( )
  • interrupt( )中断操作先于被中断线程的代码
  • 对象的构造函数执行,结束均先于finalize( )方法

不使用多进程而使用多线程,是因为线程切换和调度的成本远远小于进程。

线程状态:

查看JDK源码,发现其中线程在枚举类中有6个状态,NEW表示刚刚创建的线程,等待start()方法执行时,则处于RUNNABLE状态,表示线程所需的一切资源已经准备好了。

若执行过程中遇到了锁,则进入BLOCKED状态。

WAITING:会进入一个无时间限制的等待

TIMED_WAITING: 会进入一个有时间限制的等待

调用start()和调用run()方法的区别:

其中调用run( )方法,只会在当前线程中串行执行run()中的代码。

线程中断

1
2
3
4
public boolean Thread.isInterrupted()   //判断线程是否被中断
public void Thread.interrupted() //中断线程,只是设置线程的中断标志,但实际上并不会中断线程
public static boolean Thread.isInterrupted()
//静态方法,判断线程是否被中断,并清除当前线程的中断标志位。

例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.thread.test1;

public class Test_Interrupt {

public static void main(String[] args) throws InterruptedException {

Thread t1=new Thread(new Runnable() {

@Override
public void run() {
while(true) {
//判断是否被中断,如果只有t1.interrupt();则无法中断线程。需要添加标记变量。
if(Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted");
break;
}
}
}
});
t1.start();
Thread.sleep(200);
t1.interrupt();
}

}

例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.thread.test1;

public class Test_Interrupt {

public static void main(String[] args) throws InterruptedException {

Thread t1=new Thread(new Runnable() {

@Override
public void run() {

while(true) {
if(Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted");
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {

System.out.println("Interrupt when sleep");
//设置中断,因为sleep方法由于中断而抛出异常,此时会清除中断标记,若不加处理,则下一次循环开始时,则无法捕获这个中断,故需要再次设置。
Thread.currentThread().interrupt();
}
Thread.yield();
}
}
});
t1.start();
Thread.sleep(200);
t1.interrupt();
}
}

线程等待和通知

Object对象必须包含在对应的sychronized语句中,无论是wait( )和notify( )都需要首先获得目标对象的一个监视器。若未事先获取到该对象的监视器锁,则调用 wait() 方法时候线程会抛出 IllegalMonitorStateException 异常。

当一个线程调用一个共享对象的 wait() 方法时候,调用线程会被阻塞挂起,直到下面几个情况之一发生才返回:

  • 其它线程调用了该共享对象的 notify() 或者 notifyAll() 方法;

  • 其它线程调用了该线程的 interrupt() 方法设置了该线程的中断标志,该线程会抛出 InterruptedException 异常返回。

虚假唤醒

一个线程可以从挂起状态变为可以运行状态(也就是被唤醒)即使该线程没有被其它线程调用 notify(),notifyAll() 进行通知,或者被中断,或者等待超时。如生产者1个,消费者多个,生产者一次只生产一个,而消费者过多,可能导致虚假唤醒。

为了防止虚假唤醒,需要不停的去测试该线程被唤醒的条件是否满足,不满足则继续等待,也就是说在一个循环中去调用 wait() 方法进行防范,退出循环的条件是条件满足了唤醒该线程。

1
2
3
4
5
synchronized (obj) {
while (条件不满足){
obj.wait();
}
}

等待线程执行终止的 join 方法

项目中会遇到这样的场景,就是需要等待某几件事情完成后才能继续往下执行,比如多个线程去加载资源,当多个线程全部加载完毕后在汇总处理,Thread 类中有个 join() 方法就可以完成。

由于 CountDownLatch 功能比 join 更丰富,所以项目实践中一般使用 CountDownLatch

线程死锁

指的是两个或两个以上的线程在执行过程中,因争夺资源而造成的互相等待的现象,在无外力作用的情况下,这些线程会一直相互等待而无法继续运行下去。

死锁的产生必须具备以下四个必要条件。

  • 互斥条件:指线程对已经获取到的资源进行排它性使用,即该资源同时只由一个线程占用。如果此时还有其它进行请求获取该资源,则请求者只能等待,直至占有资源的线程用毕释放。
  • 请求并持有条件:指一个线程已经持有了至少一个资源,但又提出了新的资源请求,而新资源已被其其它线程占有,所以当前线程会被阻塞,但阻塞的同时并不释放自己已经获取的资源。
  • 不可剥夺条件:指线程获取到的资源在自己使用完之前不能被其它线程抢占,只有在自己使用完毕后由自己释放。
  • 环路等待条件:指在发生死锁时,必然存在一个线程——资源的环形链,即线程集合{T0,T1,···, Tn}中的 T0 正在等待一个 T1 占用的资源;T1 正在等待 T2 占用的资源,……Tn正在等待已被 T0 占用的资源。

目前只有持有并等待循环等待是可以被破坏的

避免死锁

  • 避免一个线程持有多个锁
  • 避免一个线程在锁内同时占用多个资源
  • 使用定时锁
  • 对于数据库锁,加锁和解锁在同一数据库连接中

守护线程与用户线程

当main 函数是唯一的用户线程,thread 线程是守护线程,当 main 线程运行结束后,JVM 发现当前已经没有用户线程了,就会终止 JVM 进程。

Java 中在 main 线程运行结束后,JVM 会自动启动一个叫做 DestroyJavaVM 线程,该线程会等待所有用户线程结束后终止 JVM 进程。

其中会调用到 JavaMain 这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
int JNICALL
JavaMain(void * _args)
{
...
//执行Java中的main函数
(*env)->CallStaticVoidMethod(env, mainClass, mainID, mainArgs);

//main函数返回值
ret = (*env)->ExceptionOccurred(env) == NULL ? 0 : 1;

//等待所有非守护线程结束,然后销毁JVM进程
LEAVE();
}

LEAVE 是 C 语言里面的一个宏定义,定义如下:

1
2
3
4
5
6
7
8
9
10
11
#define LEAVE() 
do {
if ((*vm)->DetachCurrentThread(vm) != JNI_OK) {
JLI_ReportErrorMessage(JVM_ERROR2);
ret = 1;
}
if (JNI_TRUE) {
(*vm)->DestroyJavaVM(vm);
return ret;
}
} while (JNI_FALSE)

上面宏的作用实际是创建了一个名字叫做 DestroyJavaVM 的线程来等待所有用户线程结束。

在 Tomcat 的 NIO 实现 NioEndpoint 中会开启一组接受线程用来接受用户的链接请求和一组处理线程负责具体处理用户请求,那么这些线程是用户线程还是守护线程呢?下面我们看下 NioEndpointstartInternal 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void startInternal() throws Exception {

if (!running) {
running = true;
paused = false;

...

//创建处理线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);//声明为守护线程
pollerThread.start();
}
//启动接受线程
startAcceptorThreads();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];

for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());//设置是否为守护线程,默认为守护线程
t.start();
}
}

private boolean daemon = true;
public void setDaemon(boolean b) {
daemon = b;
}
public boolean getDaemon() {
return daemon;
}

如上代码也就是说默认情况下接受线程和处理线程都是守护线程,这意味着当 Tomcat 收到 shutdown 命令后 Tomcat 进程会马上消亡,而不会等处理线程处理完当前的请求。

注:如果你想在主线程结束后 JVM 进程马上结束,那么创建线程的时候可以设置线程为守护线程,否则如果希望主线程结束后子线程继续工作,等子线程结束后在让 JVM 进程结束那么就设置子线程为用户线程。

三个窗口同步卖票

使用显式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.thread.test1;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLock {

public static void main(String[] args) {
Ticket ticket=new Ticket();
// TODO Auto-generated method stub
new Thread(ticket,"窗口1").start();
new Thread(ticket,"窗口2").start();
new Thread(ticket,"窗口3").start();
}

}
class Ticket implements Runnable{
private int ticket=100;
private Lock lock=new ReentrantLock();

@Override
public void run() {
while(ticket>0) {
lock.lock();
try {
if(ticket>0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ ":"+--ticket);
}
}finally {
lock.unlock();
}
}
}
}

模拟CAS算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.thread.test1;

public class TestCompareAndSwap {

public static void main(String[] args) {
// TODO Auto-generated method stub
final ComapareAndSwap casAndSwap=new ComapareAndSwap();
for(int i=0;i<10;i++) {
new Thread(new Runnable() {

@Override
public void run() {
// TODO Auto-generated method stub
int value=casAndSwap.get();
boolean b=casAndSwap.CompareSet(value,(int) Math.random()*100);
System.out.println(b);

}
}).start();
}

}

}
class ComapareAndSwap{
private int value;
//获取值
public synchronized int get() {
return value;
}
//交换值
public synchronized int CompareSwap(int expectValue,int newValue) {
int oldValue=value;
if(oldValue==expectValue) {
this.value=newValue ;
}
return oldValue;
}
///设置新值
public synchronized boolean CompareSet(int expectValue,int newValue) {
return expectValue==CompareSwap(expectValue, newValue);
}
}

上下文切换

CPU通过时间片轮转算法来不断的切换线程以执行任务。所谓的上下文切换也就是,任务从保存到再加载的过程。

线程上下文切换时机:

  • 当前线程的 CPU 时间片使用完毕处于就绪状态时候;
  • 当前线程被其它线程中断时候。

注:由于线程切换是有开销的,所以并不是开的线程越多越好,比如如果机器是4核心的,你开启了100个线程,那么同时执行的只有4个线程,这100个线程会来回切换线程上下文来共享这四个 CPU。

vmstat 测量上下文切换的次数

如何减少上下文切换

  • 无锁并发编程 将数据ID按照Hash算法取模分段,不同线程处理不同段的数据。
  • CAS Atomic包,原子性
  • 使用最少线程
  • 协程操作 在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换

volatile

非阻塞算法,不会造成线程上下文切换的开销。

实现原理

jvm 提供的最轻量级的同步机制。

可见性

当程序对 volatile 修饰的变量进行写操作时,即对该变量值进行修改时,JIT 编译器生成对应汇编指令时,除了会包含写的动作,还会在最后加上一行:

1
0x01a3de24: lock addl $0X0,(%esp);

其中可以看出多出一行为lock前缀的汇编代码,在多核处理器下会引发两件事情:

  • 将当前CPU缓存行的数据写回系统内存
  • 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效

其中LOCK信号会锁住总线,导致其他CPU不能访问总线,也就意味着不能访问内存。而根据缓存的一致性协议,当其他线程操作该变量值,发现内存地址无效了,就会去访问系统内存读取数据。

有序性

  • 内存屏障

    在使用 volatile 修饰的变量会产生内存屏障,即在执行到内存屏障这句指令时,在它前面的操作已经全部完成;它会强制将对缓存的修改操作立即写入主存

与synchronized的比较

在 JVM 中,每个对象在内存中的布局分为三块区域:对象头、实例数据和对齐填充。

  • 实例变量:存放类的属性数据信息,包括父类的属性信息,如果是数组的实例部分还包括数组的长度,这部分内存按 4 字节对齐。

  • 填充数据:由于虚拟机要求对象起始地址必须是 8 字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐,这点了解即可。

  • 对于对象头,一般而言,synchronized 使用的锁对象是存储在 Java 对象头里的,而每个对象都存在着一个 monitor 与之关联,下面为对应程序获取锁,释放锁的字节码指令:

1
2
3
monitorenter  //进入同步方法
..........获得对象锁的程序执行代码
monitorexit //退出同步方法

其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。当执行 monitorenter 指令时,当前线程将试图获取对象锁 所对应的 monitor 的持有权,当该对象的 monitor 的进入计数器为 0,那线程可以成功取得 monitor,并将计数器值设置为 1,取锁成功。

如果当前线程已经拥有 monitor 的持有权,那它可以重入。这个 monitor,重入时计数器的值也会加 1。倘若其他线程已经拥有该对象 的 monitor 的所有权,那当前线程将被阻塞,直到正在执行线程执行完毕,即 monitorexit 指令被执行,执行线程将释放 monitor(锁)并设置计数器值为 0 ,其他线程将有机会持有 monitor。

追加64字节能提高并发编程效率

部分处理器的缓存行是64个字节宽,不支持部分填充缓存行,若队列的头结点和尾结点都不足64字节的话,处理器会把他们都读到一个高速缓存行中,但当处理器试图修改头结点时,会锁定整个缓存行,导致尾结点不能被其他处理器所访问。

但无法保证原子性。例如AtomicInteger,值使用volatile修饰,保证多核下的可见性,数据修改时使用unsafe类,保证原子性。

synchronized


独占锁,只能有一个线程去调用,其他调用线程会阻塞

实现原理

Java内存模型规定了所有的变量都存放在主内存中,当线程使用变量时候都是把主内存里面的变量拷贝到了自己的工作空间或者叫做工作内存。

比如有这样的双核 CPU 系统架构,每核有自己的控制器和运算器,其中控制器包含一组寄存器和操作控制器,运算器执行算术逻辑运算,并且有自己的一级缓存,并且有些架构里面双核还有个共享的二级缓存

假设线程 A和 B 使用不同 CPU 进行去修改共享变量 X,假设 X 的初始化为0,并且当前两级 Cache 都为空的情况,具体看下面分析:

  • 假设线程 A 首先获取共享变量 X 的值,由于两级 Cache 都没有命中,所以到主内存加载了 X=0,然后会把 X=0 的值缓存到两级缓存,假设线程 A 修改 X 的值为1,然后写入到两级 Cache,并且刷新到主内存(注:如果没刷新会主内存也会存在内存不可见问题)。这时候线程 A 所在的 CPU 的两级 Cache 内和主内存里面 X 的值都是1;
  • 然后假设线程 B 这时候获取 X 的值,首先一级缓存没有命中,然后看二级缓存,二级缓存命中了,所以返回 X=1;然后线程 B 修改 X 的值为2;然后存放到线程2所在的一级 Cache 和共享二级 Cache,最后更新主内存值为2;
  • 然后假设线程 A 这次又需要修改 X 的值,获取时候一级缓存命中获取 X=1,到这里问题就出现了,明明线程 B 已经把 X 的值修改为了2,为啥线程 A 获取的还是1呢?这就是共享变量的内存不可见问题,也就是线程 B 写入的值对线程 A 不可见。

多线程并发修改共享变量时候会存在内存不可见的问题,究其原因是因为 Java 内存模型中线程操作共享变量时候会从自己的工作内存中获取而不是从主内存获取或者线程写入到本地内存的变量没有被刷新回主内存

  • 线程进入 Synchronized 块的语义:把在 Synchronized 块内使用到的变量从线程的工作内存中清除,在 Synchronized 块内使用该变量时候就不会从线程的工作内存中获取了,而是直接从主内存中获取。
  • 退出 Synchronized 块的内存语义:把 Synchronized 块内对共享变量的修改刷新到主内存。

跟 Synchronized 相比,可重入锁 ReentrantLock 的实现原理

锁的实现原理基本是为了达到一个目的:

让所有的线程都能看到某种标记。

  • Synchronized 通过在对象头中设置标记实现了这一目的,是一种 JVM 原生的锁实现方式.

  • ReentrantLock 以及所有的基于 Lock 接口的实现类,都是通过用一个 volitile 修饰的 int 型变量,并保证每个线程都能拥有对该 int 的可见性和原子修改,其本质是基于所谓的 AQS 框架。

JVM对java的原生锁的优化

由于 Java 层面的线程与操作系统的原生线程有映射关系,如果要将一个线程进行阻塞或唤起都需要操作系统的协助,这就需要从用户态切换到内核态来执行,这种切换代价十分昂贵,很耗处理器时间,现代 JDK 中做了大量的优化。

  • 使用自旋锁,即在把线程进行阻塞操作之前先让线程自旋等待一段时间,可能在等待期间其他线程已经解锁,这时就无需再让线程执行阻塞操作,避免了用户态到内核态的切换。

现代 JDK 中还提供了三种不同的 Monitor 实现,也就是三种不同的锁:

  • 偏向锁(Biased Locking)
  • 轻量级锁
  • 重量级锁

这三种锁使得 JDK 得以优化 Synchronized 的运行,当 JVM 检测到不同的竞争状况时,会自动切换到适合的锁实现,这就是锁的升级、降级。

  • 当没有竞争出现时,默认会使用偏向锁。

JVM 会利用 CAS 操作,在对象头上的 Mark Word 部分设置线程 ID,以表示这个对象偏向于当前线程,所以并不涉及真正的互斥锁,因为在很多应用场景中,大部分对象生命周期中最多会被一个线程锁定,使用偏向锁可以降低无竞争开销。

  • 如果有另一线程试图锁定某个被偏向过的对象,JVM 就撤销偏向锁,切换到轻量级锁实现。
  • 轻量级锁依赖 CAS 操作 Mark Word 来试图获取锁,如果重试成功,就使用普通的轻量级锁;否则,进一步升级为重量级锁。

AtomicLong的用法

AtomicLong 是原子性递增或者递减类,其内部使用 Unsafe 来实现,下面看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;

// (1)获取Unsafe实例
private static final Unsafe unsafe = Unsafe.getUnsafe();

//(2)存放变量value的偏移量
private static final long valueOffset;


//(3)判断JVM是否支持Long类型无锁CAS
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();

static {
try {
//(4)获取value在AtomicLong中偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

//(5)实际变量值
private volatile long value;

public AtomicLong(long initialValue) {
value = initialValue;
}
....
}
  • 代码(1)创建了通过 Unsafe.getUnsafe()方式获取到 Unsafe 类实例,是因为 AtomicLong 类也是在 rt.jar 包里面,AtomicLong 类的加载就是通过 BootStarp 类加载器进行加载的。

  • 代码(5)中 value 声明为 volatile 是为了多线程下保证内存可见性,value 是具体存放计数的变量。

  • 代码(2)(4)获取 value 变量在 AtomicLong 类中偏移量。

    主要函数:

  • 递增和递减操作代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//(6)调用unsafe方法,原子性设置value值为原始值+1,返回值为递增后的值
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}

//(7)调用unsafe方法,原子性设置value值为原始值-1,返回值为递减之后的值
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}

//(8)调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

//(9)调用unsafe方法,原子性设置value值为原始值-1,返回值为原始值
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}

如上代码内部都是调用 Unsafe 的 getAndAddLong 方法实现,这个函数是个原子性操作,这里第一个参数是 AtomicLong 实例的引用,第二个参数是 value 变量在 AtomicLong 中的偏移值,第三个参数是要设置第二个变量的值。

其中 getAndIncrement 方法在 JDK 7 的实现逻辑为:

1
2
3
4
5
6
7
8
public final long getAndIncrement() {
while (true) {
long current = get();
long next = current + 1;
if (compareAndSet(current, next))
return current;
}
}

如上代码可知每个线程是先拿到变量的当前值(由于是 value 是 volatile 变量所以这里拿到的是最新的值),然后在工作内存对其进行增一操作,然后使用 CAS 修改变量的值,如果设置失败,则循环继续尝试,直到设置成功。

而 JDK 8 逻辑为:

1
2
3
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

其中JDK8中unsafe.getAndAddLong代码为:

1
2
3
4
5
6
7
8
9
public final long getAndAddLong(Object paramObject, long paramLong1, long paramLong2)
{
long l;
do
{
l = getLongVolatile(paramObject, paramLong1);
} while (!compareAndSwapLong(paramObject, paramLong1, l, l + paramLong2));
return l;
}

可知 JDK 7 的 AtomicLong 中的循环逻辑已经被 JDK 8 的原子操作类 Unsafe 内置了,之所以内置应该是考虑到这种函数会在其它地方也会用到,内置可以提高复用性。

  • boolean compareAndSet(long expect, long update)方法
1
2
3
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

两个线程各自统计自己所在数据中0的个数,调用AtomicLong的原子性递增方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.atomic.AtomicLong;

public class Atomic {
private static AtomicLong atomicLong = new AtomicLong();
private static Integer[] array1 = new Integer[]{0, 1, 3, 6, 0, 8, 10};
private static Integer[] array2 = new Integer[]{2, 3, 0, 6, 7};

public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
int size = array1.length;
for (int i = 0; i < size; i++) {
if (array1[i].intValue() == 0) {
atomicLong.incrementAndGet();
}
}

}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
int size = array2.length;
for (int i = 0; i < size; i++) {
if (array2[i].intValue() == 0) {
atomicLong.incrementAndGet();
}
}
}
});
thread1.start();
thread2.start();
//join()必须加上,等待子线程执行完
thread1.join();
thread2.join();
System.out.println("打印0的个数为: " + atomicLong.get());
}

}

伪共享

Cache 内部是按行存储的,其中每一行称为一个 Cache 行,Cache 行是 Cache 与主内存进行数据交换的单位,Cache 行的大小一般为2的幂次数字节。

因为多个变量被放入了一个缓存行,并且多个线程同时写入缓存行中的不同变量。

多个变量之所以会被放入一个缓存行,是因为缓存与内存交换数据的单位是缓存行,当CPU要访问的变量未在缓存命中,根据程序运行的局部性原理会将该变量在内存中大小为Cache的内存放在缓存行。

如何避免伪共享

JDK 8 之前一般都是通过字节填充的方式来避免,也就是创建一个变量的时候使用填充字段填充该变量所在的缓存行,这样就避免了多个变量存在同一个缓存行,如下代码:

1
2
3
4
public final static class FilledLong {
public volatile long value = 0L;
public long p1, p2, p3, p4, p5, p6;
}

假如 Cache 行为64个字节,那么我们在 FilledLong 类里面填充了6个 long 类型变量,每个 long 类型占用8个字节,加上 value 变量的8个字节总共56个字节,另外这里 FilledLong 是一个类对象,而类对象的字节码的对象头占用了8个字节,所以当 new 一个 FilledLong 对象时候实际会占用64个字节的内存,这个正好可以放入 Cache 的一个行。

在 JDK 8 中提供了一个 sun.misc.Contended 注解,用来解决伪共享问题,上面代码可以修改为如下:

1
2
3
4
@sun.misc.Contended 
public final static class FilledLong {
public volatile long value = 0L;
}

上面是修饰类的,当然也可以修饰变量,比如 Thread 类中的使用:

1
2
3
4
5
6
7
8
9
10
11
/** The current seed for a ThreadLocalRandom */
@sun.misc.Contended("tlr")
long threadLocalRandomSeed;

/** Probe hash value; nonzero if threadLocalRandomSeed initialized */
@sun.misc.Contended("tlr")
int threadLocalRandomProbe;

/** Secondary seed isolated from public ThreadLocalRandom sequence */
@sun.misc.Contended("tlr")
int threadLocalRandomSecondarySeed;

需要注意的是默认情况下 @Contended 注解只用到 Java 核心类,比如 rt 包下的类,如果需要在用户 classpath 下的类使用这个注解需要添加 JVM 参数:-XX:-RestrictContended,另外默认填充的宽度为128,如果你想要自定义宽度可以设置 -XX:ContendedPaddingWidth 参数。

ThreadLocal

底层原理

ThreadLoal 变量是线程局部变量,其作用域覆盖线程,每个线程都可以拥有自己的局部变量,都可存入 ThreadLocal 中。

内部维护了一个 ThreadLocalMap 的成员变量。

ThreadLocalMap 是一个 key-value 存储的 map,也是用 Entry 来保存 K-V 结构数据的。但是 Entry 中 key 只能是 ThreadLocal 对象,这点被 Entry 的构造方法已经限定死了。

Entry 继承自 WeakReference(弱引用就是生命周期只能存活到下次 GC 前),但只有 Key 是弱引用类型的,Value 并非弱引用。这是为了防止内存泄露。一旦线程结束,key 变为一个不可达的对象,这个 Entry 就可以被 GC 了。

但当 ThreadLocal 在没有外部对象强引用时,发生 GC 时弱引用 Key 会被回收,而 Value 不会回收,如果创建 ThreadLocal 的线程一直持续运行,那么这个 Entry 对象中的 value 就有可能一直得不到回收,发生内存泄露。

注意不要与线程池配合,因为worker线程往往不会退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal k, Object v) {
super(k);
value = v;
}
}

解决 Hash 冲突

解决冲突的方法是线性探测法(不断加 1)。

所谓线性探测,就是根据初始 key 的 hashcode 值确定元素在 table 数组中的位置,如果发现这个位置上已经有其他 key 值的元素被占用,则利用固定的算法寻找一定步长的下个位置,依次判断,直至找到能够存放的位置。

避免泄露

既然 Key 是弱引用,那么就是在调用 ThreadLocal 的 get()、set() 方法时完成后再调用 remove 方法,将 Entry 节点和 Map 的引用关系移除,这样整个 Entry 对象在 GC Roots 分析后就变成不可达了,下次 GC 的时候就可以被回收。

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ThreadLocalTest {
static ThreadLocal localVariable=new ThreadLocal<>();
static void print(String str){
System.out.println(str+":"+localVariable.get());
//localVariable.remove();
}

public static void main(String[] args) {
Thread thread1=new Thread(new Runnable() {
@Override
public void run() {
localVariable.set("thread1");
print("thread1");
System.out.println("thread1 remove after"+":"+localVariable.get());
}
});
Thread thread2=new Thread(new Runnable() {
@Override
public void run() {
localVariable.set("thread2");
print("thread2");
System.out.println("thread2 remove after"+":"+localVariable.get());
}
});
thread1.start();
thread2.start();
}
}

输出示例(1)

1
2
3
4
thread1:thread1
thread1 remove after:thread1
thread2:thread2
thread2 remove after:thread2

将remove方法注释掉,打印输出实例。

输出示例(2)

1
2
3
4
thread1:thread1
thread1 remove after:null
thread2:thread2
thread2 remove after:null

每个线程的本地变量是存到线程自己的内存变量threadLocals里面的,若当前线程一直不消失,则有可能造成内存泄漏,使用完毕一定要调用remove()方法将线程的变量threadLocals里的本地变量删除。

InheritableThreadLocals

当父线程创建子线程时,构造函数里面会把父线程中的InheritableThreadLocals变量里面的本地变量拷贝一份复制到子线程的InheritableThreadLocals变量里。

这样,子线程便可以从父线程中获取到线程变量值了。

使用情境

  • 存放用户登录信息
  • 一些中间件需要统一的追踪ID将整个调用链路记录下来
0%