/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the RPC handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li>
 * <li>Schedule the consumer task if needed.
 * See {@link #shouldScheduleConsumer()} for more details.</li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it
 * into {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends}
 * and wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. But different from FSHLog, we will open a
 * new writer, rewrite the unacked entries to the new writer, and sync again if we hit a sync
 * error.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
 * <li>Back to the log roller thread, now we can confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point. So it is safe to replace the old writer with the new writer
 * now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for safe
 * point stage.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

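  // Order SyncFutures by txid; break ties with the identity hash code so that two futures with
  // the same txid can still coexist in the TreeSet below.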
  private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> {
    int c = Long.compare(o1.getTxid(), o2.getTxid());
    return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2));
  };

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer is created and we are waiting for the
  // old writer to be closed.
  // the second lowest bit is writerBroken, which means the current writer is broken and a
  // rollWriter is needed.
  // all other bits are the epoch number of the current writer, which is used to detect whether the
  // writer is still the one in use when you issued the sync.
  // notice that, modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

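  // When hbase.wal.async.use-shared-event-loop is true the consumer runs on one of the netty
  // event loops of the given group, otherwise a dedicated single-thread executor is created.
  // Either way we try to peek the executor's task queue so we can tell whether a consumer task is
  // already pending.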
  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
      Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
      String prefix, String suffix, EventLoopGroup eventLoopGroup,
      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Cannot get task queue of " + consumeExecutor +
            ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
          new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
    // safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed above).
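      // Recheck waitingRoll under the lock: epochAndState can only be modified while consumeLock
      // is held, so the check below is stable.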
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer,
        // or this is still the current writer, but we have already marked it as broken and
        // requested a roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter, so just tell the roller thread
        // that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll();
    }
  }

  private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync(true));
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    if (writer.getLength() < logrollsize || isLogRollRequested()) {
      return;
    }
    requestLogRoll();
  }

  // find all the sync futures between these two txids to see if we need to issue a hsync, if no
  // sync futures then just use the default one.
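  // A SyncFuture carries forceSync=true when the caller explicitly requested hsync, in which case
  // the whole batch is upgraded to hsync.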
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures =
      syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

  private void addTimeAnnotation(SyncFuture future, String annotation) {
    TraceUtil.addTimelineAnnotation(annotation);
    // TODO handle htrace API change, see HBASE-18895
    // future.setSpan(scope.getSpan());
  }

  private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        sync.done(txid, null);
        iter.remove();
        finished++;
        if (addSyncTrace) {
          addTimeAnnotation(sync, "writer synced");
        }
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync(boolean addSyncTrace) {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          sync.done(maxSyncTxid, null);
          if (addSyncTrace) {
            addTimeAnnotation(sync, "writer synced");
          }
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid,
        // so if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txids are
        // between highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
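      // Everything below the first unacked append has already been acked, so the sync futures at
      // or below that point can be completed.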
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid, addSyncTrace);
    }
  }

  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request is not queued when we issue a sync, so check here to see if we could
    // finish some.
    finishSync(false);
    long newHighestProcessedAppendTxid = -1L;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) {
          unackedAppends.addLast(entry);
        }
        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because of the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedTxid since we may only
      // have stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync(false);
        trySetReadyForRolling();
      }
      return;
    }
    // reaching here means that we have some unsynced data but haven't reached the batch size yet,
    // and we will not issue a sync directly here even if there are sync requests, because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync to the caller method.
  }

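  // The consumer task: drain the ring buffer into toWriteAppends/syncFutures, write the entries
  // out via appendAndSync, and then decide whether to issue a sync or to reschedule itself.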
  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
        nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Things may happen
        // like
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees that consumerScheduled is true,
        // and gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
          if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() &&
            syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
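    // either toWriteAppends is not empty, or new payloads were published while we were checking.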
    consumeExecutor.execute(consumer);
  }

  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException {
    long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
      waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  public void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public void sync(boolean forceSync) throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
      long txid = waitingConsumePayloads.next();
      SyncFuture future;
      try {
        future = getSyncFuture(txid, forceSync);
        RingBufferTruck truck = waitingConsumePayloads.get(txid);
        truck.load(future);
      } finally {
        waitingConsumePayloads.publish(txid);
      }
      if (shouldScheduleConsumer()) {
        consumeExecutor.execute(consumer);
      }
      blockOnSync(future);
    }
  }

  @Override
  public void sync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
      // here we do not use the ring buffer sequence as the txid
      long sequence = waitingConsumePayloads.next();
      SyncFuture future;
      try {
        future = getSyncFuture(txid, forceSync);
        RingBufferTruck truck = waitingConsumePayloads.get(sequence);
        truck.load(future);
      } finally {
        waitingConsumePayloads.publish(sequence);
      }
      if (shouldScheduleConsumer()) {
        consumeExecutor.execute(consumer);
      }
      blockOnSync(future);
    }
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
      this.blocksize, eventLoopGroup, channelClass);
  }

  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private long closeWriter() {
    AsyncWriter oldWriter = this.writer;
    if (oldWriter != null) {
      long fileLength = oldWriter.getLength();
      closeExecutor.execute(() -> {
        try {
          oldWriter.close();
        } catch (IOException e) {
          LOG.warn("close old writer failed", e);
        }
      });
      return fileLength;
    } else {
      return 0L;
    }
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
      throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter();
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
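      // remember the underlying output so getPipeline and doCheckLogLowReplication can inspect it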
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter();
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" +
          " the close of async writer doesn't complete." +
          " Please check the status of the underlying filesystem" +
          " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS +
          "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound;
        nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> f.done(f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
    // typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}