public final class PendingMessageQueue extends Object
To reduce thread contention the queue uses a set of integer values to track three positions: the ready position (the last message that is ready to be read), the reserve position (the first message that can be reserved to be written to), and the take position (the next, or first, message to be read). For an infinite queue the following invariant holds:
take < readyBefore <= reserve
To make handling a limited-size queue easier, the size of the queue is forced to a power of 2 less than 1048576. The roll-over can then be handled with a simple mask operation.
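The mask-based roll-over can be sketched as follows. This is a minimal illustration and not the class's actual source: the class name MaskedIndexSketch, the capacity of 8 used in main, and the method body are assumptions. Only the idea of pairing a power-of-2 size with a mask comes from the description.

```java
final class MaskedIndexSketch {
    /** Size minus one; for a power-of-2 size this has all of the low bits set. */
    private final int mask;

    MaskedIndexSketch(int size) {
        // A power of 2 has exactly one bit set.
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        this.mask = size - 1;
    }

    /** Increments the index, wrapping to 0 at the end without a branch or a modulo. */
    int increment(int index) {
        return (index + 1) & mask;
    }

    public static void main(String[] args) {
        MaskedIndexSketch sketch = new MaskedIndexSketch(8);
        System.out.println(sketch.increment(3)); // prints 4
        System.out.println(sketch.increment(7)); // prints 0 (rolled over)
    }
}
```

The mask only works because the size is a power of 2; for any other size the wrap would need a comparison or a remainder operation.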
Rather than allocate a pending message per request, we use an array of pre-allocated PendingMessages and copy the data into and out of the objects. This has a net positive effect on object allocation and garbage collection time, at the cost of a longer initialization.
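A rough sketch of the pre-allocation idea, under stated assumptions: the Slot type, its fields, and the copyIn/copyOut helpers are hypothetical stand-ins and are not the real PendingMessage API. The point is only that every slot is created once up front and later reused by copying fields.

```java
final class PreallocatedSlotsSketch {
    /** Hypothetical mutable holder standing in for a pending message. */
    static final class Slot {
        int messageId;
        Object payload;

        /** Copies the other slot's fields into this one; nothing is allocated. */
        void set(Slot other) {
            this.messageId = other.messageId;
            this.payload = other.payload;
        }

        /** Drops the references so the payload can be garbage collected. */
        void clear() {
            this.messageId = 0;
            this.payload = null;
        }
    }

    private final Slot[] slots;

    PreallocatedSlotsSketch(int size) {
        slots = new Slot[size];
        for (int i = 0; i < size; ++i) {
            slots[i] = new Slot(); // Allocation cost is paid once, at initialization.
        }
    }

    /** Copies the caller's data into the pre-allocated slot. */
    void copyIn(int index, Slot source) {
        slots[index].set(source);
    }

    /** Copies the slot's data out to the caller's object and frees the slot. */
    void copyOut(int index, Slot target) {
        target.set(slots[index]);
        slots[index].clear();
    }
}
```

After construction no Slot is ever allocated by the sketch itself; callers supply their own object to copy into, which mirrors the copyOut parameter of take and poll.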
Lastly, this queue assumes there is a single consumer of messages. This is true for the driver's use case, but don't copy the code and expect it to work with multiple consumers. The consumer should use the following basic structure:
PendingMessage pm = new PendingMessage();
queue.take(pm); // Blocks.
// Handle the message.

// or

if( queue.poll(pm) ) { // Non-blocking.
    // Handle the message.
}
Warning: This class has been carefully tuned for the driver's use case. Changes should be carefully benchmarked and tested. Comments have been embedded in the source indicating attempted changes and reverts. Due to its position in the driver, subtle changes in this class can cause large changes in the performance of the driver.
Modifier and Type | Field and Description |
---|---|
static long | MAX_MESSAGE_ID_MASK: The mask for constraining the size of the message id. |
static int | MAX_SIZE: The maximum size of the queue. |
static long | SPIN_TIME_NS: Amount of time to spin before yielding. |
Constructor and Description |
---|
PendingMessageQueue(int size, LockType lockType): Creates a new PendingMessageQueue. |
Modifier and Type | Method and Description |
---|---|
int | capacity(): Returns the size of the queue. |
void | drainTo(List<PendingMessage> pending): Drains the list of pending messages into the provided list. |
protected int | increment(int index): Increments the index, handling roll-over. |
boolean | isEmpty(): Returns true if the queue is empty. |
protected void | markReady(int index): Marks the position as ready by incrementing the ready position to the provided position. |
protected void | markReady2(int index): Marks the position and the next position as ready by incrementing the ready position to the provided position + 1. |
protected void | notifyWaiters(boolean all): Notifies the waiting threads that the state of the queue has changed. |
protected int | offer(): Checks if there is room for another message. |
boolean | offer(PendingMessage pendingMessage): Puts a message onto the queue. |
protected int | offer2(): Checks if there is room for another two messages. |
boolean | poll(PendingMessage copyOut): Returns the next message from the queue without blocking. |
void | put(Message message, Callback<Reply> replyCallback): Puts a message onto the queue. |
void | put(Message message, Callback<Reply> replyCallback, Message message2, Callback<Reply> replyCallback2): Puts two messages onto the queue. |
void | put(PendingMessage pendingMessage): Puts a message onto the queue. |
int | size(): Returns the number of messages in the queue. |
void | take(PendingMessage copyOut): Returns the next message from the queue and will block waiting for a message. |
public static final long MAX_MESSAGE_ID_MASK
public static final int MAX_SIZE
public static final long SPIN_TIME_NS
public PendingMessageQueue(int size, LockType lockType)
size - The size of the queue to create.
lockType - The lock type to use with the queue.
public int capacity()
public void drainTo(List<PendingMessage> pending)
pending - The list to add all of the pending messages to.
public boolean isEmpty()
public boolean offer(PendingMessage pendingMessage)
pendingMessage - The message to add.
public boolean poll(PendingMessage copyOut)
PendingMessage pm = new PendingMessage();
if( queue.poll(pm) ) {
    // Handle the message copied into pm.
}
copyOut - The PendingMessage to copy the pending message into.
public void put(Message message, Callback<Reply> replyCallback) throws InterruptedException
message - The message to add.
replyCallback - The callback for the message to add.
InterruptedException - If the thread is interrupted while waiting for the message. If thrown, the message will not have been enqueued.
public void put(Message message, Callback<Reply> replyCallback, Message message2, Callback<Reply> replyCallback2) throws InterruptedException
message - The first message to add.
replyCallback - The callback for the first message to add.
message2 - The second message to add.
replyCallback2 - The callback for the second message to add.
InterruptedException - If the thread is interrupted while waiting for the message. If thrown, neither message will have been enqueued.
public void put(PendingMessage pendingMessage) throws InterruptedException
pendingMessage - The message to add.
InterruptedException - If the thread is interrupted while waiting for the message. If thrown, the message will not have been enqueued.
public int size()
public void take(PendingMessage copyOut) throws InterruptedException
copyOut - The PendingMessage to copy the pending message into.
InterruptedException - If the thread is interrupted while waiting for the message.
protected int increment(int index)
index - The value to increment.
protected void markReady(int index)
index - The index of the ready message.
protected void markReady2(int index)
index - The index of the ready message.
protected void notifyWaiters(boolean all)
all - If true then all threads will be woken. Otherwise only a single thread is woken.
protected int offer()
protected int offer2()
Copyright © 2011-2013 Allanbank Consulting, Inc. All Rights Reserved.