Class WaitFreeQueue

java.lang.Object
EDU.oswego.cs.dl.util.concurrent.WaitFreeQueue
All Implemented Interfaces:
Channel, Puttable, Takable

public class WaitFreeQueue extends Object implements Channel
A wait-free, linked-list-based queue implementation.

While this class conforms to the full Channel interface, only the put and poll methods are useful in most applications. Because the queue does not support blocking operations, take relies on spin-loops, which can be extremely wasteful.

This class is adapted from the algorithm described in "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" by Maged M. Michael and Michael L. Scott. This implementation is not strictly wait-free, because it relies on locking for basic atomicity and visibility requirements. Locks can impose unbounded waits, although this should not be a major practical concern here, since each lock is held for the duration of only a few statements. (However, the overhead of using so many locks can make it less attractive than other Channel implementations on JVMs where locking operations are very slow.)
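
The lock-simulated compare-and-set that this description refers to can be illustrated with a minimal sketch. It is not the library's source: the class name LockCasSketch, its Node type, and the casTail and getTail methods are hypothetical names chosen for illustration. The sketch assumes a volatile field for visibility and a dedicated lock object held only for the compare and the assignment.

```java
// Illustrative sketch only; names are hypothetical, not taken from the library source.
class LockCasSketch {
    static final class Node {
        final Object value;
        volatile Node next;
        Node(Object value) { this.value = value; }
    }

    // volatile so that reads made without the lock still see the latest write
    private volatile Node tail = new Node(null);
    // dedicated lock, held only for the few statements of the simulated CAS
    private final Object tailLock = new Object();

    Node getTail() { return tail; }

    // Simulated compare-and-set: replace tail only if it is still the expected node.
    boolean casTail(Node expected, Node update) {
        synchronized (tailLock) {
            if (tail == expected) {
                tail = update;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        LockCasSketch q = new LockCasSketch();
        Node seen = q.getTail();
        Node a = new Node("a");
        System.out.println(q.casTail(seen, a));             // prints "true"
        System.out.println(q.casTail(seen, new Node("b"))); // prints "false": expected node is stale
    }
}
```

A caller that gets false back would normally re-read the field and retry, which is why the result is lock-based rather than strictly wait-free.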

  • Field Details

    • tail

      protected volatile WaitFreeQueue.Node tail
      Pointer to last node on list
    • tailLock

      protected final Object tailLock
      Lock for simulating CAS for tail field
  • Constructor Details

    • WaitFreeQueue

      public WaitFreeQueue()
  • Method Details

    • CASHead

      protected boolean CASHead(WaitFreeQueue.Node oldHead, WaitFreeQueue.Node newHead)
      Simulate CAS for head field, using 'this' lock
    • CASTail

      protected boolean CASTail(WaitFreeQueue.Node oldTail, WaitFreeQueue.Node newTail)
      Simulate CAS for tail field
    • put

      public void put(Object x) throws InterruptedException
      Description copied from interface: Channel
      Place item in the channel, possibly waiting indefinitely until it can be accepted. Channels implementing the BoundedChannel subinterface are generally guaranteed to block on puts upon reaching capacity, but other implementations may or may not block.
      Specified by:
      put in interface Channel
      Specified by:
      put in interface Puttable
      Parameters:
      x - the element to be inserted. Should be non-null.
      Throws:
      InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the element is guaranteed not to be inserted. Otherwise, on normal return, the element is guaranteed to have been inserted.
    • offer

      public boolean offer(Object x, long msecs) throws InterruptedException
      Description copied from interface: Channel
      Place item in channel only if it can be accepted within msecs milliseconds. The time bound is interpreted in a coarse-grained, best-effort fashion.
      Specified by:
      offer in interface Channel
      Specified by:
      offer in interface Puttable
      Parameters:
      x - the element to be inserted. Should be non-null.
      msecs - the number of milliseconds to wait. If less than or equal to zero, the method does not perform any timed waits, but might still require access to a synchronization lock, which can impose unbounded delay if there is a lot of contention for the channel.
      Returns:
      true if accepted, else false
      Throws:
      InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the element is guaranteed not to be inserted (i.e., is equivalent to a false return).
    • extract

      protected Object extract() throws InterruptedException
      Main dequeue algorithm, called by poll and take.
      Throws:
      InterruptedException
    • peek

      public Object peek()
      Description copied from interface: Channel
      Return, but do not remove, the object at the head of the Channel, or null if it is empty.
      Specified by:
      peek in interface Channel
    • take

      public Object take() throws InterruptedException
      Spin until poll returns a non-null value. You probably don't want to call this method. A Thread.sleep(0) is performed on each iteration as a heuristic to reduce contention. If you would rather use, for example, an exponential backoff, you could manually set this up using poll.
      Specified by:
      take in interface Channel
      Specified by:
      take in interface Takable
      Returns:
      some item from the channel. Different implementations may guarantee various properties (such as FIFO) about that item.
      Throws:
      InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the state of the channel is unchanged.
    • poll

      public Object poll(long msecs) throws InterruptedException
      Spin until poll returns a non-null value or the time elapses. If msecs is positive, a Thread.sleep(0) is performed on each iteration as a heuristic to reduce contention.
      Specified by:
      poll in interface Channel
      Specified by:
      poll in interface Takable
      Parameters:
      msecs - the number of milliseconds to wait. If less than or equal to zero, the operation does not perform any timed waits, but might still require access to a synchronization lock, which can impose unbounded delay if there is a lot of contention for the channel.
      Returns:
      some item, or null if the channel is empty.
      Throws:
      InterruptedException - if the current thread has been interrupted at a point at which interruption is detected, in which case the state of the channel is unchanged (i.e., equivalent to a null return).
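
The take description mentions that an exponential backoff can be set up manually using poll. A minimal sketch of that idea follows. The BackoffPollSketch class, its Poller interface, and the takeWithBackoff helper are hypothetical and not part of this package. Poller only mirrors the documented poll(long) signature, and a java.util.concurrent.ConcurrentLinkedQueue stands in for the channel so that the example is self-contained.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch only; Poller and takeWithBackoff are hypothetical helpers.
class BackoffPollSketch {
    // Stand-in for the poll side of a channel; mirrors the documented poll(long) signature.
    interface Poller {
        Object poll(long msecs) throws InterruptedException;
    }

    // Retry a non-waiting poll, doubling the sleep between attempts up to maxDelayMs.
    static Object takeWithBackoff(Poller source, long initialDelayMs, long maxDelayMs)
            throws InterruptedException {
        long delay = Math.max(1, initialDelayMs); // avoid a zero delay that would never grow
        for (;;) {
            Object item = source.poll(0);         // msecs <= 0: no timed wait
            if (item != null) {
                return item;
            }
            Thread.sleep(delay);                  // propagates InterruptedException
            delay = Math.min(delay * 2, Math.max(1, maxDelayMs));
        }
    }

    public static void main(String[] args) throws Exception {
        ConcurrentLinkedQueue<Object> backing = new ConcurrentLinkedQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                return;
            }
            backing.add("hello");
        });
        producer.start();
        Object got = takeWithBackoff(ms -> backing.poll(), 1, 16);
        System.out.println(got); // prints "hello"
        producer.join();
    }
}
```

Capping the delay keeps the worst-case latency after an item arrives bounded by maxDelayMs, at the cost of more frequent polling while the queue stays empty.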