/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * The default implementation of FSWAL.
 */
@InterfaceAudience.Private
public class FSHLog extends AbstractFSWAL<Writer> {
  // IMPLEMENTATION NOTES:
  //
  // At the core is a ring buffer. Our ring buffer is the LMAX Disruptor. It tries to
  // minimize synchronizations and volatile writes when multiple threads contend, as is the case
  // here with handlers appending and syncing on a single WAL. The Disruptor is configured to
  // handle multiple producers but it has one consumer only (the producers in HBase are IPC
  // Handlers calling append and then sync). The single consumer/writer pulls the appends and
  // syncs off the ring buffer. When a handler calls sync, it is given back a future. The
  // producer 'blocks' on the future so it does not return until the sync completes. The future
  // is passed over the ring buffer from the producer/handler to the consumer thread where it
  // does its best to batch up the producer syncs so one WAL sync actually spans multiple
  // producer sync invocations. How well the batching works depends on the write rate; i.e. we
  // tend to batch more in times of high writes/syncs.
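  //
  // For example, a producer-side caller does roughly the following (an illustrative sketch only;
  // 'wal', the region info and the key/edit objects are hypothetical stand-ins):
  //
  //   long txid = wal.append(regionInfo, walKey, walEdit, true); // publish the append on the ring buffer
  //   wal.sync(txid);                                            // block on a SyncFuture until synced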
  //
  // Calls to append now also wait until the append has been done on the consumer side of the
  // disruptor. We used to not wait, but it makes the implementation easier to grok if we have
  // the region edit/sequence id after the append returns.
  //
  // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend
  // once only? Probably hard given syncs take way longer than an append.
  //
  // The consumer thread passes the syncs off to multiple syncing threads in a round-robin fashion
  // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long pole writing the
  // WAL). The consumer thread passes the futures to the sync threads for them to complete
  // the futures when done.
  //
  // The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It
  // acts as a sort-of transaction id. It is always incrementing.
  //
  // The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that
  // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a
  // synchronization class used to halt the consumer at a safe point -- just after all outstanding
  // syncs and appends have completed -- so the log roller can swap the WAL out under it.
  //
  // We use the ring buffer sequence as the txid of FSWALEntry and SyncFuture.
  private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class);

  /**
   * The nexus at which all incoming handlers meet. Does appends and syncs with an ordering.
   * Appends and syncs are each put on the ring, which means handlers need to smash up against the
   * ring twice (can we make it once only? ... maybe not, since time to append is so different from
   * time to sync and sometimes we don't want to sync, or we want to async the sync). The ring is
   * where we make sure of our ordering and it is also where we do batching up of handler sync
   * calls.
   */
  private final Disruptor<RingBufferTruck> disruptor;

  /**
   * This handler is run by the disruptor's single consumer thread and is all about batching up
   * appends and syncs; it may shut down without cleaning out the last few appends or syncs. To
   * guard against this, keep a reference to this handler and do an explicit close on the way out
   * to make sure everything is flushed out before we exit.
   */
  private final RingBufferEventHandler ringBufferEventHandler;

  /**
   * FSDataOutputStream associated with the current SequenceFile.writer
   */
  private FSDataOutputStream hdfs_out;

  // All about log rolling if not enough replicas outstanding.

  // Minimum tolerable replicas; if the actual value is lower than this, rollWriter will be
  // triggered.
  private final int minTolerableReplication;

  // If the live datanode count is lower than the default replica count, rollWriter will be
  // triggered on each sync (so rollWriter would be requested repeatedly in a short time).
  // We use this as a workaround to slow down the roll frequency triggered by
  // checkLowReplication().
  private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);

  private final int lowReplicationRollLimit;

  // If consecutiveLogRolls is larger than lowReplicationRollLimit,
  // then disable the rolling in checkLowReplication().
  // Enable it again if the replication recovers.
  private volatile boolean lowReplicationRollEnabled = true;

  /** Number of log close errors tolerated before we abort */
  private final int closeErrorsTolerated;

  private final AtomicInteger closeErrorCount = new AtomicInteger();

  /**
   * Exception handler to pass to the disruptor ringbuffer. Same as the native implementation,
   * only it logs using our logger instead of the java native logger.
   */
  static class RingBufferExceptionHandler implements ExceptionHandler<RingBufferTruck> {

    @Override
    public void handleEventException(Throwable ex, long sequence, RingBufferTruck event) {
      LOG.error("Sequence=" + sequence + ", event=" + event, ex);
      throw new RuntimeException(ex);
    }

    @Override
    public void handleOnStartException(Throwable ex) {
      LOG.error(ex.toString(), ex);
      throw new RuntimeException(ex);
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
      LOG.error(ex.toString(), ex);
      throw new RuntimeException(ex);
    }
  }

  /**
   * Constructor.
   * @param fs filesystem handle
   * @param root path for stored and archived wals
   * @param logDir dir where wals are stored
   * @param conf configuration to use
   */
  public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
      throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
  }

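  // For example, the convenience constructor above might be used like this (an illustrative
  // sketch only; the paths and directory names are hypothetical):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   FileSystem fs = FileSystem.get(conf);
  //   FSHLog wal = new FSHLog(fs, new Path("/hbase"), "WALs", conf);
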
  /**
   * Create an edit log at the given <code>dir</code> location. You should never have to load an
   * existing log. If there is a log at startup, it should have already been processed and deleted
   * by the time the WAL object is started up.
   * @param fs filesystem handle
   * @param rootDir path to where logs and oldlogs are kept
   * @param logDir dir where wals are stored
   * @param archiveDir dir where wals are archived
   * @param conf configuration to use
   * @param listeners Listeners on WAL events. Listeners passed here will be registered before we
   *          do anything else; e.g. before the constructor calls {@link #rollWriter()}.
   * @param failIfWALExists If true, an IOException will be thrown if files related to this wal
   *          already exist.
   * @param prefix should always be hostname and port in a distributed env and it will be URL
   *          encoded before being used. If prefix is null, "wal" will be used.
   * @param suffix will be URL encoded. null is treated as empty. A non-empty suffix must start
   *          with {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER}
   */
  public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
      final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
      FSUtils.getDefaultReplication(fs, this.walDir));
    this.lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit",
      5);
    this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);

    // rollWriter sets this.hdfs_out if it can.
    rollWriter();

    // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
    // put on the ring buffer.
    String hostingThreadName = Thread.currentThread().getName();
    // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
    // spinning as other strategies do.
    this.disruptor = new Disruptor<>(RingBufferTruck::new,
        getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
        ProducerType.MULTI, new BlockingWaitStrategy());
    // Advance the ring buffer sequence so that it starts from 1 instead of 0,
    // because SyncFuture.NOT_DONE = 0.
    this.disruptor.getRingBuffer().next();
    int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
    this.ringBufferEventHandler = new RingBufferEventHandler(
        conf.getInt("hbase.regionserver.hlog.syncer.count", 5), maxHandlersCount);
    this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler());
    this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
    // Starting up threads in a constructor is a no-no; the interface should have an init call.
    this.disruptor.start();
  }

  /**
   * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate
   * the default behavior (such as setting the maxRecoveryErrorCount value). This is done using
   * reflection on the underlying HDFS OutputStream. NOTE: This could be removed once Hadoop1
   * support is removed.
   * @return null if the underlying stream is not ready.
   */
  @VisibleForTesting
  OutputStream getOutputStream() {
    FSDataOutputStream fsdos = this.hdfs_out;
    return fsdos != null ? fsdos.getWrappedStream() : null;
  }

  /**
   * Run a sync after opening to set up the pipeline.
   */
  private void preemptiveSync(final ProtobufLogWriter nextWriter) {
    long startTimeNanos = System.nanoTime();
    try {
      nextWriter.sync();
      postSync(System.nanoTime() - startTimeNanos, 0);
    } catch (IOException e) {
      // optimization failed, no need to abort here.
      LOG.warn("pre-sync failed but an optimization so keep going", e);
    }
  }

  /**
   * This method allows subclasses to inject different writers without having to extend other
   * methods like rollWriter().
   * @return Writer instance
   */
  @Override
  protected Writer createWriterInstance(final Path path) throws IOException {
    Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize);
    if (writer instanceof ProtobufLogWriter) {
      preemptiveSync((ProtobufLogWriter) writer);
    }
    return writer;
  }

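  // A subclass that needs to supply a different writer only overrides createWriterInstance above,
  // for example (an illustrative sketch; CustomWALWriter is hypothetical and not part of this
  // code base):
  //
  //   @Override
  //   protected Writer createWriterInstance(final Path path) throws IOException {
  //     return new CustomWALWriter(conf, fs, path);
  //   }
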
  /**
   * Used to manufacture a race condition reliably. For testing only.
   * @see #beforeWaitOnSafePoint()
   */
  @VisibleForTesting
  protected void afterCreatingZigZagLatch() {
  }

  /**
   * @see #afterCreatingZigZagLatch()
   */
  @VisibleForTesting
  protected void beforeWaitOnSafePoint() {
  };

  @Override
  protected void doAppend(Writer writer, FSWALEntry entry) throws IOException {
    writer.append(entry);
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
    // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
    // thread will eventually pause. An error hereafter needs to release the writer thread
    // regardless -- hence the finally block below. Note, this method is called from the FSHLog
    // constructor BEFORE the ring buffer is set running, so ringBufferEventHandler is null the
    // first time through here; allow for that.
    SyncFuture syncFuture = null;
    SafePointZigZagLatch zigzagLatch = null;
    long sequence = -1L;
    if (this.ringBufferEventHandler != null) {
      // Get the sequence first to avoid a deadlock when the ring buffer is full.
      // Consider the following sequence of events:
      // 1. replaceWriter is called and zigzagLatch is initialized
      // 2. ringBufferEventHandler#onEvent is called, arrives at #attainSafePoint(long) and then
      //    waits on safePointReleasedLatch
      // 3. Since the ring buffer is full, if we got the sequence when publishing the sync, the
      //    replaceWriter thread would wait for the ring buffer to be consumed, but the only
      //    consumer is waiting for the replaceWriter thread to release safePointReleasedLatch,
      //    which causes a deadlock
      sequence = getSequenceOnRingBuffer();
      zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
    }
    afterCreatingZigZagLatch();
    try {
      // Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
      // ring buffer between the above notification of the writer that we want it to go to
      // 'safe point' and here where we are waiting on it to attain the safe point. Use
      // 'sendSync' instead of 'sync' because we do not want this thread to block waiting on it
      // to come back. Clean up this syncFuture down below after we are ready to run again.
      try {
        if (zigzagLatch != null) {
          // use assert to make sure no change breaks the logic that
          // sequence and zigzagLatch will be set together
          assert sequence > 0L : "Failed to get sequence from ring buffer";
          TraceUtil.addTimelineAnnotation("awaiting safepoint");
          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
        }
      } catch (FailedSyncBeforeLogCloseException e) {
        // If there are unflushed/unsynced entries on close, it is reason to abort.
        if (isUnflushedEntries()) {
          throw e;
        }
        LOG.warn(
          "Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
      }
      long oldFileLen = 0L;
      // It is at the safe point. Swap out the writer from under the blocked writer thread.
      // TODO: This close is inline with the critical section. Should it happen in the background?
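      // Close the old writer below. A failed close is ridden over up to closeErrorsTolerated
      // times, but only while there are no unflushed entries; otherwise the IOException is
      // rethrown.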
      if (this.writer != null) {
        oldFileLen = this.writer.getLength();
        try {
          TraceUtil.addTimelineAnnotation("closing writer");
          this.writer.close();
          TraceUtil.addTimelineAnnotation("writer closed");
          this.closeErrorCount.set(0);
        } catch (IOException ioe) {
          int errors = closeErrorCount.incrementAndGet();
          if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
            LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" + ioe.getMessage()
                + "\", errors=" + errors
                + "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
          } else {
            throw ioe;
          }
        }
      }
      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
      this.writer = nextWriter;
      if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
        this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
      } else {
        this.hdfs_out = null;
      }
    } catch (InterruptedException ie) {
      // Perpetuate the interrupt
      Thread.currentThread().interrupt();
    } catch (IOException e) {
      long count = getUnflushedEntriesCount();
      LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
      throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
    } finally {
      // Let the writer thread go regardless, whether error or not.
      if (zigzagLatch != null) {
        // Reset rollRequested status
        rollRequested.set(false);
        zigzagLatch.releaseSafePoint();
        // syncFuture will be null if we failed our wait on the safe point above. Otherwise, if
        // the latch was obtained successfully, the sync we threw in either triggered the latch or
        // got stamped with an exception because the WAL was damaged and we could not sync. Now
        // that the write pipeline has been opened up again by releasing the safe point, process
        // the syncFuture we got above. This is probably a noop but it may carry a stale exception
        // from when the old WAL was in place. Catch it if so.
        if (syncFuture != null) {
          try {
            blockOnSync(syncFuture);
          } catch (IOException ioe) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Stale sync exception", ioe);
            }
          }
        }
      }
    }
  }

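  // How a roll plays out, roughly: checkLowReplication() or a failed sync calls requestLogRoll(),
  // the log roller responds by calling rollWriter() (see AbstractFSWAL), which creates the next
  // writer via createWriterInstance(...) and swaps it in through doReplaceWriter(...) above while
  // the ring buffer consumer is parked at the safe point.
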
  @Override
  protected void doShutdown() throws IOException {
    // Shutdown the disruptor. It will stop after all entries have been processed. Make sure we
    // have stopped incoming appends before calling this, else it will not shut down. We are
    // conservative below: we wait a long time and, if the shutdown has not completed by then,
    // we halt.
    if (this.disruptor != null) {
      long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
      try {
        this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt "
            + "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
        this.disruptor.halt();
        this.disruptor.shutdown();
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Closing WAL writer in " + FSUtils.getPath(walDir));
    }
    if (this.writer != null) {
      this.writer.close();
      this.writer = null;
    }
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
      justification = "Will never be null")
  @Override
  public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
      final boolean inMemstore) throws IOException {
    return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
      disruptor.getRingBuffer());
  }

  /**
   * Thread that runs the hdfs sync call. This call takes a while to complete. This is the longest
   * pole adding edits to the WAL and it must complete to be sure all edits are persisted. We run
   * multiple threads syncing rather than one that just syncs in series so we have better
   * latencies; otherwise, an edit that arrived just after a sync started might have to wait
   * almost the length of two sync invocations before it is marked done.
   * <p>
   * When the sync completes, it marks all the passed-in futures done. On the other end of the sync
   * future is a blocked thread, usually a regionserver Handler. There may be more than one future
   * passed in the case where a few threads arrive at about the same time and all invoke 'sync'. In
   * this case we'll batch up the invocations and run one filesystem sync only for a batch of
   * Handler sync invocations. Do not confuse these Handler SyncFutures with the futures an
   * ExecutorService returns when you call submit. We have no use for those in this model. These
   * SyncFutures are 'artificial', something to hold the Handler until the filesystem sync
   * completes.
   */
  private class SyncRunner extends HasThread {
    private volatile long sequence;
    // Keep around last exception thrown. Clear on successful sync.
    private final BlockingQueue<SyncFuture> syncFutures;
    private volatile SyncFuture takeSyncFuture = null;

    SyncRunner(final String name, final int maxHandlersCount) {
      super(name);
      // LinkedBlockingQueue because of
      // http://www.javacodegeeks.com/2010/09/java-best-practices-queue-battle-and.html
      // Could use other blockingqueues here or concurrent queues.
      //
      // We could let the capacity be 'open' but bound it so we get alerted in the pathological
      // case where we cannot sync and we have a bunch of threads all backed up waiting on their
      // syncs to come in. LinkedBlockingQueue actually shrinks when you remove elements so the Q
      // should stay neat and tidy in the usual case. Let the max size be three times the maximum
      // handlers. The passed-in maxHandlersCount is the user-level handler count, which is what
      // we put up most of, but HBase has other handlers running too -- opening region handlers
      // which want to write the meta table when successful (i.e. sync), closing handlers -- etc.
      // These are usually much fewer in number than the user-space handlers so the Q-size should
      // be user handlers plus some space for these other handlers. Lets multiply by 3 for
      // good measure.
      this.syncFutures = new LinkedBlockingQueue<>(maxHandlersCount * 3);
    }

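    /**
     * Offer a batch of sync futures to this runner. Called by the single ring buffer consumer
     * thread (the {@link RingBufferEventHandler}), which hands over responsibility for completing
     * the futures to this sync thread.
     */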
    void offer(final long sequence, final SyncFuture[] syncFutures, final int syncFutureCount) {
      // Set sequence first because the add to the queue will wake the thread if sleeping.
      this.sequence = sequence;
      for (int i = 0; i < syncFutureCount; ++i) {
        this.syncFutures.add(syncFutures[i]);
      }
    }

    /**
     * Release the passed <code>syncFuture</code>
     * @return Returns 1.
     */
    private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
        final Throwable t) {
      if (!syncFuture.done(currentSequence, t)) {
        throw new IllegalStateException();
      }

      // This function releases one sync future only.
      return 1;
    }

    /**
     * Release all SyncFutures whose sequence is <= <code>currentSequence</code>.
     * @param t May be non-null if we are processing SyncFutures because an exception was thrown.
     * @return Count of SyncFutures we let go.
     */
    private int releaseSyncFutures(final long currentSequence, final Throwable t) {
      int syncCount = 0;
      for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
        if (syncFuture.getTxid() > currentSequence) {
          break;
        }
        releaseSyncFuture(syncFuture, currentSequence, t);
        if (!this.syncFutures.remove(syncFuture)) {
          throw new IllegalStateException(syncFuture.toString());
        }
        syncCount++;
      }
      return syncCount;
    }

    /**
     * @param sequence The sequence we ran the filesystem sync against.
     * @return Current highest synced sequence.
     */
    private long updateHighestSyncedSequence(long sequence) {
      long currentHighestSyncedSequence;
      // Set the highestSyncedSequence IFF our current sequence id is the 'highest'.
      do {
        currentHighestSyncedSequence = highestSyncedTxid.get();
        if (currentHighestSyncedSequence >= sequence) {
          // Set the sync number to the current highwater mark; we might be able to let go more
          // queued sync futures
          sequence = currentHighestSyncedSequence;
          break;
        }
      } while (!highestSyncedTxid.compareAndSet(currentHighestSyncedSequence, sequence));
      return sequence;
    }

    boolean areSyncFuturesReleased() {
      // Check that there are no sync futures offered and no in-flight sync future still being
      // processed.
      return syncFutures.size() <= 0
          && takeSyncFuture == null;
    }

    @Override
    public void run() {
      long currentSequence;
      while (!isInterrupted()) {
        int syncCount = 0;

        try {
          while (true) {
            takeSyncFuture = null;
            // We have to process what we 'take' from the queue
            takeSyncFuture = this.syncFutures.take();
            currentSequence = this.sequence;
            long syncFutureSequence = takeSyncFuture.getTxid();
            if (syncFutureSequence > currentSequence) {
              throw new IllegalStateException("currentSequence=" + currentSequence
                  + ", syncFutureSequence=" + syncFutureSequence);
            }
            // See if we can process any syncfutures BEFORE we go sync.
            long currentHighestSyncedSequence = highestSyncedTxid.get();
            if (currentSequence < currentHighestSyncedSequence) {
              syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
              // Done with the 'take'. Go around again and do a new 'take'.
              continue;
            }
            break;
          }
          // I got something. Lets run. Save off the current sequence number in case it changes
          // while we run.
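          // The single writer.sync() below covers the future we just took and, via
          // releaseSyncFutures() in the finally block, any other queued futures whose txid is at
          // or below the sequence we synced.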
          //TODO handle htrace API change, see HBASE-18895
          //TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
          long start = System.nanoTime();
          Throwable lastException = null;
          try {
            TraceUtil.addTimelineAnnotation("syncing writer");
            writer.sync();
            TraceUtil.addTimelineAnnotation("writer synced");
            currentSequence = updateHighestSyncedSequence(currentSequence);
          } catch (IOException e) {
            LOG.error("Error syncing, request close of WAL", e);
            lastException = e;
          } catch (Exception e) {
            LOG.warn("UNEXPECTED", e);
            lastException = e;
          } finally {
            // reattach the span to the future before releasing.
            //TODO handle htrace API change, see HBASE-18895
            // takeSyncFuture.setSpan(scope.getSpan());
            // First release what we 'took' from the queue.
            syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
            // Can we release other syncs?
            syncCount += releaseSyncFutures(currentSequence, lastException);
            if (lastException != null) {
              requestLogRoll();
            } else {
              checkLogRoll();
            }
          }
          postSync(System.nanoTime() - start, syncCount);
        } catch (InterruptedException e) {
          // Presume legit interrupt.
          Thread.currentThread().interrupt();
        } catch (Throwable t) {
          LOG.warn("UNEXPECTED, continuing", t);
        }
      }
    }
  }

  /**
   * Schedule a log roll if needed.
   */
  private void checkLogRoll() {
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    boolean lowReplication;
    try {
      lowReplication = doCheckLogLowReplication();
    } finally {
      rollWriterLock.unlock();
    }
    if (lowReplication || (writer != null && writer.getLength() > logrollsize)) {
      requestLogRoll(lowReplication);
    }
  }

  /**
   * @return true if the number of replicas for the WAL is lower than the threshold
   */
  @Override
  protected boolean doCheckLogLowReplication() {
    boolean logRollNeeded = false;
    // if the number of replicas in HDFS has fallen below the configured
    // value, then roll logs.
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. " + "Found " + numCurrentReplicas
                + " replicas but expecting no less than " + this.minTolerableReplication
                + " replicas. " + " Requesting close of WAL. current pipeline: "
                + Arrays.toString(getPipeline()));
            logRollNeeded = true;
            // If rollWriter is requested, increase consecutiveLogRolls. Once it
            // is larger than lowReplicationRollLimit, disable the
            // LowReplication-Roller
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
                + "the total number of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // The new writer's replica count is always the default value, so we should not enable
          // the LowReplication-Roller while the writer is new. If numEntries is lower than or
          // equal to 1, we consider it a new writer.
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          // Once the live datanode number and the replicas return to normal,
          // enable the LowReplication-Roller.
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing...");
    }
    return logRollNeeded;
  }

  @VisibleForTesting
  protected long getSequenceOnRingBuffer() {
    return this.disruptor.getRingBuffer().next();
  }

  private SyncFuture publishSyncOnRingBuffer() {
    long sequence = getSequenceOnRingBuffer();
    return publishSyncOnRingBuffer(sequence);
  }

  @VisibleForTesting
  protected SyncFuture publishSyncOnRingBuffer(long sequence) {
    // here we use the ring buffer sequence as the transaction id
    SyncFuture syncFuture = getSyncFuture(sequence);
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      truck.load(syncFuture);
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return syncFuture;
  }

  // Sync all known transactions
  private void publishSyncThenBlockOnCompletion(TraceScope scope) throws IOException {
    SyncFuture syncFuture = publishSyncOnRingBuffer();
    blockOnSync(syncFuture);
  }

  /**
   * {@inheritDoc}
   * <p>
   * If the pipeline isn't started yet or is empty, you will get the default replication factor.
   * Therefore, if this function returns 0, it means you are not properly running with the
   * HDFS-826 patch.
   */
  @Override
  @VisibleForTesting
  int getLogReplication() {
    try {
      // in standalone mode, it will return 0
      if (this.hdfs_out instanceof HdfsDataOutputStream) {
        return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
      }
    } catch (IOException e) {
      LOG.info("", e);
    }
    return 0;
  }

  @Override
  public void sync() throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
      publishSyncThenBlockOnCompletion(scope);
    }
  }

  @Override
  public void sync(long txid) throws IOException {
    if (this.highestSyncedTxid.get() >= txid) {
      // Already sync'd.
      return;
    }
    try (TraceScope scope = TraceUtil.createTrace("FSHLog.sync")) {
      publishSyncThenBlockOnCompletion(scope);
    }
  }

  @VisibleForTesting
  boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

  public static final long FIXED_OVERHEAD = ClassSize
      .align(ClassSize.OBJECT + (5 * ClassSize.REFERENCE) + ClassSize.ATOMIC_INTEGER
          + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));

  /**
   * This class is used to coordinate two threads, holding one thread at a 'safe point' while the
   * orchestrating thread does some work that requires the first thread to be paused: e.g. holding
   * the WAL writer while its WAL is swapped out from under it by another thread.
   * <p>
   * Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A waits until Thread
   * B gets there. When the 'safe point' has been attained, Thread B signals Thread A. Thread B
   * then holds at the 'safe point'. Thread A, on notification that Thread B is paused, goes ahead
   * and does the work it needs to do while Thread B is holding. When Thread A is done, it flags B
   * and then Thread A and Thread B continue along on their merry way. Pausing and signalling
   * 'zigzags' between the two participating threads. We use two latches -- one the inverse of the
   * other -- pausing and signaling when states are achieved.
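   * <p>
   * Roughly, as used by {@link FSHLog#doReplaceWriter(Path, Path, Writer)} and
   * {@link RingBufferEventHandler} in this file (an illustrative sketch, not an API contract):
   * <pre>
   *   // Thread A (log roller)                  // Thread B (ring buffer consumer)
   *   latch = handler.attainSafePoint();
   *   latch.waitSafePoint(syncFuture);          // ...drains outstanding appends/syncs...
   *                                             latch.safePointAttained(); // parks Thread B
   *   // swap the WAL writer while B is parked
   *   latch.releaseSafePoint();                 // Thread B resumes
   * </pre>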
   * <p>
   * To start up the drama, Thread A creates an instance of this class each time it would do this
   * zigzag dance and passes it to Thread B (these classes use latches so it is one shot only).
   * Thread B notices the new instance (via reading a volatile reference or however) and it starts
   * to work toward the 'safe point'. Thread A calls {@link #waitSafePoint(SyncFuture)} when it
   * cannot proceed until the Thread B 'safe point' is attained. Thread A will be held inside
   * {@link #waitSafePoint(SyncFuture)} until Thread B reaches the 'safe point'. Once there,
   * Thread B frees Thread A by calling {@link #safePointAttained()}. Thread A now knows Thread B
   * is at the 'safe point' and that it is holding there (when Thread B calls
   * {@link #safePointAttained()} it blocks here until Thread A calls {@link #releaseSafePoint()}).
   * Thread A proceeds to do what it needs to do while Thread B is paused. When finished, it lets
   * Thread B loose by calling {@link #releaseSafePoint()} and away go both threads again.
   */
  static class SafePointZigZagLatch {
    /**
     * Count down this latch when safe point attained.
     */
    private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
    /**
     * Latch to wait on. Will be released when we can proceed.
     */
    private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);

    private void checkIfSyncFailed(SyncFuture syncFuture) throws FailedSyncBeforeLogCloseException {
      if (syncFuture.isThrowable()) {
        throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
      }
    }

    /**
     * For Thread A to call when it is ready to wait on the 'safe point' to be attained. Thread A
     * will be held in here until Thread B calls {@link #safePointAttained()}
     * @param syncFuture We need this as a barometer on outstanding syncs. If it comes home with
     *          an exception, then something is up with our syncing.
     * @return The passed <code>syncFuture</code>
     */
    SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
        FailedSyncBeforeLogCloseException {
      while (!this.safePointAttainedLatch.await(1, TimeUnit.MILLISECONDS)) {
        checkIfSyncFailed(syncFuture);
      }
      checkIfSyncFailed(syncFuture);
      return syncFuture;
    }

    /**
     * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals
     * Thread A that it can proceed. Thread B will be held in here until {@link #releaseSafePoint()}
     * is called by Thread A.
     */
    void safePointAttained() throws InterruptedException {
      this.safePointAttainedLatch.countDown();
      this.safePointReleasedLatch.await();
    }

    /**
     * Called by Thread A when it is done with the work it needs to do while Thread B is halted.
     * This will release the Thread B held in a call to {@link #safePointAttained()}
     */
    void releaseSafePoint() {
      this.safePointReleasedLatch.countDown();
    }

    /**
     * @return True if this is a 'cocked', fresh instance, and not one that has already fired.
     */
    boolean isCocked() {
      return this.safePointAttainedLatch.getCount() > 0
          && this.safePointReleasedLatch.getCount() > 0;
    }
  }

  /**
   * Handler that is run by the disruptor ringbuffer consumer.
   * The consumer is a SINGLE 'writer/appender' thread. It appends edits and starts up sync runs.
   * It tries its best to batch up syncs. There is no discernible benefit to batching appends so
   * we just append as they come in because it simplifies the below implementation. See metrics
   * for batching effectiveness (in measurement, at 100 concurrent handlers writing 1k, we are
   * batching > 10 appends and 10 handler sync invocations for every actual dfsclient sync call;
   * at 10 concurrent handlers, YMMV).
   * <p>
   * Herein, we have an array into which we store the sync futures as they come in. When we have a
   * 'batch', we'll then pass what we have collected to a SyncRunner thread to do the filesystem
   * sync. When it completes, it will then call {@link SyncFuture#done(long, Throwable)} on each of
   * the SyncFutures in the batch to release blocked Handler threads.
   * <p>
   * I've tried various approaches to try and make latencies low while keeping throughput high.
   * I've tried keeping a single Queue of SyncFutures in this class, appending to its tail as the
   * syncs come in and having sync runner threads poll off the head to 'finish' completed
   * SyncFutures. I've tried a linked list, and various queues from concurrent utils, whether
   * LinkedBlockingQueue or ArrayBlockingQueue, etc. The more points of synchronization, the more
   * 'work' (according to 'perf stats') that has to be done; small increases in stall percentages
   * seem to have a big impact on throughput/latencies. The below model, where we have an array
   * into which we stash the syncs and then hand them off to the sync thread, seemed like a decent
   * compromise. See HBASE-8755 for more detail.
   */
  class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    private final SyncRunner[] syncRunners;
    private final SyncFuture[] syncFutures;
    // Had 'interesting' issues when this was non-volatile. On occasion, we'd not pass all
    // syncFutures to the next sync'ing thread.
    private AtomicInteger syncFuturesCount = new AtomicInteger();
    private volatile SafePointZigZagLatch zigzagLatch;
    /**
     * Set if we get an exception appending or syncing so that all subsequent appends and syncs on
     * this WAL fail until the WAL is replaced.
     */
    private Exception exception = null;
    /**
     * Object to block on while waiting on safe point.
     */
    private final Object safePointWaiter = new Object();
    private volatile boolean shutdown = false;

    /**
     * Which syncrunner to use next.
     */
    private int syncRunnerIndex;

    RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
      this.syncFutures = new SyncFuture[maxHandlersCount];
      this.syncRunners = new SyncRunner[syncRunnerCount];
      for (int i = 0; i < syncRunnerCount; i++) {
        this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
      }
    }

    private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
      // There could be handler-count syncFutures outstanding.
      for (int i = 0; i < this.syncFuturesCount.get(); i++) {
        this.syncFutures[i].done(sequence, e);
      }
      this.syncFuturesCount.set(0);
    }

    /**
     * @return True if there are sync futures still outstanding
     */
    private boolean isOutstandingSyncs() {
      // Look at SyncFutures in the EventHandler
      for (int i = 0; i < this.syncFuturesCount.get(); i++) {
        if (!this.syncFutures[i].isDone()) {
          return true;
        }
      }

      return false;
    }

    private boolean isOutstandingSyncsFromRunners() {
      // Look at SyncFutures in the SyncRunners
      for (SyncRunner syncRunner: syncRunners) {
        if (syncRunner.isAlive() && !syncRunner.areSyncFuturesReleased()) {
          return true;
        }
      }
      return false;
    }

    @Override
    // We can set endOfBatch in the below method if at the end of our this.syncFutures array
    public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
        throws Exception {
      // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll
      // add appends to dfsclient as they come in. Batching appends doesn't give any significant
      // benefit on measurement. Handler sync calls we will batch up. If we get an exception
      // appending an edit, we fail all subsequent appends and syncs with the same exception until
      // the WAL is reset. It is important that we not short-circuit and exit this method early.
      // It is important that we always go through attainSafePoint at the end. Another thread,
      // the log roller, may be waiting on a signal from us here and will just hang without it.

      try {
        if (truck.type() == RingBufferTruck.Type.SYNC) {
          this.syncFutures[this.syncFuturesCount.getAndIncrement()] = truck.unloadSync();
          // Force a flush of syncs if we are carrying a full complement of syncFutures.
          if (this.syncFuturesCount.get() == this.syncFutures.length) {
            endOfBatch = true;
          }
        } else if (truck.type() == RingBufferTruck.Type.APPEND) {
          FSWALEntry entry = truck.unloadAppend();
          //TODO handle htrace API change, see HBASE-18895
          //TraceScope scope = Trace.continueSpan(entry.detachSpan());
          try {
            if (this.exception != null) {
              // Return to keep processing events coming off the ringbuffer
              return;
            }
            append(entry);
          } catch (Exception e) {
            // Failed append. Record the exception.
            this.exception = e;
            // Invoke cleanupOutstandingSyncsOnException when an append fails with an exception;
            // it cleans up sync requests already recorded in syncFutures but not yet offered to a
            // SyncRunner, so no sync future is left over if no further truck is published to the
            // disruptor.
            cleanupOutstandingSyncsOnException(sequence,
              this.exception instanceof DamagedWALException ? this.exception
                  : new DamagedWALException("On sync", this.exception));
            // Return to keep processing events coming off the ringbuffer
            return;
          } finally {
            entry.release();
          }
        } else {
          // What is this if not an append or sync. Fail all up to this!!!
          cleanupOutstandingSyncsOnException(sequence,
            new IllegalStateException("Neither append nor sync"));
          // Return to keep processing.
          return;
        }

        // TODO: Check size and if big go ahead and call a sync if we have enough data.
        // This is a sync. If there is an existing exception, fall through. Else look to see if
        // this is the end of a batch.
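        // If we have a complete batch and no pending exception, hand all accumulated SyncFutures
        // to one SyncRunner below, chosen round-robin across the sync runner threads.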
        if (this.exception == null) {
          // If not a batch, return to consume more events from the ring buffer before proceeding;
          // we want to get up a batch of syncs and appends before we go do a filesystem sync.
          if (!endOfBatch || this.syncFuturesCount.get() <= 0) {
            return;
          }
          // syncRunnerIndex is bound to the range [0, Integer.MAX_VALUE - 1] as follows:
          //   * The maximum value possible for syncRunners.length is Integer.MAX_VALUE
          //   * syncRunnerIndex starts at 0 and is incremented only here
          //   * after the increment, the value is bounded by the '%' operator to
          //     [0, syncRunners.length), presuming the value was positive prior to
          //     the '%' operator.
          //   * after being bound to [0, Integer.MAX_VALUE - 1], the new value is stored in
          //     syncRunnerIndex ensuring that it can't grow without bound and overflow.
          //   * note that the value after the increment must be positive, because the most it
          //     could have been prior was Integer.MAX_VALUE - 1 and we only increment by 1.
          this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
          try {
            // Below expects that the offer 'transfers' responsibility for the outstanding syncs to
            // the syncRunner. We should never get an exception in here.
            this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
              this.syncFuturesCount.get());
          } catch (Exception e) {
            // Should NEVER get here.
            requestLogRoll();
            this.exception = new DamagedWALException("Failed offering sync", e);
          }
        }
        // We may have picked up an exception above trying to offer the sync
        if (this.exception != null) {
          cleanupOutstandingSyncsOnException(sequence, this.exception instanceof DamagedWALException
              ? this.exception : new DamagedWALException("On sync", this.exception));
        }
        attainSafePoint(sequence);
        this.syncFuturesCount.set(0);
      } catch (Throwable t) {
        LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
      }
    }

    SafePointZigZagLatch attainSafePoint() {
      this.zigzagLatch = new SafePointZigZagLatch();
      return this.zigzagLatch;
    }

    /**
     * Check if we should attain the safe point. If so, go there and then wait until signalled
     * before we proceed.
     */
    private void attainSafePoint(final long currentSequence) {
      if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) {
        return;
      }
      // If here, another thread is waiting on us to get to the safe point. Don't leave it hanging.
      beforeWaitOnSafePoint();
      try {
        // Wait on outstanding syncers; wait for them to finish syncing (unless we've been
        // shutdown or unless our latch has been thrown because we have been aborted or unless
        // this WAL is broken and we can't get a sync/append to complete).
        while ((!this.shutdown && this.zigzagLatch.isCocked()
            && highestSyncedTxid.get() < currentSequence &&
            // We could be in here and all syncs are failing or failed. Check for this. Otherwise
            // we'll just be stuck here for ever. In other words, ensure there are syncs running.
            isOutstandingSyncs())
            // Wait for all SyncRunners to finish their work so that we can replace the writer
            || isOutstandingSyncsFromRunners()) {
          synchronized (this.safePointWaiter) {
            this.safePointWaiter.wait(0, 1);
          }
        }
        // Tell the waiting thread we've attained the safe point.
        // Can clear this.exception if set here because we know that the next event through the
        // ringbuffer will be going to a new WAL after we do the zigzaglatch dance.
        this.exception = null;
        this.zigzagLatch.safePointAttained();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted ", e);
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Append to the WAL. Does all CP and WAL listener calls.
     */
    void append(final FSWALEntry entry) throws Exception {
      try {
        FSHLog.this.append(writer, entry);
      } catch (Exception e) {
        String msg = "Append sequenceId=" + entry.getKey().getSequenceId()
            + ", requesting roll of WAL";
        LOG.warn(msg, e);
        requestLogRoll();
        throw new DamagedWALException(msg, e);
      }
    }

    @Override
    public void onStart() {
      for (SyncRunner syncRunner : this.syncRunners) {
        syncRunner.start();
      }
    }

    @Override
    public void onShutdown() {
      for (SyncRunner syncRunner : this.syncRunners) {
        syncRunner.interrupt();
      }
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  @Override
  DatanodeInfo[] getPipeline() {
    if (this.hdfs_out != null) {
      if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
        return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
      }
    }
    return new DatanodeInfo[0];
  }
}