/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */ // 默认情况下使用非公平 publicSemaphore(int permits){ sync = new NonfairSync(permits); }
/** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */ publicSemaphore(int permits, boolean fair){ sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
finalintnonfairTryAcquireShared(int acquires){ for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
// 如果阻塞队列没有等待的线程,则参与许可的竞争;否则直接插入到阻塞队列尾节点并挂起,等待唤醒 protectedinttryAcquireShared(int acquires){ for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
demo
publicclassSemaphoreDemo{
static Semaphore semaphore = new Semaphore(5, true);
publicstaticvoidmain(String[] args){ ExecutorService service = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { service.submit(new Task()); } service.shutdown(); }
/** * Waits for another thread to arrive at this exchange point (unless * the current thread is {@linkplain Thread#interrupt interrupted}), * and then transfers the given object to it, receiving its object * in return. * * <p>If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. * * <p>If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of two things happens: * <ul> * <li>Some other thread enters the exchange; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * for the exchange, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @param x the object to exchange * @return the object provided by the other thread * @throws InterruptedException if the current thread was * interrupted while waiting */ @SuppressWarnings("unchecked") public V exchange(V x)throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) thrownew InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
// 如果在指定时间内没有另一个线程调用exchange,会抛出超时异常。 @SuppressWarnings("unchecked") public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x; long ns = unit.toNanos(timeout); if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) thrownew InterruptedException(); if (v == TIMED_OUT) thrownew TimeoutException(); return (v == NULL_ITEM) ? null : (V)v; }
demo
publicclassExchangerDemo{ publicstaticvoidmain(String[] args)throws InterruptedException { Exchanger<String> exchanger = new Exchanger<>();
/** * Main barrier code, covering the various policies. */ privateintdowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;
if (g.broken) thrownew BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); thrownew InterruptedException(); }
int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return0; } finally { if (!ranAction) breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); elseif (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } }
if (g.broken) thrownew BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); thrownew TimeoutException(); } } } finally { lock.unlock(); } }
demo
publicclassCyclicBarrierDemo{ publicstaticvoidmain(String[] args){ CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override publicvoidrun(){ System.out.println("所有人都到场了, 大家统一出发!"); } }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, cyclicBarrier)).start(); } }