SomeSynchronizationTool

同步工具

作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为0时开始工作
CyclicBarrier 作用跟CountDownLatch类似,但是可以重复使用
Phaser 增强的CyclicBarrier

Semaphore

源码分析

内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。

/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
// 默认情况下使用非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 如果阻塞队列没有等待的线程,则参与许可的竞争;否则直接插入到阻塞队列尾节点并挂起,等待唤醒
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

demo

public class SemaphoreDemo {

static Semaphore semaphore = new Semaphore(5, true);

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
service.submit(new Task());
}
service.shutdown();
}

static class Task implements Runnable {
@Override
public void run() {
try {
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "释放了许可证");
semaphore.release(2);
}
}
}

适用场景

可以用来做流量分流,特别是对公共资源有限的场景;

Exchanger

用于两个线程交换数据,支持泛型。

源码分析

使用park/unpark来实现等待状态的切换

/**
* Waits for another thread to arrive at this exchange point (unless
* the current thread is {@linkplain Thread#interrupt interrupted}),
* and then transfers the given object to it, receiving its object
* in return.
*
* <p>If another thread is already waiting at the exchange point then
* it is resumed for thread scheduling purposes and receives the object
* passed in by the current thread. The current thread returns immediately,
* receiving the object passed to the exchange by that other thread.
*
* <p>If no other thread is already waiting at the exchange then the
* current thread is disabled for thread scheduling purposes and lies
* dormant until one of two things happens:
* <ul>
* <li>Some other thread enters the exchange; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for the exchange,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @param x the object to exchange
* @return the object provided by the other thread
* @throws InterruptedException if the current thread was
* interrupted while waiting
*/
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}

// 如果在指定时间内没有另一个线程调用exchange,会抛出超时异常。
@SuppressWarnings("unchecked")
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}

demo

public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
Thread.sleep(1000);

new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

适用场景

一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以可以传输任何的数据,比如IO流或者IO缓存。根据JDK中的注释,可总结如下:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景;

当三个线程调用同一个实例的exchange方法时,只有前两个线程会交换数据,第三个线程会进入阻塞状态。

需要注意的是,exchange是可以重复使用的。也就是说。两个线程可以使用Exchanger在内存中不断地再交换数据。

CountDownLatch

CountDownLatch用一个给定的计数器来初始化,该计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。调用该类await()的线程会一直处于阻塞状态,直到其他线程调用countDown()使当前计数器的值变为零(每次调用countDown计数器的值减1)。当计数器值减至零时,所有因调用await()而处于等待状态的线程就会继续往下执行。这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier。

原理分析

内部有一个继承了AQS的同步器Sync

demo

/**
* 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。
*/
public class CountDownLatchDemo1And2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i + 1;
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("No." + no + "准备完毕,等待发令枪");
try {
begin.await();
System.out.println("No." + no + "开始跑步了");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "跑到终点了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
};
service.submit(runnable);
}
//裁判员检查发令枪...
Thread.sleep(3000);
System.out.println("发令枪响,比赛开始!");
begin.countDown();
end.await();
System.out.println("所有人到达终点,比赛结束");
}
}

适用场景

程序执行需要等待某个条件完成后才能继续执行后续的操作;如并行计算,当某个处理的运算量很大时,可以将该运算任务拆分成多个子任务,等待所有的子任务都完成之后,父任务再拿到所有子任务的运算结果进行汇总。

CyclicBarrier

类似于CountDownLatch,它也是通过计数器来实现的。当某个线程调用await方法时,该线程进入等待状态,且计数器加1,当计数器的值达到设置的初始值时,所有因调用await进入等待状态的线程被唤醒,继续执行后续操作。因为CycliBarrier在释放等待线程后可以重用,所以称为循环barrier。CycliBarrier支持一个可选的Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,Runnable在每个屏障点只运行一个

原理分析

虽然功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition实现的等待/通知模式。

/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

demo

public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到场了, 大家统一出发!");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new Task(i, cyclicBarrier)).start();
}
}

static class Task implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;

public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + id + "现在前往集合地点");
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程" + id + "到了集合地点,开始等待其他人到达");
cyclicBarrier.await();
System.out.println("线程" + id + "出发了");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

Barrier被破坏

如果线程在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用isBroken()方法检测Barrier是否被破坏。

  1. 如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待;
  2. 如果在等待的过程中,线程被中断,也会抛出BrokenBarrierException异常,并且这个异常会传播到其他所有的线程;
  3. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏;
  4. 如果超出指定的等待时间,当前线程会抛出TimeoutException 异常,其他线程会抛出BrokenBarrierException异常;

ReetrantReadWriteLock

具备的特性:

  1. 公平性

    • 非公平锁(默认)

      由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁;

    • 公平锁

      基于AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。

  2. 重入性

    写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。

  3. 锁降级

    写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。

  4. 锁获取中断

    读取锁和写入锁都支持获取锁期间被中断,这个和独占锁一致。

  5. 条件变量

    写入锁提供了对Condition的支持,这个和独占锁一致,但读取锁不允许获取条件变量,否则将产生异常UnsupportedOperationException。

state

在ReentrantReadWrilteLock里面将这个字段一分为二,高16位表示共享锁的数量,低16位表示独占锁的数量(或者重入数量)。

Phaser

Author: Jiayi Yang
Link: https://jiayiy.github.io/2020/07/17/SomeSynchronizationTool/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.