/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as the txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more
 * details.</li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends}
 * and wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. And unlike FSHLog, if we hit a sync error
 * we will open a new writer, rewrite the unacked entries to the new writer, and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends}
 * out.</li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty,
 * signal the {@link #readyForRollingCond}.</li>
 * <li>Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point, so it is safe to replace the old writer with the new writer
 * now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for the
 * safe point stage.
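 * <p>
 * As a quick reference, the writer state described above is tracked in a single {@code int},
 * {@link #epochAndState}; the encoding (documented in full at the field declaration) is roughly:
 *
 * <pre>
 * bit 0          : waitingRoll, a new writer has been created and we are waiting for the old
 *                  writer to be closed
 * bit 1          : writerBroken, the current writer is broken and a log roll is needed
 * remaining bits : the epoch of the current writer, used to detect whether the writer is still
 *                  the same one as when a sync was issued
 * </pre>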
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // the second lowest bit is writerBroken, which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when the sync was issued.
  // notice that modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  private final StreamSlowMonitor streamSlowMonitor;

  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
  }

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Can not get task queue of " + consumeExecutor
            + ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
          new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()
            + "-prefix:" + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true)
            .build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so that our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
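   * A {@code null} throwable indicates a successful sync; a non-null throwable fails the future
   * with that error.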
   */
  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
    future.done(txid, t);
    syncFutureCache.offer(future);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
    // safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer,
        // or this is still the current writer but we have already marked it as broken and
        // requested a roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter, so just tell the roller thread
        // that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
    long startTimeNs) {
    // Please see the last several comments on HBASE-22761. It is possible that we get a
    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
    // writer. So here we will also check the epoch and state; if the epoch has already been
    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
    // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and
    // causing data corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. So here we do not hold the consumeLock, for better performance. It is safe because
    // there are only 3 possible situations:
    // 1. For the normal case, the only place where we change epochAndState is when rolling the
    // writer.
    // Before rolling actually happens, we will only change the state to waitingRoll, which is a
    // different bit from writerBroken, and when we actually change the epoch, we can make sure
    // that there are no outgoing sync requests. So we will always pass the check here and there
    // is no problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
    // situation as #1.
    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
    // only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch has changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we will always update
    // epochAndState as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
    int epochAndState = this.epochAndState;
    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer is broken, skip");
      return;
    }
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync());
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
          + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  // find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures then just use the default.
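  // note: the two transient SyncFuture instances below serve only as range bounds for the subSet
  // call; they are never completed or handed back to callers.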
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
      new SyncFuture().reset(endTxid + 1, false));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

  private int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        markFutureDoneAndOffer(sync, txid, null);
        iter.remove();
        finished++;
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync() {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // There are also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          markFutureDoneAndOffer(sync, maxSyncTxid, null);
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid,
        // so if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txids are
        // between highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid);
    }
  }

  // confirm non-empty before calling
  private static long getLastTxid(Deque<FSWALEntry> queue) {
    return queue.peekLast().getTxid();
  }

  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request was not yet queued when we issued the last sync, so check here to see
    // if we can finish some.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
    // threaded, so this can save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905: here we need to make sure that we always write all the entries in
        // unackedAppends out. The code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // the unackedAppends, or a syncFailed to lead us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So here
        // let's use this way to fix it first; we can optimize later.
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because of the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedSequence since we may only
      // have stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // reaching here means that we have some unsynced data but haven't reached the batch size yet;
    // we will not issue a sync directly here even if there are sync requests, because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync to the caller method.
  }

  private void drainNonMarkerEditsAndFailSyncs() {
    if (toWriteAppends.isEmpty()) {
      return;
    }
    boolean hasNonMarkerEdits = false;
    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
    while (iter.hasNext()) {
      FSWALEntry entry = iter.next();
      if (!entry.getEdit().isMetaEdit()) {
        entry.release();
        hasNonMarkerEdits = true;
        break;
      }
    }
    if (hasNonMarkerEdits) {
      for (;;) {
        iter.remove();
        if (!iter.hasNext()) {
          break;
        }
        iter.next().release();
      }
      for (FSWALEntry entry : unackedAppends) {
        entry.release();
      }
      unackedAppends.clear();
      // fail the sync futures which are below the txid of the first remaining edit; if there is
      // none, fail all the sync futures.
      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
      IOException error = new IOException("WAL is closing, only marker edit is allowed");
      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
        SyncFuture future = syncIter.next();
        if (future.getTxid() < txid) {
          markFutureDoneAndOffer(future, future.getTxid(), error);
          syncIter.remove();
        } else {
          break;
        }
      }
    }
  }

  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
      <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    if (markerEditOnly()) {
      drainNonMarkerEditsAndFailSyncs();
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Things may
        // happen like
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees consumerScheduled is true and
        // gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
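          // the last check on the sync future txid avoids a redundant sync: any pending sync
          // future at or below highestProcessedAppendTxidAtLastSync will be completed by the
          // callback of the sync we have already issued.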
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  // This is used by sync replication, where we are going to close the wal soon after we reopen all
  // the regions. Will be overridden by subclasses.
  protected boolean markerEditOnly() {
    return false;
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
    if (markerEditOnly() && !edits.isMetaEdit()) {
      throw new IOException("WAL is closing, only marker edit is allowed");
    }
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  protected void doSync(boolean forceSync) throws IOException {
    long txid = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(txid);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(txid);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected void doSync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    // here we do not use the ring buffer sequence as the txid
    long sequence = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(sequence);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return createAsyncWriter(fs, path);
  }

  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

  protected final long closeWriter(AsyncWriter writer, Path path) {
    if (writer != null) {
      inflightWALClosures.put(path.getName(), writer);
      long fileLength = writer.getLength();
      closeExecutor.execute(() -> {
        try {
          writer.close();
        } catch (IOException e) {
          LOG.warn("close old writer failed", e);
        } finally {
          inflightWALClosures.remove(path.getName());
        }
      });
      return fileLength;
    } else {
      return 0L;
    }
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
    throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    long oldFileLen = closeWriter(this.writer, oldPath);
    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer, getOldPath());
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer doesn't complete."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
          + "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
      <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // Unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
    // typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}