class FSHLog.RingBufferEventHandler extends Object implements com.lmax.disruptor.EventHandler<RingBufferTruck>, com.lmax.disruptor.LifecycleAware
Herein, we have an array into which we store the sync futures as they come in. When we
have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the
filesystem sync. When it completes, it will then call
SyncFuture.done(long, Throwable)
on each of SyncFutures in the batch to release
blocked Handler threads.
I've tried various effects to try and make latencies low while keeping throughput high. I've tried keeping a single Queue of SyncFutures in this class appending to its tail as the syncs coming and having sync runner threads poll off the head to 'finish' completed SyncFutures. I've tried linkedlist, and various from concurrent utils whether LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the more 'work' (according to 'perf stats') that has to be done; small increases in stall percentages seem to have a big impact on throughput/latencies. The below model where we have an array into which we stash the syncs and then hand them off to the sync thread seemed like a decent compromise. See HBASE-8755 for more detail.
Modifier and Type | Field and Description |
---|---|
private Exception |
exception
Set if we get an exception appending or syncing so that all subsequence appends and syncs
on this WAL fail until WAL is replaced.
|
private Object |
safePointWaiter
Object to block on while waiting on safe point.
|
private boolean |
shutdown |
private SyncFuture[] |
syncFutures |
private int |
syncFuturesCount |
private int |
syncRunnerIndex
Which syncrunner to use next.
|
private FSHLog.SyncRunner[] |
syncRunners |
private FSHLog.SafePointZigZagLatch |
zigzagLatch |
Constructor and Description |
---|
FSHLog.RingBufferEventHandler(int syncRunnerCount,
int maxHandlersCount) |
Modifier and Type | Method and Description |
---|---|
(package private) void |
append(FSWALEntry entry)
Append to the WAL.
|
(package private) FSHLog.SafePointZigZagLatch |
attainSafePoint() |
private void |
attainSafePoint(long currentSequence)
Check if we should attain safe point.
|
private void |
cleanupOutstandingSyncsOnException(long sequence,
Exception e) |
private boolean |
isOutstandingSyncs() |
private boolean |
isOutstandingSyncsFromRunners() |
void |
onEvent(RingBufferTruck truck,
long sequence,
boolean endOfBatch) |
void |
onShutdown() |
void |
onStart() |
private final FSHLog.SyncRunner[] syncRunners
private final SyncFuture[] syncFutures
private volatile int syncFuturesCount
private volatile FSHLog.SafePointZigZagLatch zigzagLatch
private Exception exception
private final Object safePointWaiter
private volatile boolean shutdown
private int syncRunnerIndex
FSHLog.RingBufferEventHandler(int syncRunnerCount, int maxHandlersCount)
private void cleanupOutstandingSyncsOnException(long sequence, Exception e)
private boolean isOutstandingSyncs()
private boolean isOutstandingSyncsFromRunners()
public void onEvent(RingBufferTruck truck, long sequence, boolean endOfBatch) throws Exception
onEvent
in interface com.lmax.disruptor.EventHandler<RingBufferTruck>
Exception
FSHLog.SafePointZigZagLatch attainSafePoint()
private void attainSafePoint(long currentSequence)
void append(FSWALEntry entry) throws Exception
entry
- Exception
public void onStart()
onStart
in interface com.lmax.disruptor.LifecycleAware
public void onShutdown()
onShutdown
in interface com.lmax.disruptor.LifecycleAware
Copyright © 2007–2019 The Apache Software Foundation. All rights reserved.