class FSHLog.RingBufferEventHandler extends Object implements com.lmax.disruptor.EventHandler<RingBufferTruck>, com.lmax.disruptor.LifecycleAware
Herein, we have an array into which we store the sync futures as they come in. When we have a 'batch', we'll then pass what we have collected to a SyncRunner thread to do the filesystem sync. When it completes, it will then call SyncFuture.done(long, Throwable) on each of the SyncFutures in the batch to release blocked Handler threads.
I've tried various approaches to keep latencies low while keeping throughput high. I've tried keeping a single Queue of SyncFutures in this class, appending to its tail as the syncs come in and having sync runner threads poll off the head to 'finish' completed SyncFutures. I've tried a linked list and various queues from the concurrent utils, whether LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the more 'work' (according to 'perf stats') that has to be done; small increases in stall percentages seem to have a big impact on throughput/latencies. The model below, where we have an array into which we stash the syncs and then hand them off to the sync thread, seemed like a decent compromise. See HBASE-8755 for more detail.
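The batching model described above can be illustrated with a minimal, single-threaded sketch. It is not the FSHLog implementation: `BatchingSketch`, `Runner`, `onSync` and `flush` are hypothetical names, `CompletableFuture` stands in for SyncFuture, and the hand-off here is a plain method call, whereas the real SyncRunner is a separate thread. It only shows the idea of stashing futures in an array and passing the collected batch to the next runner in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of array batching with round-robin hand-off. */
class BatchingSketch {
    /** Stand-in for a SyncRunner: records each batch size and completes its futures. */
    static final class Runner {
        final List<Integer> batchSizes = new ArrayList<>();

        void offer(CompletableFuture<Long>[] batch, int count, long sequence) {
            batchSizes.add(count);
            for (int i = 0; i < count; i++) {
                // Plays the role of SyncFuture.done(long, Throwable): releases the waiter.
                batch[i].complete(sequence);
            }
        }
    }

    final CompletableFuture<Long>[] futures; // the array the futures are stashed in
    final Runner[] runners;
    int count = 0;       // how many futures are currently stashed
    int runnerIndex = 0; // which runner to use next

    @SuppressWarnings("unchecked")
    BatchingSketch(int runnerCount, int maxBatchCount) {
        futures = new CompletableFuture[maxBatchCount];
        runners = new Runner[runnerCount];
        for (int i = 0; i < runnerCount; i++) {
            runners[i] = new Runner();
        }
    }

    /** Stash one future; hand off when the array is full or the caller signals end of batch. */
    void onSync(CompletableFuture<Long> future, long sequence, boolean endOfBatch) {
        futures[count++] = future;
        if (endOfBatch || count == futures.length) {
            flush(sequence);
        }
    }

    private void flush(long sequence) {
        runners[runnerIndex].offer(futures, count, sequence);
        runnerIndex = (runnerIndex + 1) % runners.length; // rotate through the runners
        count = 0;
    }

    public static void main(String[] args) {
        BatchingSketch sketch = new BatchingSketch(2, 3);
        for (long seq = 1; seq <= 4; seq++) {
            sketch.onSync(new CompletableFuture<>(), seq, seq == 4);
        }
        System.out.println("runner 0 batches: " + sketch.runners[0].batchSizes);
        System.out.println("runner 1 batches: " + sketch.runners[1].batchSizes);
    }
}
```

In this sketch the only per-sync work on the event-handling side is an array store and a counter increment, which is the property the paragraph above argues for: fewer points of synchronization than a shared queue.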
Modifier and Type | Field and Description |
---|---|
private Exception | exception: Set if we get an exception appending or syncing so that all subsequent appends and syncs on this WAL fail until the WAL is replaced. |
private Object | safePointWaiter: Object to block on while waiting on safe point. |
private boolean | shutdown |
private SyncFuture[] | syncFutures |
private AtomicInteger | syncFuturesCount |
private int | syncRunnerIndex: Which SyncRunner to use next. |
private FSHLog.SyncRunner[] | syncRunners |
private FSHLog.SafePointZigZagLatch | zigzagLatch |
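The description of the exception field implies a "sticky failure" pattern: once an append or sync fails, later calls fail fast until the WAL is replaced. A minimal sketch of that pattern follows. Every name in it (`StickyFailureSketch`, `write`, `replaceWriter`, `tryAppend`) is hypothetical, and the simulated failure on an empty entry is only there to make the example runnable; how FSHLog actually clears the field is not shown in this page.

```java
import java.io.IOException;

/** Hypothetical sketch of a remembered exception that fails later calls until reset. */
class StickyFailureSketch {
    private Exception exception; // null while healthy
    int appended = 0;            // count of successful appends

    void append(String entry) throws Exception {
        if (exception != null) {
            throw exception; // fail fast: an earlier append already failed
        }
        try {
            write(entry);
            appended++;
        } catch (Exception e) {
            exception = e; // remember the failure for all subsequent calls
            throw e;
        }
    }

    /** Simulated underlying write; an empty entry stands in for an I/O failure. */
    private void write(String entry) throws IOException {
        if (entry.isEmpty()) {
            throw new IOException("simulated write failure");
        }
    }

    /** Assumed reset hook, standing in for "the WAL is replaced". */
    void replaceWriter() {
        exception = null;
    }

    /** Convenience wrapper for callers that only need success or failure. */
    boolean tryAppend(String entry) {
        try {
            append(entry);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        StickyFailureSketch wal = new StickyFailureSketch();
        System.out.println(wal.tryAppend("a")); // healthy
        System.out.println(wal.tryAppend(""));  // simulated failure
        System.out.println(wal.tryAppend("b")); // still failing: exception is remembered
        wal.replaceWriter();
        System.out.println(wal.tryAppend("c")); // healthy again after the reset
    }
}
```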
Constructor and Description |
---|
RingBufferEventHandler(int syncRunnerCount, int maxBatchCount) |
Modifier and Type | Method and Description |
---|---|
(package private) void | append(FSWALEntry entry): Append to the WAL. |
(package private) FSHLog.SafePointZigZagLatch | attainSafePoint() |
private void | attainSafePoint(long currentSequence): Check if we should attain safe point. |
private void | cleanupOutstandingSyncsOnException(long sequence, Exception e) |
private boolean | isOutstandingSyncs(): Returns True if outstanding sync futures still |
private boolean | isOutstandingSyncsFromRunners() |
private void | offerDoneSyncsBackToCache(): Offers the finished syncs back to the cache for reuse. |
void | onEvent(RingBufferTruck truck, long sequence, boolean endOfBatch) |
void | onShutdown() |
void | onStart() |
private final FSHLog.SyncRunner[] syncRunners
private final SyncFuture[] syncFutures
private AtomicInteger syncFuturesCount
private volatile FSHLog.SafePointZigZagLatch zigzagLatch
private Exception exception
private final Object safePointWaiter
private volatile boolean shutdown
private int syncRunnerIndex
RingBufferEventHandler(int syncRunnerCount, int maxBatchCount)
private void cleanupOutstandingSyncsOnException(long sequence, Exception e)
private void offerDoneSyncsBackToCache()
private boolean isOutstandingSyncs()
private boolean isOutstandingSyncsFromRunners()
public void onEvent(RingBufferTruck truck, long sequence, boolean endOfBatch) throws Exception
Specified by: onEvent in interface com.lmax.disruptor.EventHandler<RingBufferTruck>
Throws: Exception
FSHLog.SafePointZigZagLatch attainSafePoint()
private void attainSafePoint(long currentSequence)
void append(FSWALEntry entry) throws Exception
Throws: Exception
public void onStart()
Specified by: onStart in interface com.lmax.disruptor.LifecycleAware
public void onShutdown()
Specified by: onShutdown in interface com.lmax.disruptor.LifecycleAware
Copyright © 2007–2020 The Apache Software Foundation. All rights reserved.