001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; 021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023 024import com.lmax.disruptor.RingBuffer; 025import com.lmax.disruptor.Sequence; 026import com.lmax.disruptor.Sequencer; 027import java.io.IOException; 028import java.lang.reflect.Field; 029import java.util.ArrayDeque; 030import java.util.Comparator; 031import java.util.Deque; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Queue; 035import java.util.SortedSet; 036import java.util.TreeSet; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.Executors; 039import java.util.concurrent.LinkedBlockingQueue; 040import java.util.concurrent.ThreadPoolExecutor; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.locks.Condition; 044import java.util.concurrent.locks.Lock; 045import java.util.concurrent.locks.ReentrantLock; 046import java.util.function.Supplier; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileSystem; 049import org.apache.hadoop.fs.Path; 050import org.apache.hadoop.hbase.HBaseInterfaceAudience; 051import org.apache.hadoop.hbase.client.RegionInfo; 052import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 053import org.apache.hadoop.hbase.trace.TraceUtil; 054import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 055import org.apache.hadoop.hbase.wal.WALEdit; 056import org.apache.hadoop.hbase.wal.WALKeyImpl; 057import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 058import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.htrace.core.TraceScope; 061import org.apache.yetus.audience.InterfaceAudience; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 066import org.apache.hbase.thirdparty.io.netty.channel.Channel; 067import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 068import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 069import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; 070 071/** 072 * An asynchronous implementation of FSWAL. 073 * <p> 074 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. 075 * <p> 076 * For append, we process it as follow: 077 * <ol> 078 * <li>In the caller thread(typically, in the rpc handler thread): 079 * <ol> 080 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li> 081 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 082 * </li> 083 * </ol> 084 * </li> 085 * <li>In the consumer task(executed in a single threaded thread pool) 086 * <ol> 087 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into 088 * {@link #toWriteAppends}</li> 089 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into 090 * {@link #unackedAppends}</li> 091 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call 092 * sync on the AsyncWriter.</li> 093 * <li>In the callback methods: 094 * <ul> 095 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li> 096 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and 097 * wait for writing them again.</li> 098 * </ul> 099 * </li> 100 * </ol> 101 * </li> 102 * </ol> 103 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new 104 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. 105 * <p> 106 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with 107 * FSHLog.<br> 108 * For a normal roll request(for example, we have reached the log roll size): 109 * <ol> 110 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and 111 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see 112 * {@link #waitForSafePoint()}).</li> 113 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if 114 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out. 115 * </li> 116 * <li>If there are unflush data in the writer, sync them.</li> 117 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty, 118 * signal the {@link #readyForRollingCond}.</li> 119 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., 120 * we reach a safe point. So it is safe to replace old writer with new writer now.</li> 121 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> 122 * <li>Schedule the consumer task.</li> 123 * <li>Schedule a background task to close the old writer.</li> 124 * </ol> 125 * For a broken writer roll request, the only difference is that we can bypass the wait for safe 126 * point stage. 127 */ 128@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 129public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { 130 131 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); 132 133 private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> { 134 int c = Long.compare(o1.getTxid(), o2.getTxid()); 135 return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); 136 }; 137 138 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 139 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 140 141 public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = 142 "hbase.wal.async.use-shared-event-loop"; 143 public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; 144 145 public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 146 "hbase.wal.async.wait.on.shutdown.seconds"; 147 public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 148 149 private final EventLoopGroup eventLoopGroup; 150 151 private final ExecutorService consumeExecutor; 152 153 private final Class<? extends Channel> channelClass; 154 155 private final Lock consumeLock = new ReentrantLock(); 156 157 private final Runnable consumer = this::consume; 158 159 // check if there is already a consumer task in the event loop's task queue 160 private final Supplier<Boolean> hasConsumerTask; 161 162 private static final int MAX_EPOCH = 0x3FFFFFFF; 163 // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old 164 // writer to be closed. 165 // the second lowest bit is writerBorken which means the current writer is broken and rollWriter 166 // is needed. 167 // all other bits are the epoch number of the current writer, this is used to detect whether the 168 // writer is still the one when you issue the sync. 169 // notice that, modification to this field is only allowed under the protection of consumeLock. 170 private volatile int epochAndState; 171 172 private boolean readyForRolling; 173 174 private final Condition readyForRollingCond = consumeLock.newCondition(); 175 176 private final RingBuffer<RingBufferTruck> waitingConsumePayloads; 177 178 private final Sequence waitingConsumePayloadsGatingSequence; 179 180 private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); 181 182 private final long batchSize; 183 184 private final ExecutorService closeExecutor = Executors.newCachedThreadPool( 185 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); 186 187 private volatile AsyncFSOutput fsOut; 188 189 private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>(); 190 191 private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>(); 192 193 private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR); 194 195 // the highest txid of WAL entries being processed 196 private long highestProcessedAppendTxid; 197 198 // file length when we issue last sync request on the writer 199 private long fileLengthAtLastSync; 200 201 private long highestProcessedAppendTxidAtLastSync; 202 203 private final int waitOnShutdownInSeconds; 204 205 public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 206 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 207 String prefix, String suffix, EventLoopGroup eventLoopGroup, 208 Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { 209 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 210 this.eventLoopGroup = eventLoopGroup; 211 this.channelClass = channelClass; 212 Supplier<Boolean> hasConsumerTask; 213 if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { 214 this.consumeExecutor = eventLoopGroup.next(); 215 if (consumeExecutor instanceof SingleThreadEventExecutor) { 216 try { 217 Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); 218 field.setAccessible(true); 219 Queue<?> queue = (Queue<?>) field.get(consumeExecutor); 220 hasConsumerTask = () -> queue.peek() == consumer; 221 } catch (Exception e) { 222 LOG.warn("Can not get task queue of " + consumeExecutor + 223 ", this is not necessary, just give up", e); 224 hasConsumerTask = () -> false; 225 } 226 } else { 227 hasConsumerTask = () -> false; 228 } 229 } else { 230 ThreadPoolExecutor threadPool = 231 new ThreadPoolExecutor(1, 1, 0L, 232 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 233 new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()). 234 setDaemon(true).build()); 235 hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; 236 this.consumeExecutor = threadPool; 237 } 238 239 this.hasConsumerTask = hasConsumerTask; 240 int preallocatedEventCount = 241 conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 242 waitingConsumePayloads = 243 RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); 244 waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 245 waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); 246 247 // inrease the ringbuffer sequence so our txid is start from 1 248 waitingConsumePayloads.publish(waitingConsumePayloads.next()); 249 waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); 250 251 batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); 252 waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 253 DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); 254 } 255 256 private static boolean waitingRoll(int epochAndState) { 257 return (epochAndState & 1) != 0; 258 } 259 260 private static boolean writerBroken(int epochAndState) { 261 return ((epochAndState >>> 1) & 1) != 0; 262 } 263 264 private static int epoch(int epochAndState) { 265 return epochAndState >>> 2; 266 } 267 268 // return whether we have successfully set readyForRolling to true. 269 private boolean trySetReadyForRolling() { 270 // Check without holding lock first. Usually we will just return here. 271 // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to 272 // check them outside the consumeLock. 273 if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { 274 return false; 275 } 276 consumeLock.lock(); 277 try { 278 // 1. a roll is requested 279 // 2. all out-going entries have been acked(we have confirmed above). 280 if (waitingRoll(epochAndState)) { 281 readyForRolling = true; 282 readyForRollingCond.signalAll(); 283 return true; 284 } else { 285 return false; 286 } 287 } finally { 288 consumeLock.unlock(); 289 } 290 } 291 292 private void syncFailed(long epochWhenSync, Throwable error) { 293 LOG.warn("sync failed", error); 294 boolean shouldRequestLogRoll = true; 295 consumeLock.lock(); 296 try { 297 int currentEpochAndState = epochAndState; 298 if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { 299 // this is not the previous writer which means we have already rolled the writer. 300 // or this is still the current writer, but we have already marked it as broken and request 301 // a roll. 302 return; 303 } 304 this.epochAndState = currentEpochAndState | 0b10; 305 if (waitingRoll(currentEpochAndState)) { 306 readyForRolling = true; 307 readyForRollingCond.signalAll(); 308 // this means we have already in the middle of a rollWriter so just tell the roller thread 309 // that you can continue without requesting an extra log roll. 310 shouldRequestLogRoll = false; 311 } 312 } finally { 313 consumeLock.unlock(); 314 } 315 for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { 316 toWriteAppends.addFirst(iter.next()); 317 } 318 highestUnsyncedTxid = highestSyncedTxid.get(); 319 if (shouldRequestLogRoll) { 320 // request a roll. 321 requestLogRoll(ERROR); 322 } 323 } 324 325 private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { 326 highestSyncedTxid.set(processedTxid); 327 for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { 328 FSWALEntry entry = iter.next(); 329 if (entry.getTxid() <= processedTxid) { 330 entry.release(); 331 iter.remove(); 332 } else { 333 break; 334 } 335 } 336 postSync(System.nanoTime() - startTimeNs, finishSync(true)); 337 if (trySetReadyForRolling()) { 338 // we have just finished a roll, then do not need to check for log rolling, the writer will be 339 // closed soon. 340 return; 341 } 342 // If we haven't already requested a roll, check if we have exceeded logrollsize 343 if (!isLogRollRequested() && writer.getLength() > logrollsize) { 344 if (LOG.isDebugEnabled()) { 345 LOG.debug("Requesting log roll because of file size threshold; length=" + 346 writer.getLength() + ", logrollsize=" + logrollsize); 347 } 348 requestLogRoll(SIZE); 349 } 350 } 351 352 // find all the sync futures between these two txids to see if we need to issue a hsync, if no 353 // sync futures then just use the default one. 354 private boolean isHsync(long beginTxid, long endTxid) { 355 SortedSet<SyncFuture> futures = 356 syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); 357 if (futures.isEmpty()) { 358 return useHsync; 359 } 360 for (SyncFuture future : futures) { 361 if (future.isForceSync()) { 362 return true; 363 } 364 } 365 return false; 366 } 367 368 private void sync(AsyncWriter writer) { 369 fileLengthAtLastSync = writer.getLength(); 370 long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; 371 boolean shouldUseHsync = 372 isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); 373 highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; 374 final long startTimeNs = System.nanoTime(); 375 final long epoch = (long) epochAndState >>> 2L; 376 addListener(writer.sync(shouldUseHsync), (result, error) -> { 377 if (error != null) { 378 syncFailed(epoch, error); 379 } else { 380 syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); 381 } 382 }, consumeExecutor); 383 } 384 385 private void addTimeAnnotation(SyncFuture future, String annotation) { 386 TraceUtil.addTimelineAnnotation(annotation); 387 // TODO handle htrace API change, see HBASE-18895 388 // future.setSpan(scope.getSpan()); 389 } 390 391 private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { 392 int finished = 0; 393 for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) { 394 SyncFuture sync = iter.next(); 395 if (sync.getTxid() <= txid) { 396 sync.done(txid, null); 397 iter.remove(); 398 finished++; 399 if (addSyncTrace) { 400 addTimeAnnotation(sync, "writer synced"); 401 } 402 } else { 403 break; 404 } 405 } 406 return finished; 407 } 408 409 // try advancing the highestSyncedTxid as much as possible 410 private int finishSync(boolean addSyncTrace) { 411 if (unackedAppends.isEmpty()) { 412 // All outstanding appends have been acked. 413 if (toWriteAppends.isEmpty()) { 414 // Also no appends that wait to be written out, then just finished all pending syncs. 415 long maxSyncTxid = highestSyncedTxid.get(); 416 for (SyncFuture sync : syncFutures) { 417 maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); 418 sync.done(maxSyncTxid, null); 419 if (addSyncTrace) { 420 addTimeAnnotation(sync, "writer synced"); 421 } 422 } 423 highestSyncedTxid.set(maxSyncTxid); 424 int finished = syncFutures.size(); 425 syncFutures.clear(); 426 return finished; 427 } else { 428 // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so 429 // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between 430 // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. 431 long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); 432 assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; 433 long doneTxid = lowestUnprocessedAppendTxid - 1; 434 highestSyncedTxid.set(doneTxid); 435 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 436 } 437 } else { 438 // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the 439 // first unacked append minus 1. 440 long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); 441 long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); 442 highestSyncedTxid.set(doneTxid); 443 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 444 } 445 } 446 447 private void appendAndSync() { 448 final AsyncWriter writer = this.writer; 449 // maybe a sync request is not queued when we issue a sync, so check here to see if we could 450 // finish some. 451 finishSync(false); 452 long newHighestProcessedAppendTxid = -1L; 453 for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { 454 FSWALEntry entry = iter.next(); 455 boolean appended; 456 try { 457 appended = appendEntry(writer, entry); 458 } catch (IOException e) { 459 throw new AssertionError("should not happen", e); 460 } 461 newHighestProcessedAppendTxid = entry.getTxid(); 462 iter.remove(); 463 if (appended) { 464 // This is possible, when we fail to sync, we will add the unackedAppends back to 465 // toWriteAppends, so here we may get an entry which is already in the unackedAppends. 466 if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) { 467 unackedAppends.addLast(entry); 468 } 469 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 470 break; 471 } 472 } 473 } 474 // if we have a newer transaction id, update it. 475 // otherwise, use the previous transaction id. 476 if (newHighestProcessedAppendTxid > 0) { 477 highestProcessedAppendTxid = newHighestProcessedAppendTxid; 478 } else { 479 newHighestProcessedAppendTxid = highestProcessedAppendTxid; 480 } 481 482 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 483 // sync because buffer size limit. 484 sync(writer); 485 return; 486 } 487 if (writer.getLength() == fileLengthAtLastSync) { 488 // we haven't written anything out, just advance the highestSyncedSequence since we may only 489 // stamped some region sequence id. 490 if (unackedAppends.isEmpty()) { 491 highestSyncedTxid.set(highestProcessedAppendTxid); 492 finishSync(false); 493 trySetReadyForRolling(); 494 } 495 return; 496 } 497 // reach here means that we have some unsynced data but haven't reached the batch size yet 498 // but we will not issue a sync directly here even if there are sync requests because we may 499 // have some new data in the ringbuffer, so let's just return here and delay the decision of 500 // whether to issue a sync in the caller method. 501 } 502 503 private void consume() { 504 consumeLock.lock(); 505 try { 506 int currentEpochAndState = epochAndState; 507 if (writerBroken(currentEpochAndState)) { 508 return; 509 } 510 if (waitingRoll(currentEpochAndState)) { 511 if (writer.getLength() > fileLengthAtLastSync) { 512 // issue a sync 513 sync(writer); 514 } else { 515 if (unackedAppends.isEmpty()) { 516 readyForRolling = true; 517 readyForRollingCond.signalAll(); 518 } 519 } 520 return; 521 } 522 } finally { 523 consumeLock.unlock(); 524 } 525 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 526 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 527 nextCursor++) { 528 if (!waitingConsumePayloads.isPublished(nextCursor)) { 529 break; 530 } 531 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 532 switch (truck.type()) { 533 case APPEND: 534 toWriteAppends.addLast(truck.unloadAppend()); 535 break; 536 case SYNC: 537 syncFutures.add(truck.unloadSync()); 538 break; 539 default: 540 LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); 541 break; 542 } 543 waitingConsumePayloadsGatingSequence.set(nextCursor); 544 } 545 appendAndSync(); 546 if (hasConsumerTask.get()) { 547 return; 548 } 549 if (toWriteAppends.isEmpty()) { 550 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 551 consumerScheduled.set(false); 552 // recheck here since in append and sync we do not hold the consumeLock. Thing may 553 // happen like 554 // 1. we check cursor, no new entry 555 // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and 556 // give up scheduling the consumer task. 557 // 3. we set consumerScheduled to false and also give up scheduling consumer task. 558 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 559 // we will give up consuming so if there are some unsynced data we need to issue a sync. 560 if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() && 561 syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { 562 // no new data in the ringbuffer and we have at least one sync request 563 sync(writer); 564 } 565 return; 566 } else { 567 // maybe someone has grabbed this before us 568 if (!consumerScheduled.compareAndSet(false, true)) { 569 return; 570 } 571 } 572 } 573 } 574 // reschedule if we still have something to write. 575 consumeExecutor.execute(consumer); 576 } 577 578 private boolean shouldScheduleConsumer() { 579 int currentEpochAndState = epochAndState; 580 if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { 581 return false; 582 } 583 return consumerScheduled.compareAndSet(false, true); 584 } 585 586 @Override 587 protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) 588 throws IOException { 589 long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, 590 waitingConsumePayloads); 591 if (shouldScheduleConsumer()) { 592 consumeExecutor.execute(consumer); 593 } 594 return txid; 595 } 596 597 @Override 598 public void sync() throws IOException { 599 sync(useHsync); 600 } 601 602 @Override 603 public void sync(long txid) throws IOException { 604 sync(txid, useHsync); 605 } 606 607 @Override 608 public void sync(boolean forceSync) throws IOException { 609 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 610 long txid = waitingConsumePayloads.next(); 611 SyncFuture future; 612 try { 613 future = getSyncFuture(txid, forceSync); 614 RingBufferTruck truck = waitingConsumePayloads.get(txid); 615 truck.load(future); 616 } finally { 617 waitingConsumePayloads.publish(txid); 618 } 619 if (shouldScheduleConsumer()) { 620 consumeExecutor.execute(consumer); 621 } 622 blockOnSync(future); 623 } 624 } 625 626 @Override 627 public void sync(long txid, boolean forceSync) throws IOException { 628 if (highestSyncedTxid.get() >= txid) { 629 return; 630 } 631 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 632 // here we do not use ring buffer sequence as txid 633 long sequence = waitingConsumePayloads.next(); 634 SyncFuture future; 635 try { 636 future = getSyncFuture(txid, forceSync); 637 RingBufferTruck truck = waitingConsumePayloads.get(sequence); 638 truck.load(future); 639 } finally { 640 waitingConsumePayloads.publish(sequence); 641 } 642 if (shouldScheduleConsumer()) { 643 consumeExecutor.execute(consumer); 644 } 645 blockOnSync(future); 646 } 647 } 648 649 @Override 650 protected AsyncWriter createWriterInstance(Path path) throws IOException { 651 return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, 652 this.blocksize, eventLoopGroup, channelClass); 653 } 654 655 private void waitForSafePoint() { 656 consumeLock.lock(); 657 try { 658 int currentEpochAndState = epochAndState; 659 if (writerBroken(currentEpochAndState) || this.writer == null) { 660 return; 661 } 662 consumerScheduled.set(true); 663 epochAndState = currentEpochAndState | 1; 664 readyForRolling = false; 665 consumeExecutor.execute(consumer); 666 while (!readyForRolling) { 667 readyForRollingCond.awaitUninterruptibly(); 668 } 669 } finally { 670 consumeLock.unlock(); 671 } 672 } 673 674 private long closeWriter() { 675 AsyncWriter oldWriter = this.writer; 676 if (oldWriter != null) { 677 long fileLength = oldWriter.getLength(); 678 closeExecutor.execute(() -> { 679 try { 680 oldWriter.close(); 681 } catch (IOException e) { 682 LOG.warn("close old writer failed", e); 683 } 684 }); 685 return fileLength; 686 } else { 687 return 0L; 688 } 689 } 690 691 @Override 692 protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) 693 throws IOException { 694 Preconditions.checkNotNull(nextWriter); 695 waitForSafePoint(); 696 long oldFileLen = closeWriter(); 697 logRollAndSetupWalProps(oldPath, newPath, oldFileLen); 698 this.writer = nextWriter; 699 if (nextWriter instanceof AsyncProtobufLogWriter) { 700 this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); 701 } 702 this.fileLengthAtLastSync = nextWriter.getLength(); 703 this.highestProcessedAppendTxidAtLastSync = 0L; 704 consumeLock.lock(); 705 try { 706 consumerScheduled.set(true); 707 int currentEpoch = epochAndState >>> 2; 708 int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; 709 // set a new epoch and also clear waitingRoll and writerBroken 710 this.epochAndState = nextEpoch << 2; 711 // Reset rollRequested status 712 rollRequested.set(false); 713 consumeExecutor.execute(consumer); 714 } finally { 715 consumeLock.unlock(); 716 } 717 } 718 719 @Override 720 protected void doShutdown() throws IOException { 721 waitForSafePoint(); 722 closeWriter(); 723 closeExecutor.shutdown(); 724 try { 725 if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { 726 LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + 727 " the close of async writer doesn't complete." + 728 "Please check the status of underlying filesystem" + 729 " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + 730 "\""); 731 } 732 } catch (InterruptedException e) { 733 LOG.error("The wait for close of async writer is interrupted"); 734 Thread.currentThread().interrupt(); 735 } 736 IOException error = new IOException("WAL has been closed"); 737 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 738 // drain all the pending sync requests 739 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 740 nextCursor++) { 741 if (!waitingConsumePayloads.isPublished(nextCursor)) { 742 break; 743 } 744 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 745 switch (truck.type()) { 746 case SYNC: 747 syncFutures.add(truck.unloadSync()); 748 break; 749 default: 750 break; 751 } 752 } 753 // and fail them 754 syncFutures.forEach(f -> f.done(f.getTxid(), error)); 755 if (!(consumeExecutor instanceof EventLoop)) { 756 consumeExecutor.shutdown(); 757 } 758 } 759 760 @Override 761 protected void doAppend(AsyncWriter writer, FSWALEntry entry) { 762 writer.append(entry); 763 } 764 765 @Override 766 DatanodeInfo[] getPipeline() { 767 AsyncFSOutput output = this.fsOut; 768 return output != null ? output.getPipeline() : new DatanodeInfo[0]; 769 } 770 771 @Override 772 int getLogReplication() { 773 return getPipeline().length; 774 } 775 776 @Override 777 protected boolean doCheckLogLowReplication() { 778 // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so 779 // typically there is no 'low replication' state, only a 'broken' state. 780 AsyncFSOutput output = this.fsOut; 781 return output != null && output.isBroken(); 782 } 783}