001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import com.lmax.disruptor.RingBuffer; 023import com.lmax.disruptor.Sequence; 024import com.lmax.disruptor.Sequencer; 025import java.io.IOException; 026import java.lang.reflect.Field; 027import java.util.ArrayDeque; 028import java.util.Comparator; 029import java.util.Deque; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Queue; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Executors; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadPoolExecutor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.locks.Condition; 042import java.util.concurrent.locks.Lock; 043import java.util.concurrent.locks.ReentrantLock; 044import java.util.function.Supplier; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.fs.FileSystem; 047import org.apache.hadoop.fs.Path; 048import org.apache.hadoop.hbase.HBaseInterfaceAudience; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 051import org.apache.hadoop.hbase.trace.TraceUtil; 052import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 053import org.apache.hadoop.hbase.wal.WALEdit; 054import org.apache.hadoop.hbase.wal.WALKeyImpl; 055import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 056import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 058import org.apache.htrace.core.TraceScope; 059import org.apache.yetus.audience.InterfaceAudience; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 064import org.apache.hbase.thirdparty.io.netty.channel.Channel; 065import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 066import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 067import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; 068 069/** 070 * An asynchronous implementation of FSWAL. 071 * <p> 072 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. 073 * <p> 074 * For append, we process it as follow: 075 * <ol> 076 * <li>In the caller thread(typically, in the rpc handler thread): 077 * <ol> 078 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li> 079 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 080 * </li> 081 * </ol> 082 * </li> 083 * <li>In the consumer task(executed in a single threaded thread pool) 084 * <ol> 085 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into 086 * {@link #toWriteAppends}</li> 087 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into 088 * {@link #unackedAppends}</li> 089 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call 090 * sync on the AsyncWriter.</li> 091 * <li>In the callback methods: 092 * <ul> 093 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li> 094 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and 095 * wait for writing them again.</li> 096 * </ul> 097 * </li> 098 * </ol> 099 * </li> 100 * </ol> 101 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new 102 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. 103 * <p> 104 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with 105 * FSHLog.<br> 106 * For a normal roll request(for example, we have reached the log roll size): 107 * <ol> 108 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and 109 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see 110 * {@link #waitForSafePoint()}).</li> 111 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if 112 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out. 113 * </li> 114 * <li>If there are unflush data in the writer, sync them.</li> 115 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty, 116 * signal the {@link #readyForRollingCond}.</li> 117 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., 118 * we reach a safe point. So it is safe to replace old writer with new writer now.</li> 119 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> 120 * <li>Schedule the consumer task.</li> 121 * <li>Schedule a background task to close the old writer.</li> 122 * </ol> 123 * For a broken writer roll request, the only difference is that we can bypass the wait for safe 124 * point stage. 125 */ 126@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 127public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { 128 129 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); 130 131 private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> { 132 int c = Long.compare(o1.getTxid(), o2.getTxid()); 133 return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); 134 }; 135 136 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 137 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 138 139 public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = 140 "hbase.wal.async.use-shared-event-loop"; 141 public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; 142 143 public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 144 "hbase.wal.async.wait.on.shutdown.seconds"; 145 public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 146 147 private final EventLoopGroup eventLoopGroup; 148 149 private final ExecutorService consumeExecutor; 150 151 private final Class<? extends Channel> channelClass; 152 153 private final Lock consumeLock = new ReentrantLock(); 154 155 private final Runnable consumer = this::consume; 156 157 // check if there is already a consumer task in the event loop's task queue 158 private final Supplier<Boolean> hasConsumerTask; 159 160 private static final int MAX_EPOCH = 0x3FFFFFFF; 161 // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old 162 // writer to be closed. 163 // the second lowest bit is writerBorken which means the current writer is broken and rollWriter 164 // is needed. 165 // all other bits are the epoch number of the current writer, this is used to detect whether the 166 // writer is still the one when you issue the sync. 167 // notice that, modification to this field is only allowed under the protection of consumeLock. 168 private volatile int epochAndState; 169 170 private boolean readyForRolling; 171 172 private final Condition readyForRollingCond = consumeLock.newCondition(); 173 174 private final RingBuffer<RingBufferTruck> waitingConsumePayloads; 175 176 private final Sequence waitingConsumePayloadsGatingSequence; 177 178 private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); 179 180 private final long batchSize; 181 182 private final ExecutorService closeExecutor = Executors.newCachedThreadPool( 183 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); 184 185 private volatile AsyncFSOutput fsOut; 186 187 private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>(); 188 189 private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>(); 190 191 private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR); 192 193 // the highest txid of WAL entries being processed 194 private long highestProcessedAppendTxid; 195 196 // file length when we issue last sync request on the writer 197 private long fileLengthAtLastSync; 198 199 private long highestProcessedAppendTxidAtLastSync; 200 201 private final int waitOnShutdownInSeconds; 202 203 public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 204 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 205 String prefix, String suffix, EventLoopGroup eventLoopGroup, 206 Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { 207 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 208 this.eventLoopGroup = eventLoopGroup; 209 this.channelClass = channelClass; 210 Supplier<Boolean> hasConsumerTask; 211 if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { 212 this.consumeExecutor = eventLoopGroup.next(); 213 if (consumeExecutor instanceof SingleThreadEventExecutor) { 214 try { 215 Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); 216 field.setAccessible(true); 217 Queue<?> queue = (Queue<?>) field.get(consumeExecutor); 218 hasConsumerTask = () -> queue.peek() == consumer; 219 } catch (Exception e) { 220 LOG.warn("Can not get task queue of " + consumeExecutor + 221 ", this is not necessary, just give up", e); 222 hasConsumerTask = () -> false; 223 } 224 } else { 225 hasConsumerTask = () -> false; 226 } 227 } else { 228 ThreadPoolExecutor threadPool = 229 new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 230 new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build()); 231 hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; 232 this.consumeExecutor = threadPool; 233 } 234 235 this.hasConsumerTask = hasConsumerTask; 236 int preallocatedEventCount = 237 conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 238 waitingConsumePayloads = 239 RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); 240 waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 241 waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); 242 243 // inrease the ringbuffer sequence so our txid is start from 1 244 waitingConsumePayloads.publish(waitingConsumePayloads.next()); 245 waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); 246 247 batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); 248 waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 249 DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); 250 rollWriter(); 251 } 252 253 private static boolean waitingRoll(int epochAndState) { 254 return (epochAndState & 1) != 0; 255 } 256 257 private static boolean writerBroken(int epochAndState) { 258 return ((epochAndState >>> 1) & 1) != 0; 259 } 260 261 private static int epoch(int epochAndState) { 262 return epochAndState >>> 2; 263 } 264 265 // return whether we have successfully set readyForRolling to true. 266 private boolean trySetReadyForRolling() { 267 // Check without holding lock first. Usually we will just return here. 268 // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to 269 // check them outside the consumeLock. 270 if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { 271 return false; 272 } 273 consumeLock.lock(); 274 try { 275 // 1. a roll is requested 276 // 2. all out-going entries have been acked(we have confirmed above). 277 if (waitingRoll(epochAndState)) { 278 readyForRolling = true; 279 readyForRollingCond.signalAll(); 280 return true; 281 } else { 282 return false; 283 } 284 } finally { 285 consumeLock.unlock(); 286 } 287 } 288 289 private void syncFailed(long epochWhenSync, Throwable error) { 290 LOG.warn("sync failed", error); 291 boolean shouldRequestLogRoll = true; 292 consumeLock.lock(); 293 try { 294 int currentEpochAndState = epochAndState; 295 if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { 296 // this is not the previous writer which means we have already rolled the writer. 297 // or this is still the current writer, but we have already marked it as broken and request 298 // a roll. 299 return; 300 } 301 this.epochAndState = currentEpochAndState | 0b10; 302 if (waitingRoll(currentEpochAndState)) { 303 readyForRolling = true; 304 readyForRollingCond.signalAll(); 305 // this means we have already in the middle of a rollWriter so just tell the roller thread 306 // that you can continue without requesting an extra log roll. 307 shouldRequestLogRoll = false; 308 } 309 } finally { 310 consumeLock.unlock(); 311 } 312 for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { 313 toWriteAppends.addFirst(iter.next()); 314 } 315 highestUnsyncedTxid = highestSyncedTxid.get(); 316 if (shouldRequestLogRoll) { 317 // request a roll. 318 requestLogRoll(); 319 } 320 } 321 322 private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { 323 highestSyncedTxid.set(processedTxid); 324 for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { 325 FSWALEntry entry = iter.next(); 326 if (entry.getTxid() <= processedTxid) { 327 entry.release(); 328 iter.remove(); 329 } else { 330 break; 331 } 332 } 333 postSync(System.nanoTime() - startTimeNs, finishSync(true)); 334 if (trySetReadyForRolling()) { 335 // we have just finished a roll, then do not need to check for log rolling, the writer will be 336 // closed soon. 337 return; 338 } 339 if (writer.getLength() < logrollsize || isLogRollRequested()) { 340 return; 341 } 342 requestLogRoll(); 343 } 344 345 private void sync(AsyncWriter writer) { 346 fileLengthAtLastSync = writer.getLength(); 347 long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; 348 highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; 349 final long startTimeNs = System.nanoTime(); 350 final long epoch = (long) epochAndState >>> 2L; 351 addListener(writer.sync(), (result, error) -> { 352 if (error != null) { 353 syncFailed(epoch, error); 354 } else { 355 syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); 356 } 357 }, consumeExecutor); 358 } 359 360 private void addTimeAnnotation(SyncFuture future, String annotation) { 361 TraceUtil.addTimelineAnnotation(annotation); 362 // TODO handle htrace API change, see HBASE-18895 363 // future.setSpan(scope.getSpan()); 364 } 365 366 private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { 367 int finished = 0; 368 for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) { 369 SyncFuture sync = iter.next(); 370 if (sync.getTxid() <= txid) { 371 sync.done(txid, null); 372 iter.remove(); 373 finished++; 374 if (addSyncTrace) { 375 addTimeAnnotation(sync, "writer synced"); 376 } 377 } else { 378 break; 379 } 380 } 381 return finished; 382 } 383 384 // try advancing the highestSyncedTxid as much as possible 385 private int finishSync(boolean addSyncTrace) { 386 if (unackedAppends.isEmpty()) { 387 // All outstanding appends have been acked. 388 if (toWriteAppends.isEmpty()) { 389 // Also no appends that wait to be written out, then just finished all pending syncs. 390 long maxSyncTxid = highestSyncedTxid.get(); 391 for (SyncFuture sync : syncFutures) { 392 maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); 393 sync.done(maxSyncTxid, null); 394 if (addSyncTrace) { 395 addTimeAnnotation(sync, "writer synced"); 396 } 397 } 398 highestSyncedTxid.set(maxSyncTxid); 399 int finished = syncFutures.size(); 400 syncFutures.clear(); 401 return finished; 402 } else { 403 // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so 404 // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between 405 // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. 406 long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); 407 assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; 408 long doneTxid = lowestUnprocessedAppendTxid - 1; 409 highestSyncedTxid.set(doneTxid); 410 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 411 } 412 } else { 413 // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the 414 // first unacked append minus 1. 415 long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); 416 long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); 417 highestSyncedTxid.set(doneTxid); 418 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 419 } 420 } 421 422 private void appendAndSync() { 423 final AsyncWriter writer = this.writer; 424 // maybe a sync request is not queued when we issue a sync, so check here to see if we could 425 // finish some. 426 finishSync(false); 427 long newHighestProcessedAppendTxid = -1L; 428 for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { 429 FSWALEntry entry = iter.next(); 430 boolean appended; 431 try { 432 appended = append(writer, entry); 433 } catch (IOException e) { 434 throw new AssertionError("should not happen", e); 435 } 436 newHighestProcessedAppendTxid = entry.getTxid(); 437 iter.remove(); 438 if (appended) { 439 // This is possible, when we fail to sync, we will add the unackedAppends back to 440 // toWriteAppends, so here we may get an entry which is already in the unackedAppends. 441 if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) { 442 unackedAppends.addLast(entry); 443 } 444 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 445 break; 446 } 447 } 448 } 449 // if we have a newer transaction id, update it. 450 // otherwise, use the previous transaction id. 451 if (newHighestProcessedAppendTxid > 0) { 452 highestProcessedAppendTxid = newHighestProcessedAppendTxid; 453 } else { 454 newHighestProcessedAppendTxid = highestProcessedAppendTxid; 455 } 456 457 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 458 // sync because buffer size limit. 459 sync(writer); 460 return; 461 } 462 if (writer.getLength() == fileLengthAtLastSync) { 463 // we haven't written anything out, just advance the highestSyncedSequence since we may only 464 // stamped some region sequence id. 465 if (unackedAppends.isEmpty()) { 466 highestSyncedTxid.set(highestProcessedAppendTxid); 467 finishSync(false); 468 trySetReadyForRolling(); 469 } 470 return; 471 } 472 // reach here means that we have some unsynced data but haven't reached the batch size yet 473 // but we will not issue a sync directly here even if there are sync requests because we may 474 // have some new data in the ringbuffer, so let's just return here and delay the decision of 475 // whether to issue a sync in the caller method. 476 } 477 478 private void consume() { 479 consumeLock.lock(); 480 try { 481 int currentEpochAndState = epochAndState; 482 if (writerBroken(currentEpochAndState)) { 483 return; 484 } 485 if (waitingRoll(currentEpochAndState)) { 486 if (writer.getLength() > fileLengthAtLastSync) { 487 // issue a sync 488 sync(writer); 489 } else { 490 if (unackedAppends.isEmpty()) { 491 readyForRolling = true; 492 readyForRollingCond.signalAll(); 493 } 494 } 495 return; 496 } 497 } finally { 498 consumeLock.unlock(); 499 } 500 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 501 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 502 nextCursor++) { 503 if (!waitingConsumePayloads.isPublished(nextCursor)) { 504 break; 505 } 506 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 507 switch (truck.type()) { 508 case APPEND: 509 toWriteAppends.addLast(truck.unloadAppend()); 510 break; 511 case SYNC: 512 syncFutures.add(truck.unloadSync()); 513 break; 514 default: 515 LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); 516 break; 517 } 518 waitingConsumePayloadsGatingSequence.set(nextCursor); 519 } 520 appendAndSync(); 521 if (hasConsumerTask.get()) { 522 return; 523 } 524 if (toWriteAppends.isEmpty()) { 525 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 526 consumerScheduled.set(false); 527 // recheck here since in append and sync we do not hold the consumeLock. Thing may 528 // happen like 529 // 1. we check cursor, no new entry 530 // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and 531 // give up scheduling the consumer task. 532 // 3. we set consumerScheduled to false and also give up scheduling consumer task. 533 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 534 // we will give up consuming so if there are some unsynced data we need to issue a sync. 535 if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() && 536 syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { 537 // no new data in the ringbuffer and we have at least one sync request 538 sync(writer); 539 } 540 return; 541 } else { 542 // maybe someone has grabbed this before us 543 if (!consumerScheduled.compareAndSet(false, true)) { 544 return; 545 } 546 } 547 } 548 } 549 // reschedule if we still have something to write. 550 consumeExecutor.execute(consumer); 551 } 552 553 private boolean shouldScheduleConsumer() { 554 int currentEpochAndState = epochAndState; 555 if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { 556 return false; 557 } 558 return consumerScheduled.compareAndSet(false, true); 559 } 560 561 @Override 562 public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) 563 throws IOException { 564 long txid = 565 stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); 566 if (shouldScheduleConsumer()) { 567 consumeExecutor.execute(consumer); 568 } 569 return txid; 570 } 571 572 @Override 573 public void sync() throws IOException { 574 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 575 long txid = waitingConsumePayloads.next(); 576 SyncFuture future; 577 try { 578 future = getSyncFuture(txid); 579 RingBufferTruck truck = waitingConsumePayloads.get(txid); 580 truck.load(future); 581 } finally { 582 waitingConsumePayloads.publish(txid); 583 } 584 if (shouldScheduleConsumer()) { 585 consumeExecutor.execute(consumer); 586 } 587 blockOnSync(future); 588 } 589 } 590 591 @Override 592 public void sync(long txid) throws IOException { 593 if (highestSyncedTxid.get() >= txid) { 594 return; 595 } 596 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 597 // here we do not use ring buffer sequence as txid 598 long sequence = waitingConsumePayloads.next(); 599 SyncFuture future; 600 try { 601 future = getSyncFuture(txid); 602 RingBufferTruck truck = waitingConsumePayloads.get(sequence); 603 truck.load(future); 604 } finally { 605 waitingConsumePayloads.publish(sequence); 606 } 607 if (shouldScheduleConsumer()) { 608 consumeExecutor.execute(consumer); 609 } 610 blockOnSync(future); 611 } 612 } 613 614 @Override 615 protected AsyncWriter createWriterInstance(Path path) throws IOException { 616 return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, 617 this.blocksize, eventLoopGroup, channelClass); 618 } 619 620 private void waitForSafePoint() { 621 consumeLock.lock(); 622 try { 623 int currentEpochAndState = epochAndState; 624 if (writerBroken(currentEpochAndState) || this.writer == null) { 625 return; 626 } 627 consumerScheduled.set(true); 628 epochAndState = currentEpochAndState | 1; 629 readyForRolling = false; 630 consumeExecutor.execute(consumer); 631 while (!readyForRolling) { 632 readyForRollingCond.awaitUninterruptibly(); 633 } 634 } finally { 635 consumeLock.unlock(); 636 } 637 } 638 639 private long closeWriter() { 640 AsyncWriter oldWriter = this.writer; 641 if (oldWriter != null) { 642 long fileLength = oldWriter.getLength(); 643 closeExecutor.execute(() -> { 644 try { 645 oldWriter.close(); 646 } catch (IOException e) { 647 LOG.warn("close old writer failed", e); 648 } 649 }); 650 return fileLength; 651 } else { 652 return 0L; 653 } 654 } 655 656 @Override 657 protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) 658 throws IOException { 659 Preconditions.checkNotNull(nextWriter); 660 waitForSafePoint(); 661 long oldFileLen = closeWriter(); 662 logRollAndSetupWalProps(oldPath, newPath, oldFileLen); 663 this.writer = nextWriter; 664 if (nextWriter instanceof AsyncProtobufLogWriter) { 665 this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); 666 } 667 this.fileLengthAtLastSync = nextWriter.getLength(); 668 this.highestProcessedAppendTxidAtLastSync = 0L; 669 consumeLock.lock(); 670 try { 671 consumerScheduled.set(true); 672 int currentEpoch = epochAndState >>> 2; 673 int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; 674 // set a new epoch and also clear waitingRoll and writerBroken 675 this.epochAndState = nextEpoch << 2; 676 // Reset rollRequested status 677 rollRequested.set(false); 678 consumeExecutor.execute(consumer); 679 } finally { 680 consumeLock.unlock(); 681 } 682 } 683 684 @Override 685 protected void doShutdown() throws IOException { 686 waitForSafePoint(); 687 closeWriter(); 688 closeExecutor.shutdown(); 689 try { 690 if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { 691 LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + 692 " the close of async writer doesn't complete." + 693 "Please check the status of underlying filesystem" + 694 " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + 695 "\""); 696 } 697 } catch (InterruptedException e) { 698 LOG.error("The wait for close of async writer is interrupted"); 699 Thread.currentThread().interrupt(); 700 } 701 IOException error = new IOException("WAL has been closed"); 702 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 703 // drain all the pending sync requests 704 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 705 nextCursor++) { 706 if (!waitingConsumePayloads.isPublished(nextCursor)) { 707 break; 708 } 709 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 710 switch (truck.type()) { 711 case SYNC: 712 syncFutures.add(truck.unloadSync()); 713 break; 714 default: 715 break; 716 } 717 } 718 // and fail them 719 syncFutures.forEach(f -> f.done(f.getTxid(), error)); 720 if (!(consumeExecutor instanceof EventLoop)) { 721 consumeExecutor.shutdown(); 722 } 723 } 724 725 @Override 726 protected void doAppend(AsyncWriter writer, FSWALEntry entry) { 727 writer.append(entry); 728 } 729 730 @Override 731 DatanodeInfo[] getPipeline() { 732 AsyncFSOutput output = this.fsOut; 733 return output != null ? output.getPipeline() : new DatanodeInfo[0]; 734 } 735 736 @Override 737 int getLogReplication() { 738 return getPipeline().length; 739 } 740 741 @Override 742 protected boolean doCheckLogLowReplication() { 743 // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so 744 // typically there is no 'low replication' state, only a 'broken' state. 745 AsyncFSOutput output = this.fsOut; 746 return output != null && output.isBroken(); 747 } 748}