001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; 021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023 024import com.lmax.disruptor.RingBuffer; 025import com.lmax.disruptor.Sequence; 026import com.lmax.disruptor.Sequencer; 027import java.io.IOException; 028import java.lang.reflect.Field; 029import java.util.ArrayDeque; 030import java.util.Comparator; 031import java.util.Deque; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Queue; 035import java.util.SortedSet; 036import java.util.TreeSet; 037import java.util.concurrent.ExecutorService; 038import java.util.concurrent.Executors; 039import java.util.concurrent.LinkedBlockingQueue; 040import java.util.concurrent.ThreadPoolExecutor; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.locks.Condition; 044import java.util.concurrent.locks.Lock; 045import java.util.concurrent.locks.ReentrantLock; 046import java.util.function.Supplier; 047 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.hbase.Abortable; 052import org.apache.hadoop.hbase.HBaseInterfaceAudience; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; 055import org.apache.hadoop.hbase.trace.TraceUtil; 056import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 057import org.apache.hadoop.hbase.wal.WALEdit; 058import org.apache.hadoop.hbase.wal.WALKeyImpl; 059import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; 060import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 063import org.apache.htrace.core.TraceScope; 064import org.apache.yetus.audience.InterfaceAudience; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067import org.apache.hbase.thirdparty.io.netty.channel.Channel; 068import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 069import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 070import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; 071 072 073/** 074 * An asynchronous implementation of FSWAL. 075 * <p> 076 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. 077 * <p> 078 * For append, we process it as follow: 079 * <ol> 080 * <li>In the caller thread(typically, in the rpc handler thread): 081 * <ol> 082 * <li>Insert the entry into 'waitingConsumePayloads'. Use ringbuffer sequence as txid.</li> 083 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 084 * </li> 085 * </ol> 086 * </li> 087 * <li>In the consumer task(executed in a single threaded thread pool) 088 * <ol> 089 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into 090 * {@link #toWriteAppends}</li> 091 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into 092 * {@link #unackedAppends}</li> 093 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call 094 * sync on the AsyncWriter.</li> 095 * <li>In the callback methods: 096 * <ul> 097 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li> 098 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and 099 * wait for writing them again.</li> 100 * </ul> 101 * </li> 102 * </ol> 103 * </li> 104 * </ol> 105 * For sync, the processing stages are almost same. And different from FSHLog, we will open a new 106 * writer and rewrite unacked entries to the new writer and sync again if we hit a sync error. 107 * <p> 108 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is same with 109 * FSHLog.<br> 110 * For a normal roll request(for example, we have reached the log roll size): 111 * <ol> 112 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and 113 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling}(see 114 * {@link #waitForSafePoint()}).</li> 115 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if 116 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out. 117 * </li> 118 * <li>If there are unflush data in the writer, sync them.</li> 119 * <li>When all out-going sync request is finished, i.e, the {@link #unackedAppends} is empty, 120 * signal the {@link #readyForRollingCond}.</li> 121 * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., 122 * we reach a safe point. So it is safe to replace old writer with new writer now.</li> 123 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> 124 * <li>Schedule the consumer task.</li> 125 * <li>Schedule a background task to close the old writer.</li> 126 * </ol> 127 * For a broken writer roll request, the only difference is that we can bypass the wait for safe 128 * point stage. 129 */ 130@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 131public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { 132 133 private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class); 134 135 private static final Comparator<SyncFuture> SEQ_COMPARATOR = (o1, o2) -> { 136 int c = Long.compare(o1.getTxid(), o2.getTxid()); 137 return c != 0 ? c : Integer.compare(System.identityHashCode(o1), System.identityHashCode(o2)); 138 }; 139 140 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 141 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 142 143 public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = 144 "hbase.wal.async.use-shared-event-loop"; 145 public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false; 146 147 public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 148 "hbase.wal.async.wait.on.shutdown.seconds"; 149 public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5; 150 151 private final EventLoopGroup eventLoopGroup; 152 153 private final ExecutorService consumeExecutor; 154 155 private final Class<? extends Channel> channelClass; 156 157 private final Lock consumeLock = new ReentrantLock(); 158 159 private final Runnable consumer = this::consume; 160 161 // check if there is already a consumer task in the event loop's task queue 162 private final Supplier<Boolean> hasConsumerTask; 163 164 private static final int MAX_EPOCH = 0x3FFFFFFF; 165 // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old 166 // writer to be closed. 167 // the second lowest bit is writerBorken which means the current writer is broken and rollWriter 168 // is needed. 169 // all other bits are the epoch number of the current writer, this is used to detect whether the 170 // writer is still the one when you issue the sync. 171 // notice that, modification to this field is only allowed under the protection of consumeLock. 172 private volatile int epochAndState; 173 174 private boolean readyForRolling; 175 176 private final Condition readyForRollingCond = consumeLock.newCondition(); 177 178 private final RingBuffer<RingBufferTruck> waitingConsumePayloads; 179 180 private final Sequence waitingConsumePayloadsGatingSequence; 181 182 private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); 183 184 private final long batchSize; 185 186 private final ExecutorService closeExecutor = Executors.newCachedThreadPool( 187 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); 188 189 private volatile AsyncFSOutput fsOut; 190 191 private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>(); 192 193 private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>(); 194 195 private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR); 196 197 // the highest txid of WAL entries being processed 198 private long highestProcessedAppendTxid; 199 200 // file length when we issue last sync request on the writer 201 private long fileLengthAtLastSync; 202 203 private long highestProcessedAppendTxidAtLastSync; 204 205 private final int waitOnShutdownInSeconds; 206 207 public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, 208 Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, 209 String prefix, String suffix, EventLoopGroup eventLoopGroup, 210 Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { 211 this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, 212 eventLoopGroup, channelClass); 213 } 214 215 public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 216 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 217 boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, 218 Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { 219 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 220 suffix); 221 this.eventLoopGroup = eventLoopGroup; 222 this.channelClass = channelClass; 223 Supplier<Boolean> hasConsumerTask; 224 if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { 225 this.consumeExecutor = eventLoopGroup.next(); 226 if (consumeExecutor instanceof SingleThreadEventExecutor) { 227 try { 228 Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); 229 field.setAccessible(true); 230 Queue<?> queue = (Queue<?>) field.get(consumeExecutor); 231 hasConsumerTask = () -> queue.peek() == consumer; 232 } catch (Exception e) { 233 LOG.warn("Can not get task queue of " + consumeExecutor + 234 ", this is not necessary, just give up", e); 235 hasConsumerTask = () -> false; 236 } 237 } else { 238 hasConsumerTask = () -> false; 239 } 240 } else { 241 ThreadPoolExecutor threadPool = 242 new ThreadPoolExecutor(1, 1, 0L, 243 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 244 new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()). 245 setDaemon(true).build()); 246 hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; 247 this.consumeExecutor = threadPool; 248 } 249 250 this.hasConsumerTask = hasConsumerTask; 251 int preallocatedEventCount = 252 conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 253 waitingConsumePayloads = 254 RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); 255 waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); 256 waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); 257 258 // inrease the ringbuffer sequence so our txid is start from 1 259 waitingConsumePayloads.publish(waitingConsumePayloads.next()); 260 waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor()); 261 262 batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); 263 waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS, 264 DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS); 265 } 266 267 private static boolean waitingRoll(int epochAndState) { 268 return (epochAndState & 1) != 0; 269 } 270 271 private static boolean writerBroken(int epochAndState) { 272 return ((epochAndState >>> 1) & 1) != 0; 273 } 274 275 private static int epoch(int epochAndState) { 276 return epochAndState >>> 2; 277 } 278 279 // return whether we have successfully set readyForRolling to true. 280 private boolean trySetReadyForRolling() { 281 // Check without holding lock first. Usually we will just return here. 282 // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to 283 // check them outside the consumeLock. 284 if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { 285 return false; 286 } 287 consumeLock.lock(); 288 try { 289 // 1. a roll is requested 290 // 2. all out-going entries have been acked(we have confirmed above). 291 if (waitingRoll(epochAndState)) { 292 readyForRolling = true; 293 readyForRollingCond.signalAll(); 294 return true; 295 } else { 296 return false; 297 } 298 } finally { 299 consumeLock.unlock(); 300 } 301 } 302 303 private void syncFailed(long epochWhenSync, Throwable error) { 304 LOG.warn("sync failed", error); 305 boolean shouldRequestLogRoll = true; 306 consumeLock.lock(); 307 try { 308 int currentEpochAndState = epochAndState; 309 if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { 310 // this is not the previous writer which means we have already rolled the writer. 311 // or this is still the current writer, but we have already marked it as broken and request 312 // a roll. 313 return; 314 } 315 this.epochAndState = currentEpochAndState | 0b10; 316 if (waitingRoll(currentEpochAndState)) { 317 readyForRolling = true; 318 readyForRollingCond.signalAll(); 319 // this means we have already in the middle of a rollWriter so just tell the roller thread 320 // that you can continue without requesting an extra log roll. 321 shouldRequestLogRoll = false; 322 } 323 } finally { 324 consumeLock.unlock(); 325 } 326 for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { 327 toWriteAppends.addFirst(iter.next()); 328 } 329 highestUnsyncedTxid = highestSyncedTxid.get(); 330 if (shouldRequestLogRoll) { 331 // request a roll. 332 requestLogRoll(ERROR); 333 } 334 } 335 336 private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { 337 highestSyncedTxid.set(processedTxid); 338 for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { 339 FSWALEntry entry = iter.next(); 340 if (entry.getTxid() <= processedTxid) { 341 entry.release(); 342 iter.remove(); 343 } else { 344 break; 345 } 346 } 347 postSync(System.nanoTime() - startTimeNs, finishSync(true)); 348 if (trySetReadyForRolling()) { 349 // we have just finished a roll, then do not need to check for log rolling, the writer will be 350 // closed soon. 351 return; 352 } 353 // If we haven't already requested a roll, check if we have exceeded logrollsize 354 if (!isLogRollRequested() && writer.getLength() > logrollsize) { 355 if (LOG.isDebugEnabled()) { 356 LOG.debug("Requesting log roll because of file size threshold; length=" + 357 writer.getLength() + ", logrollsize=" + logrollsize); 358 } 359 requestLogRoll(SIZE); 360 } 361 } 362 363 // find all the sync futures between these two txids to see if we need to issue a hsync, if no 364 // sync futures then just use the default one. 365 private boolean isHsync(long beginTxid, long endTxid) { 366 SortedSet<SyncFuture> futures = 367 syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); 368 if (futures.isEmpty()) { 369 return useHsync; 370 } 371 for (SyncFuture future : futures) { 372 if (future.isForceSync()) { 373 return true; 374 } 375 } 376 return false; 377 } 378 379 private void sync(AsyncWriter writer) { 380 fileLengthAtLastSync = writer.getLength(); 381 long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; 382 boolean shouldUseHsync = 383 isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); 384 highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; 385 final long startTimeNs = System.nanoTime(); 386 final long epoch = (long) epochAndState >>> 2L; 387 addListener(writer.sync(shouldUseHsync), (result, error) -> { 388 if (error != null) { 389 syncFailed(epoch, error); 390 } else { 391 syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); 392 } 393 }, consumeExecutor); 394 } 395 396 private void addTimeAnnotation(SyncFuture future, String annotation) { 397 TraceUtil.addTimelineAnnotation(annotation); 398 // TODO handle htrace API change, see HBASE-18895 399 // future.setSpan(scope.getSpan()); 400 } 401 402 private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { 403 int finished = 0; 404 for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) { 405 SyncFuture sync = iter.next(); 406 if (sync.getTxid() <= txid) { 407 sync.done(txid, null); 408 iter.remove(); 409 finished++; 410 if (addSyncTrace) { 411 addTimeAnnotation(sync, "writer synced"); 412 } 413 } else { 414 break; 415 } 416 } 417 return finished; 418 } 419 420 // try advancing the highestSyncedTxid as much as possible 421 private int finishSync(boolean addSyncTrace) { 422 if (unackedAppends.isEmpty()) { 423 // All outstanding appends have been acked. 424 if (toWriteAppends.isEmpty()) { 425 // Also no appends that wait to be written out, then just finished all pending syncs. 426 long maxSyncTxid = highestSyncedTxid.get(); 427 for (SyncFuture sync : syncFutures) { 428 maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid()); 429 sync.done(maxSyncTxid, null); 430 if (addSyncTrace) { 431 addTimeAnnotation(sync, "writer synced"); 432 } 433 } 434 highestSyncedTxid.set(maxSyncTxid); 435 int finished = syncFutures.size(); 436 syncFutures.clear(); 437 return finished; 438 } else { 439 // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so 440 // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between 441 // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished. 442 long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid(); 443 assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid; 444 long doneTxid = lowestUnprocessedAppendTxid - 1; 445 highestSyncedTxid.set(doneTxid); 446 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 447 } 448 } else { 449 // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the 450 // first unacked append minus 1. 451 long lowestUnackedAppendTxid = unackedAppends.peek().getTxid(); 452 long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); 453 highestSyncedTxid.set(doneTxid); 454 return finishSyncLowerThanTxid(doneTxid, addSyncTrace); 455 } 456 } 457 458 private void appendAndSync() { 459 final AsyncWriter writer = this.writer; 460 // maybe a sync request is not queued when we issue a sync, so check here to see if we could 461 // finish some. 462 finishSync(false); 463 long newHighestProcessedAppendTxid = -1L; 464 for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { 465 FSWALEntry entry = iter.next(); 466 boolean appended; 467 try { 468 appended = appendEntry(writer, entry); 469 } catch (IOException e) { 470 throw new AssertionError("should not happen", e); 471 } 472 newHighestProcessedAppendTxid = entry.getTxid(); 473 iter.remove(); 474 if (appended) { 475 // This is possible, when we fail to sync, we will add the unackedAppends back to 476 // toWriteAppends, so here we may get an entry which is already in the unackedAppends. 477 if (unackedAppends.isEmpty() || unackedAppends.peekLast().getTxid() < entry.getTxid()) { 478 unackedAppends.addLast(entry); 479 } 480 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 481 break; 482 } 483 } 484 } 485 // if we have a newer transaction id, update it. 486 // otherwise, use the previous transaction id. 487 if (newHighestProcessedAppendTxid > 0) { 488 highestProcessedAppendTxid = newHighestProcessedAppendTxid; 489 } else { 490 newHighestProcessedAppendTxid = highestProcessedAppendTxid; 491 } 492 493 if (writer.getLength() - fileLengthAtLastSync >= batchSize) { 494 // sync because buffer size limit. 495 sync(writer); 496 return; 497 } 498 if (writer.getLength() == fileLengthAtLastSync) { 499 // we haven't written anything out, just advance the highestSyncedSequence since we may only 500 // stamped some region sequence id. 501 if (unackedAppends.isEmpty()) { 502 highestSyncedTxid.set(highestProcessedAppendTxid); 503 finishSync(false); 504 trySetReadyForRolling(); 505 } 506 return; 507 } 508 // reach here means that we have some unsynced data but haven't reached the batch size yet 509 // but we will not issue a sync directly here even if there are sync requests because we may 510 // have some new data in the ringbuffer, so let's just return here and delay the decision of 511 // whether to issue a sync in the caller method. 512 } 513 514 private void consume() { 515 consumeLock.lock(); 516 try { 517 int currentEpochAndState = epochAndState; 518 if (writerBroken(currentEpochAndState)) { 519 return; 520 } 521 if (waitingRoll(currentEpochAndState)) { 522 if (writer.getLength() > fileLengthAtLastSync) { 523 // issue a sync 524 sync(writer); 525 } else { 526 if (unackedAppends.isEmpty()) { 527 readyForRolling = true; 528 readyForRollingCond.signalAll(); 529 } 530 } 531 return; 532 } 533 } finally { 534 consumeLock.unlock(); 535 } 536 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 537 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 538 nextCursor++) { 539 if (!waitingConsumePayloads.isPublished(nextCursor)) { 540 break; 541 } 542 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 543 switch (truck.type()) { 544 case APPEND: 545 toWriteAppends.addLast(truck.unloadAppend()); 546 break; 547 case SYNC: 548 syncFutures.add(truck.unloadSync()); 549 break; 550 default: 551 LOG.warn("RingBufferTruck with unexpected type: " + truck.type()); 552 break; 553 } 554 waitingConsumePayloadsGatingSequence.set(nextCursor); 555 } 556 appendAndSync(); 557 if (hasConsumerTask.get()) { 558 return; 559 } 560 if (toWriteAppends.isEmpty()) { 561 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 562 consumerScheduled.set(false); 563 // recheck here since in append and sync we do not hold the consumeLock. Thing may 564 // happen like 565 // 1. we check cursor, no new entry 566 // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and 567 // give up scheduling the consumer task. 568 // 3. we set consumerScheduled to false and also give up scheduling consumer task. 569 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) { 570 // we will give up consuming so if there are some unsynced data we need to issue a sync. 571 if (writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty() && 572 syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync) { 573 // no new data in the ringbuffer and we have at least one sync request 574 sync(writer); 575 } 576 return; 577 } else { 578 // maybe someone has grabbed this before us 579 if (!consumerScheduled.compareAndSet(false, true)) { 580 return; 581 } 582 } 583 } 584 } 585 // reschedule if we still have something to write. 586 consumeExecutor.execute(consumer); 587 } 588 589 private boolean shouldScheduleConsumer() { 590 int currentEpochAndState = epochAndState; 591 if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { 592 return false; 593 } 594 return consumerScheduled.compareAndSet(false, true); 595 } 596 597 @Override 598 protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) 599 throws IOException { 600 long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, 601 waitingConsumePayloads); 602 if (shouldScheduleConsumer()) { 603 consumeExecutor.execute(consumer); 604 } 605 return txid; 606 } 607 608 @Override 609 public void sync() throws IOException { 610 sync(useHsync); 611 } 612 613 @Override 614 public void sync(long txid) throws IOException { 615 sync(txid, useHsync); 616 } 617 618 @Override 619 public void sync(boolean forceSync) throws IOException { 620 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 621 long txid = waitingConsumePayloads.next(); 622 SyncFuture future; 623 try { 624 future = getSyncFuture(txid, forceSync); 625 RingBufferTruck truck = waitingConsumePayloads.get(txid); 626 truck.load(future); 627 } finally { 628 waitingConsumePayloads.publish(txid); 629 } 630 if (shouldScheduleConsumer()) { 631 consumeExecutor.execute(consumer); 632 } 633 blockOnSync(future); 634 } 635 } 636 637 @Override 638 public void sync(long txid, boolean forceSync) throws IOException { 639 if (highestSyncedTxid.get() >= txid) { 640 return; 641 } 642 try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { 643 // here we do not use ring buffer sequence as txid 644 long sequence = waitingConsumePayloads.next(); 645 SyncFuture future; 646 try { 647 future = getSyncFuture(txid, forceSync); 648 RingBufferTruck truck = waitingConsumePayloads.get(sequence); 649 truck.load(future); 650 } finally { 651 waitingConsumePayloads.publish(sequence); 652 } 653 if (shouldScheduleConsumer()) { 654 consumeExecutor.execute(consumer); 655 } 656 blockOnSync(future); 657 } 658 } 659 660 @Override 661 protected AsyncWriter createWriterInstance(Path path) throws IOException { 662 return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, 663 this.blocksize, eventLoopGroup, channelClass); 664 } 665 666 private void waitForSafePoint() { 667 consumeLock.lock(); 668 try { 669 int currentEpochAndState = epochAndState; 670 if (writerBroken(currentEpochAndState) || this.writer == null) { 671 return; 672 } 673 consumerScheduled.set(true); 674 epochAndState = currentEpochAndState | 1; 675 readyForRolling = false; 676 consumeExecutor.execute(consumer); 677 while (!readyForRolling) { 678 readyForRollingCond.awaitUninterruptibly(); 679 } 680 } finally { 681 consumeLock.unlock(); 682 } 683 } 684 685 protected final long closeWriter(AsyncWriter writer) { 686 if (writer != null) { 687 long fileLength = writer.getLength(); 688 closeExecutor.execute(() -> { 689 try { 690 writer.close(); 691 } catch (IOException e) { 692 LOG.warn("close old writer failed", e); 693 } 694 }); 695 return fileLength; 696 } else { 697 return 0L; 698 } 699 } 700 701 @Override 702 protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) 703 throws IOException { 704 Preconditions.checkNotNull(nextWriter); 705 waitForSafePoint(); 706 long oldFileLen = closeWriter(this.writer); 707 logRollAndSetupWalProps(oldPath, newPath, oldFileLen); 708 this.writer = nextWriter; 709 if (nextWriter instanceof AsyncProtobufLogWriter) { 710 this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); 711 } 712 this.fileLengthAtLastSync = nextWriter.getLength(); 713 this.highestProcessedAppendTxidAtLastSync = 0L; 714 consumeLock.lock(); 715 try { 716 consumerScheduled.set(true); 717 int currentEpoch = epochAndState >>> 2; 718 int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; 719 // set a new epoch and also clear waitingRoll and writerBroken 720 this.epochAndState = nextEpoch << 2; 721 // Reset rollRequested status 722 rollRequested.set(false); 723 consumeExecutor.execute(consumer); 724 } finally { 725 consumeLock.unlock(); 726 } 727 } 728 729 @Override 730 protected void doShutdown() throws IOException { 731 waitForSafePoint(); 732 closeWriter(this.writer); 733 this.writer = null; 734 closeExecutor.shutdown(); 735 try { 736 if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { 737 LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + 738 " the close of async writer doesn't complete." + 739 "Please check the status of underlying filesystem" + 740 " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + 741 "\""); 742 } 743 } catch (InterruptedException e) { 744 LOG.error("The wait for close of async writer is interrupted"); 745 Thread.currentThread().interrupt(); 746 } 747 IOException error = new IOException("WAL has been closed"); 748 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 749 // drain all the pending sync requests 750 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 751 nextCursor++) { 752 if (!waitingConsumePayloads.isPublished(nextCursor)) { 753 break; 754 } 755 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 756 switch (truck.type()) { 757 case SYNC: 758 syncFutures.add(truck.unloadSync()); 759 break; 760 default: 761 break; 762 } 763 } 764 // and fail them 765 syncFutures.forEach(f -> f.done(f.getTxid(), error)); 766 if (!(consumeExecutor instanceof EventLoop)) { 767 consumeExecutor.shutdown(); 768 } 769 } 770 771 @Override 772 protected void doAppend(AsyncWriter writer, FSWALEntry entry) { 773 writer.append(entry); 774 } 775 776 @Override 777 DatanodeInfo[] getPipeline() { 778 AsyncFSOutput output = this.fsOut; 779 return output != null ? output.getPipeline() : new DatanodeInfo[0]; 780 } 781 782 @Override 783 int getLogReplication() { 784 return getPipeline().length; 785 } 786 787 @Override 788 protected boolean doCheckLogLowReplication() { 789 // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so 790 // typically there is no 'low replication' state, only a 'broken' state. 791 AsyncFSOutput output = this.fsOut; 792 return output != null && output.isBroken(); 793 } 794}