org.elasticsearch.common.util.concurrent
Class ThreadBarrier

java.lang.Object
  extended by java.util.concurrent.CyclicBarrier
      extended by org.elasticsearch.common.util.concurrent.ThreadBarrier

public class ThreadBarrier
extends CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. Barriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. ThreadBarrier adds a cause to BrokenBarrierException thrown by a CyclicBarrier.reset() operation defined by CyclicBarrier.

Sample usage:

  • Barrier as a synchronization and Exception handling aid
  • Barrier as a trigger for elapsed notification events
  •     class MyTestClass   implements RemoteEventListener
        {
            final ThreadBarrier     barrier;
    
            class Worker implements Runnable
            {
                    public void run()
                {
                            barrier.await();        //wait for all threads to reach run
                            try
                    {
                                    prepare();
                                    barrier.await();        //wait for all threads to prepare
                                    process();
                                    barrier.await();        //wait for all threads to process
                    }
                            catch(Throwable t){
                                    log("Worker thread caught exception", t);
                                    barrier.reset(t);
                    }
                }
            }
    
            public void testThreads() {
                    barrier = new ThreadBarrier(N_THREADS + 1);
                    for (int i = 0; i < N; ++i)
               new Thread(new Worker()).start();
    
                    try{
                            barrier.await();        //wait for all threads to reach run
                            barrier.await();        //wait for all threads to prepare
                            barrier.await();        //wait for all threads to process
                }
                    catch(BrokenBarrierException bbe) {
                            Assert.fail(bbe);
                }
           }
    
          int actualNotificationCount = 0;
            public synchronized void notify (RemoteEvent event) {
                    try{
                            actualNotificationCount++;
                            if (actualNotificationCount == EXPECTED_COUNT)
                                    barrier.await();        //signal when all notifications arrive
    
                             // too many notifications?
                             Assert.assertFalse("Exceeded notification count",
                                                                                    actualNotificationCount > EXPECTED_COUNT);
                }
                    catch(Throwable t) {
                            log("Worker thread caught exception", t);
                            barrier.reset(t);
                }
            }
    
            public void testNotify() {
                    barrier = new ThreadBarrier(N_LISTENERS + 1);
                    registerNotification();
                    triggerNotifications();
    
                    //wait until either all notifications arrive, or
                    //until a MAX_TIMEOUT is reached.
                    barrier.await(MAX_TIMEOUT);
    
                    //check if all notifications were accounted for or timed-out
                    Assert.assertEquals("Notification count",
                                                                            EXPECTED_COUNT, actualNotificationCount);
    
                    //inspect that the barrier isn't broken
                    barrier.inspect(); //throws BrokenBarrierException if broken
            }
        }
     


    Nested Class Summary
    static class ThreadBarrier.BarrierTimer
              A Barrier action to be used in conjunction with ThreadBarrier to measure performance between barrier awaits.
     
    Constructor Summary
    ThreadBarrier(int parties)
               
    ThreadBarrier(int parties, Runnable barrierAction)
               
     
    Method Summary
     int await()
               
     int await(long timeout, TimeUnit unit)
               
     void inspect()
              Inspects if the barrier is broken.
     boolean isBroken()
              Queries if this barrier is in a broken state.
     void reset(Throwable cause)
              Resets the barrier to its initial state.
     
    Methods inherited from class java.util.concurrent.CyclicBarrier
    getNumberWaiting, getParties, reset
     
    Methods inherited from class java.lang.Object
    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
     

    Constructor Detail

    ThreadBarrier

    public ThreadBarrier(int parties)

    ThreadBarrier

    public ThreadBarrier(int parties,
                         Runnable barrierAction)
    Method Detail

    await

    public int await()
              throws InterruptedException,
                     BrokenBarrierException
    Overrides:
    await in class CyclicBarrier
    Throws:
    InterruptedException
    BrokenBarrierException

    await

    public int await(long timeout,
                     TimeUnit unit)
              throws InterruptedException,
                     BrokenBarrierException,
                     TimeoutException
    Overrides:
    await in class CyclicBarrier
    Throws:
    InterruptedException
    BrokenBarrierException
    TimeoutException

    reset

    public void reset(Throwable cause)
    Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException. Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.

    Parameters:
    cause - The cause of the BrokenBarrierException

    isBroken

    public boolean isBroken()
    Queries if this barrier is in a broken state. Note that if reset(Throwable) is invoked the barrier will remain broken, while CyclicBarrier.reset() will reset the barrier to its initial state and isBroken() will return false.

    Overrides:
    isBroken in class CyclicBarrier
    Returns:
    true if one or more parties broke out of this barrier due to interruption or timeout since construction or the last reset, or a barrier action failed due to an exception; false otherwise.
    See Also:
    inspect()

    inspect

    public void inspect()
                 throws BrokenBarrierException
    Inspects if the barrier is broken. If for any reason, the barrier was broken, a BrokenBarrierException will be thrown. Otherwise, would return gracefully.

    Throws:
    BrokenBarrierException - With a nested broken cause.


    Copyright © 2009-2012. All Rights Reserved.