/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
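 * <p>
 * At a glance, an entry flows through the following in-memory stages (a condensed restatement of
 * the steps described below):
 *
 * <pre>
 *   caller thread : publish the payload to 'waitingConsumePayloads' (ring buffer sequence = txid)
 *   consumer task : 'waitingConsumePayloads' -&gt; 'toWriteAppends' -&gt; AsyncWriter -&gt; 'unackedAppends'
 *   sync callback : on success, drop the entry from 'unackedAppends';
 *                   on failure, move everything in 'unackedAppends' back to 'toWriteAppends'
 * </pre>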
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more
 * details.</li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. And unlike FSHLog, if we hit a sync error,
 * we will open a new writer, rewrite the unacked entries to the new writer and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty, signal
 * the {@link #readyForRollingCond}.</li>
 * <li>Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point. So it is safe to replace the old writer with the new writer
 * now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for the
 * safe point stage.
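 * <p>
 * A condensed view of the normal roll hand-off described above:
 *
 * <pre>
 *   log roller thread : set waitingRoll, then wait on readyForRollingCond (waitForSafePoint)
 *   consumer thread   : stop polling/writing, sync any unflushed data, and once unackedAppends is
 *                       empty set readyForRolling and signal readyForRollingCond
 *   log roller thread : swap in the new writer, bump the epoch (clearing waitingRoll and
 *                       writerBroken), reschedule the consumer and close the old writer in the
 *                       background
 * </pre>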
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // the second lowest bit is writerBroken, which means the current writer is broken and a
  // rollWriter is needed.
  // all other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when the sync was issued.
  // notice that modification to this field is only allowed while holding consumeLock.
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  private final StreamSlowMonitor streamSlowMonitor;

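  // Illustrative hbase-site.xml overrides for the knobs declared above (example values only, not
  // project recommendations):
  //   hbase.wal.batch.size                     = 131072  (sync once ~128KB of edits are buffered)
  //   hbase.wal.async.use-shared-event-loop    = true    (run the consumer on a shared netty event loop)
  //   hbase.wal.async.wait.on.shutdown.seconds = 10      (wait longer for the writer close on shutdown)
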
  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
  }

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Cannot get task queue of " + consumeExecutor
            + ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
          .setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()).setDaemon(true).build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so that our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
   */
  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
    future.done(txid, t);
    syncFutureCache.offer(future);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

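  // An illustrative example of the encoding above (not part of the original source): with
  // epochAndState == 0b1011, waitingRoll(0b1011) == true (bit 0), writerBroken(0b1011) == true
  // (bit 1) and epoch(0b1011) == 2 (the remaining bits). Rolling to the next writer would then set
  // the field to nextEpoch << 2 == 0b1100, clearing both flag bits (see doReplaceWriter below).
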
  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the consumer task, so it
    // is safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

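  // Called back on the consumer executor when a sync on the current writer fails. If the writer
  // has already been rolled or already marked broken, this is a no-op; otherwise mark the writer
  // as broken, push the still-unacked entries back to the front of toWriteAppends so they will be
  // written again by the next writer, and request a log roll unless a roll is already waiting.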
  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer; or this
        // is still the current writer, but we have already marked it as broken and requested a
        // roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter call, so just tell the roller
        // thread that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
    long startTimeNs) {
    // Please see the last several comments on HBASE-22761: it is possible that we get a
    // syncCompleted which acks a previous sync request after we have received a syncFailed on the
    // same writer. So here we will also check the epoch and state: if the epoch has already been
    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
    // just skip, to avoid messing up the state or accidentally releasing some WAL entries and
    // causing data corruption.
    // The syncCompleted call is on the critical write path so we should try our best to make it
    // fast. So here we do not hold consumeLock, to increase performance. It is safe because there
    // are only 3 possible situations:
    // 1. For the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before the roll actually happens, we will only set the waitingRoll bit, which is a
    // different bit from writerBroken, and when we actually change the epoch, we can make sure
    // that there is no outgoing sync request. So we will always pass the check here and there is
    // no problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
    // situation as in #1.
    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
    // only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we always update epochAndState
    // as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
    int epochAndState = this.epochAndState;
    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer is broken, skip");
      return;
    }
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync());
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
          + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  // find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures then just use the default.
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
      new SyncFuture().reset(endTxid + 1, false));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

  private int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        markFutureDoneAndOffer(sync, txid, null);
        iter.remove();
        finished++;
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
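  // Worked example (illustrative): suppose the append with txid 13 is still in unackedAppends,
  // highestSyncedTxid is 10, and sync futures are waiting on txids 11 and 13. Then doneTxid
  // becomes 12 (13 - 1), highestSyncedTxid is advanced to 12, and only the future for txid 11
  // completes; the future for txid 13 stays in syncFutures until the append for txid 13 is acked.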
  private int finishSync() {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          sync.done(maxSyncTxid, null);
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid,
        // so if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are
        // between highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid);
    }
  }

  // confirm non-empty before calling
  private static long getLastTxid(Deque<FSWALEntry> queue) {
    return queue.peekLast().getTxid();
  }

  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // a sync request may not have been queued yet when we issued the last sync, so check here to
    // see if we can finish some sync futures.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast on unackedAppends every time; appendAndSync is
    // single-threaded, so this can save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we add the entries in unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905: here we need to make sure that we always write all the entries in
        // unackedAppends out. The code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // unackedAppends, or a syncFailed call to move us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and (may) lead to a big refactoring. So let's fix it
        // this way first; we can optimize later.
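        // Break out of the loop (the sync below will run) once at least batchSize bytes have been
        // buffered since the last sync, but only when everything currently in unackedAppends has
        // already been written to the current writer, per the HBASE-25905 note above.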
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because we have reached the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, so just advance highestSyncedTxid since we may only have
      // stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // Reaching here means that we have some unsynced data but haven't reached the batch size yet.
    // We will not issue a sync directly here even if there are sync requests, because we may have
    // some new data in the ringbuffer; just return and leave the decision of whether to issue a
    // sync to the caller.
  }

  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
        // like:
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees consumerScheduled is true, and
        // gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

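  // The class comment points here for details on scheduling the consumer task: do not schedule it
  // while the writer is broken or a roll is waiting (the roll/replace path reschedules the
  // consumer itself), and otherwise schedule it only if no consumer task is pending already, which
  // is what the consumerScheduled compareAndSet guarantees.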
  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  protected void doSync(boolean forceSync) throws IOException {
    long txid = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(txid);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(txid);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected void doSync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    // here we do not use the ring buffer sequence as the txid
    long sequence = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(sequence);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

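  // Closes the given writer asynchronously on closeExecutor, keeping it registered in
  // inflightWALClosures until the close completes. Returns the writer's length at the time the
  // close was scheduled, or 0 if the writer is null.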
  protected final long closeWriter(AsyncWriter writer, Path path) {
    if (writer != null) {
      inflightWALClosures.put(path.getName(), writer);
      long fileLength = writer.getLength();
      closeExecutor.execute(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.warn("close old writer failed", e);
        } finally {
          inflightWALClosures.remove(path.getName());
        }
      });
      return fileLength;
    } else {
      return 0L;
    }
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
    throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter(this.writer, oldPath);
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer, getOldPath());
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer doesn't complete."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
          + "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // Unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to the
    // DataNodes, so typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}