/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.google.errorprone.annotations.RestrictedApi;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read;
 * call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
 * replication, where we may want to tail the active WAL file.
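 * <p>
 * A minimal reading sketch (illustrative only; it assumes the stream reader type returned by the
 * factory method linked above, uses placeholder names {@code walFactory}, {@code fs} and
 * {@code walPath}, and omits error handling):
 *
 * <pre>
 * try (WALStreamReader reader = walFactory.createStreamReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // examine entry.getKey() and entry.getEdit()
 *   }
 * }
 * </pre>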
137 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL 138 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If 139 * we have made successful appends to the WAL and we then are unable to sync them, our current 140 * semantic is to return error to the client that the appends failed but also to abort the current 141 * context, usually the hosting server. We need to replay the WALs. <br> 142 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client 143 * that the append failed. <br> 144 * TODO: replication may pick up these last edits though they have been marked as failed append 145 * (Need to keep our own file lengths, not rely on HDFS). 146 */ 147@InterfaceAudience.Private 148public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 149 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 150 151 private static final Comparator<SyncFuture> SEQ_COMPARATOR = 152 Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode); 153 154 private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; 155 private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; 156 /** Don't log blocking regions more frequently than this. */ 157 private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); 158 159 protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms"; 160 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 161 protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; 162 protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms 163 protected static final String SLOW_SYNC_ROLL_THRESHOLD = 164 "hbase.regionserver.wal.slowsync.roll.threshold"; 165 protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings 166 protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = 167 "hbase.regionserver.wal.slowsync.roll.interval.ms"; 168 protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute 169 170 public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; 171 protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 172 173 public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier"; 174 175 public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; 176 177 public static final String RING_BUFFER_SLOT_COUNT = 178 "hbase.regionserver.wal.disruptor.event.count"; 179 180 public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; 181 public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; 182 183 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 184 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 185 186 public static final String WAL_AVOID_LOCAL_WRITES_KEY = 187 "hbase.regionserver.wal.avoid-local-writes"; 188 public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; 189 190 /** 191 * file system instance 192 */ 193 protected final FileSystem fs; 194 195 /** 196 * WAL directory, where all WAL files would be placed. 197 */ 198 protected final Path walDir; 199 200 private final FileSystem remoteFs; 201 202 private final Path remoteWALDir; 203 204 /** 205 * dir path where old logs are kept. 
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /** Tracks the logs in the process of being closed. */
  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */
  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  private final long walTooOldNs;

  // If > than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If more than this many logs, force a flush of the oldest region so that its oldest edit goes
   * to disk. If too many and we crash, then replay will take forever. Keep the number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not yet
   * come in.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of wal
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time to check low replication on hlog's pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);

  protected final long walShutdownTimeout;

  private long nextLogTooOldNs = System.nanoTime();

  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WALProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    private final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * subclasses.
     */
    private final long logSize;

    /**
     * The nanoTime of the log rolling, used to determine the time interval that has passed since.
     */
    private final long rollTimeNs;

    /**
     * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the
     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
     * for safety.
     */
    private volatile boolean closed = false;

    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
      this.rollTimeNs = System.nanoTime();
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * A cache of sync futures reused by threads.
   */
  protected final SyncFutureCache syncFutureCache;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // Run in caller if we get reject execution exception, to avoid aborting region server when we get
  // reject execution exception. Usually this should not happen but let's make it more robust.
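  // Illustrative note: the work queue below is unbounded, so with CallerRunsPolicy a rejected
  // execution should normally only occur if an archive task is submitted after this executor has
  // been shut down; in that case the submitting (roll/close) thread simply runs the task inline.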
  private final ExecutorService logArchiveExecutor =
    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
      new ThreadPoolExecutor.CallerRunsPolicy());

  private final int archiveRetries;

  protected ExecutorService consumeExecutor;

  private final Lock consumeLock = new ReentrantLock();

  protected final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  protected Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means new writer is created, and we are waiting for old
  // writer to be closed.
  // the second-lowest bit is writerBroken which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer, this is used to detect whether the
  // writer is still the one when you issue the sync.
  // notice that, modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  protected long highestProcessedAppendTxid;

  // file length when we issue last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private int waitOnShutdownInSeconds;

  private String waitOnShutdownInSecondsConfigKey;

  protected boolean shouldShutDownConsumeExecutorWhenClose = true;

  private volatile boolean skipRemoteWAL = false;

  private volatile boolean markerEditOnly = false;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
(" + toString() + ")"); 475 } 476 final String fileNameString = fileName.toString(); 477 String chompedPath = fileNameString.substring(prefixPathStr.length(), 478 (fileNameString.length() - walFileSuffix.length())); 479 return Long.parseLong(chompedPath); 480 } 481 482 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 483 checkArgument(logRollSize > 0, 484 "The log roll size cannot be zero or negative when calculating max log files, " 485 + "current value is " + logRollSize); 486 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 487 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 488 } 489 490 // must be power of 2 491 protected final int getPreallocatedEventCount() { 492 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 493 // be stuck and make no progress if the buffer is filled with appends only and there is no 494 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 495 // before they return. 496 int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16); 497 checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0"); 498 int floor = Integer.highestOneBit(preallocatedEventCount); 499 if (floor == preallocatedEventCount) { 500 return floor; 501 } 502 // max capacity is 1 << 30 503 if (floor >= 1 << 29) { 504 return 1 << 30; 505 } 506 return floor << 1; 507 } 508 509 protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds, 510 String waitOnShutdownInSecondsConfigKey) { 511 this.waitOnShutdownInSeconds = waitOnShutdownInSeconds; 512 this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey; 513 } 514 515 protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir, 516 final String prefix) { 517 ThreadPoolExecutor threadPool = 518 new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 519 new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:" 520 + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build()); 521 hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; 522 consumeExecutor = threadPool; 523 this.shouldShutDownConsumeExecutorWhenClose = true; 524 } 525 526 protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, 527 final String logDir, final String archiveDir, final Configuration conf, 528 final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, 529 final String suffix, FileSystem remoteFs, Path remoteWALDir) 530 throws FailedLogCloseException, IOException { 531 this.fs = fs; 532 this.walDir = new Path(rootDir, logDir); 533 this.walArchiveDir = new Path(rootDir, archiveDir); 534 this.conf = conf; 535 this.abortable = abortable; 536 this.remoteFs = remoteFs; 537 this.remoteWALDir = remoteWALDir; 538 539 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 540 throw new IOException("Unable to mkdir " + walDir); 541 } 542 543 if (!fs.exists(this.walArchiveDir)) { 544 if (!fs.mkdirs(this.walArchiveDir)) { 545 throw new IOException("Unable to mkdir " + this.walArchiveDir); 546 } 547 } 548 549 // If prefix is null||empty then just name it wal 550 this.walFilePrefix = prefix == null || prefix.isEmpty() 551 ? 
"wal" 552 : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name()); 553 // we only correctly differentiate suffices when numeric ones start with '.' 554 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 555 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER 556 + "' but instead was '" + suffix + "'"); 557 } 558 // Now that it exists, set the storage policy for the entire directory of wal files related to 559 // this FSHLog instance 560 String storagePolicy = 561 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 562 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 563 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 564 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 565 566 this.ourFiles = new PathFilter() { 567 @Override 568 public boolean accept(final Path fileName) { 569 // The path should start with dir/<prefix> and end with our suffix 570 final String fileNameString = fileName.toString(); 571 if (!fileNameString.startsWith(prefixPathStr)) { 572 return false; 573 } 574 if (walFileSuffix.isEmpty()) { 575 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 576 return org.apache.commons.lang3.StringUtils 577 .isNumeric(fileNameString.substring(prefixPathStr.length())); 578 } else if (!fileNameString.endsWith(walFileSuffix)) { 579 return false; 580 } 581 return true; 582 } 583 }; 584 585 if (failIfWALExists) { 586 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 587 if (null != walFiles && 0 != walFiles.length) { 588 throw new IOException("Target WAL already exists within directory " + walDir); 589 } 590 } 591 592 // Register listeners. TODO: Should this exist anymore? We have CPs? 593 if (listeners != null) { 594 for (WALActionsListener i : listeners) { 595 registerWALActionsListener(i); 596 } 597 } 598 this.coprocessorHost = new WALCoprocessorHost(this, conf); 599 600 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block 601 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 602 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 603 // the block size but experience from the field has it that this was not enough time for the 604 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 605 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 606 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 607 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    this.slowSyncNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.walTooOldNs = TimeUnit.SECONDS
      .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
    this.walShutdownTimeout =
      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);

    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
  }

  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  @Override
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
    // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
    // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
    // currently flushing sequence ids, and if anything found there, it is returning these. This is
    // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
    // id is old even though we are currently flushing. This may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  @Override
  public final void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public final void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public final void sync(boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
  }

  @Override
  public final void sync(long txid, boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public SequenceIdAccounting getSequenceIdAccounting() {
    return sequenceIdAccounting;
  }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
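   * <p>
   * For example (illustrative), with an URL-encoded server name as the prefix and an empty suffix,
   * the resulting name is {@code <walFilePrefix><WAL_FILE_NAME_DELIMITER><filenum>}, where
   * {@code filenum} is the creation timestamp in milliseconds.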
   * @param filenum to use
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
    Path newPath = getCurrentFileName();
    return newPath;
  }

  public Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of log files in use */
  public int getNumLogFiles() {
    // +1 for current use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the
   * first (oldest) WAL, and return those regions which should be flushed so that it can be
   * let-go/'archived'.
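   * <p>
   * Flushing the returned stores moves their pending edits out of the memstore, after which the
   * oldest WAL no longer holds the only copy of any un-flushed data and can be archived.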
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
        + "; forcing (partial) flush of " + regions.size() + " region(s): "
        + StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
   */
  private void markClosedAndClean(Path path) {
    WALProps props = walFile2Props.get(path);
    // typically this should not be null, but there is no big issue if it is already null, so
    // let's make the code more robust
    if (props != null) {
      props.closed = true;
      cleanOldLogs();
    }
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   * <p/>
   * Use synchronized because we may call this method in different threads, normally when replacing
   * writer, and since now close writer may be asynchronous, we will also call this method in the
   * closeExecutor, right after we actually close a WAL writer.
   */
  private synchronized void cleanOldLogs() {
    List<Pair<Path, Long>> logsToArchive = null;
    long now = System.nanoTime();
    boolean mayLogTooOld = nextLogTooOldNs <= now;
    ArrayList<byte[]> regionsBlockingWal = null;
    // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
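    // Illustrative example: if this file's highest recorded sequence id for region R is 10, and
    // the accounting shows R's lowest unflushed (still in-memory) sequence id is now 15, then all
    // of R's edits in this file are already persisted in HFiles and R does not block archiving.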
    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
      if (!e.getValue().closed) {
        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
        continue;
      }
      Path log = e.getKey();
      ArrayList<byte[]> regionsBlockingThisWal = null;
      long ageNs = now - e.getValue().rollTimeNs;
      if (ageNs > walTooOldNs) {
        if (mayLogTooOld && regionsBlockingWal == null) {
          regionsBlockingWal = new ArrayList<>();
        }
        regionsBlockingThisWal = regionsBlockingWal;
      }
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      } else if (regionsBlockingThisWal != null) {
        StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
          .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
        boolean isFirst = true;
        for (byte[] region : regionsBlockingThisWal) {
          if (!isFirst) {
            sb.append("; ");
          }
          isFirst = false;
          sb.append(Bytes.toString(region));
        }
        LOG.warn(sb.toString());
        nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
        regionsBlockingThisWal.clear();
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }

  protected void archive(final Pair<Path, Long> log) {
    totalLogSize.addAndGet(-log.getSecond());
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
        }
        retry++;
      }
    }
  }

  /*
   * only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  private Span createSpan(String name) {
    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
  }

  /**
   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
   * </li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath may be null
   * @param newPath may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    return TraceUtil.trace(() -> {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }, () -> createSpan("WAL.replaceWriter"));
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      throw new WALSyncTimeoutIOException(tioe);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  private W createCombinedWriter(W localWriter, Path localPath)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    // retry forever if we can not create the remote writer to prevent aborting the RS due to log
    // rolling error, unless the skipRemoteWal is set to true.
    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
    // other wals
    Path remoteWAL = new Path(remoteWALDir, localPath.getName());
    for (int retry = 0;; retry++) {
      if (skipRemoteWAL) {
        return localWriter;
      }
      W remoteWriter;
      try {
        remoteWriter = createWriterInstance(remoteFs, remoteWAL);
      } catch (IOException e) {
        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
        try {
          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
        } catch (InterruptedException ie) {
          // restore the interrupt state
          Thread.currentThread().interrupt();
          // must close local writer here otherwise no one will close it for us
          Closeables.close(localWriter, true);
          throw (IOException) new InterruptedIOException().initCause(ie);
        }
        continue;
      }
      return createCombinedWriter(localWriter, remoteWriter);
    }
  }

  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      if (this.closed) {
        throw new WALClosedException("WAL has been closed");
      }
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(fs, newPath);
        if (remoteFs != null) {
          // create a remote wal if necessary
          nextWriter = createCombinedWriter(nextWriter, newPath);
        }
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // NewPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: "
            + FanOutOneBlockAsyncDFSOutputHelper
              .getDataNodeInfo(Arrays.stream(getPipeline()).collect(Collectors.toList())));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }

    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());

    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
          try {
            doShutdown();
            if (syncFutureCache != null) {
              syncFutureCache.clear();
            }
          } finally {
            rollWriterLock.unlock();
          }
        } else {
          throw new IOException("Waiting for rollWriterLock timeout");
        }
        return null;
      }
    });
    shutdownExecutor.shutdown();

    try {
      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
    } catch (TimeoutException e) {
      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
        + "\"", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } finally {
      // in shutdown, we may call cleanOldLogs so shut down this executor at the end.
      // In the sync replication implementation, we may shut down a WAL without shutting down the
      // whole region server; if we shut down this executor earlier we may get a reject execution
      // exception and abort the region server
      logArchiveExecutor.shutdown();
    }
    // we also need to wait for logArchive to finish if we want a graceful shutdown, as we may still
    // have some pending archiving tasks not finished yet, and in close we may archive all the
    // remaining WAL files; there could be a race if we do not wait for the background archive task
    // to finish
    try {
      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
          + "\"");
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /** Returns number of WALs currently in the process of closing. */
  public int getInflightWALCloseCount() {
    return inflightWALClosures.size();
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, it either replaces the
   * current seq number only if the given seq id is bigger, or replaces it unconditionally even if
   * it is lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again
    // And only set rollRequested to true when there is a registered listener
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(long timeInNanos, int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
        .append(Arrays.toString(getPipeline())).toString();
      LOG.info(msg);
      if (timeInNanos > this.rollOnSyncNs) {
        // A single sync took too long.
        // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
        // effects. Here we have a single data point that indicates we should take immediate
        // action, so do so.
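        // (Illustrative note: the cumulative path, checkSlowSyncCount() invoked from
        // syncCompleted(), weighs slowSyncCount against slowSyncRollThreshold over
        // slowSyncCheckInterval before requesting a SLOW_SYNC roll.)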
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
          + Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * if the given {@code path} is being written currently, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        // Currently active path.
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        W temp = inflightWALClosures.get(path.getName());
        if (temp != null) {
          // In the process of being closed, trailer bytes may or may not be flushed.
          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
          // use cases like replication, see HBASE-25924/HBASE-25932.
          return OptionalLong.of(temp.getSyncedLength());
        }
        // Log rolled successfully.
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, true),
      () -> createSpan("WAL.appendData"));
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, false),
      () -> createSpan("WAL.appendMarker"));
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
1461 */
1462 protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
1463 future.done(txid, t);
1464 syncFutureCache.offer(future);
1465 }
1466
1467 private static boolean waitingRoll(int epochAndState) {
1468 return (epochAndState & 1) != 0;
1469 }
1470
1471 private static boolean writerBroken(int epochAndState) {
1472 return ((epochAndState >>> 1) & 1) != 0;
1473 }
1474
1475 private static int epoch(int epochAndState) {
1476 return epochAndState >>> 2;
1477 }
1478
1479 // return whether we have successfully set readyForRolling to true.
1480 private boolean trySetReadyForRolling() {
1481 // Check without holding lock first. Usually we will just return here.
1482 // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
1483 // safe to check them outside the consumeLock.
1484 if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
1485 return false;
1486 }
1487 consumeLock.lock();
1488 try {
1489 // 1. a roll is requested
1490 // 2. all out-going entries have been acked (we have confirmed this above).
1491 if (waitingRoll(epochAndState)) {
1492 readyForRolling = true;
1493 readyForRollingCond.signalAll();
1494 return true;
1495 } else {
1496 return false;
1497 }
1498 } finally {
1499 consumeLock.unlock();
1500 }
1501 }
1502
1503 private void syncFailed(long epochWhenSync, Throwable error) {
1504 LOG.warn("sync failed", error);
1505 this.onException(epochWhenSync, error);
1506 }
1507
1508 private void onException(long epochWhenSync, Throwable error) {
1509 boolean shouldRequestLogRoll = true;
1510 consumeLock.lock();
1511 try {
1512 int currentEpochAndState = epochAndState;
1513 if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
1514 // this is not the previous writer, which means we have already rolled the writer,
1515 // or this is still the current writer but we have already marked it as broken and requested
1516 // a roll.
1517 return;
1518 }
1519 this.epochAndState = currentEpochAndState | 0b10;
1520 if (waitingRoll(currentEpochAndState)) {
1521 readyForRolling = true;
1522 readyForRollingCond.signalAll();
1523 // this means we are already in the middle of a rollWriter, so just tell the roller thread
1524 // that it can continue without requesting an extra log roll.
1525 shouldRequestLogRoll = false;
1526 }
1527 } finally {
1528 consumeLock.unlock();
1529 }
1530 for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
1531 toWriteAppends.addFirst(iter.next());
1532 }
1533 highestUnsyncedTxid = highestSyncedTxid.get();
1534 if (shouldRequestLogRoll) {
1535 // request a roll.
1536 requestLogRoll(ERROR);
1537 }
1538 }
1539
1540 private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) {
1541 // Please see the last several comments on HBASE-22761: it is possible that we get a
1542 // syncCompleted which acks a previous sync request after we received a syncFailed on the same
1543 // writer. So here we will also check the epoch and state; if the epoch has already been
1544 // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
1545 // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and
1546 // causing data corruption.
1547 // The syncCompleted call is on the critical write path, so we should try our best to make it
1548 // fast. So here we do not hold the consumeLock, to increase performance. It is safe because
1549 // there are only 3 possible situations:
1550 // 1. For the normal case, the only place where we change epochAndState is when rolling the writer.
1551 // Before the roll actually happens, we will only change the state to waitingRoll, which is a
1552 // different bit from writerBroken, and when we actually change the epoch, we can make sure that
1553 // there is no outgoing sync request. So we will always pass the check here and there is no problem.
1554 // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
1555 // syncFailed and syncCompleted are executed in the same thread, we will just face the same
1556 // situation as in #1.
1557 // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
1558 // only 2 possible situations:
1559 // a. we arrive before we actually roll the writer, then we will find out the writer is broken
1560 // and give up.
1561 // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
1562 // and give up.
1563 // For both #a and #b, we do not need to hold the consumeLock as we will always update the
1564 // epochAndState as a whole.
1565 // So in general, for all the cases above, we do not need to hold the consumeLock.
1566 int epochAndState = this.epochAndState;
1567 if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
1568 LOG.warn("Got a sync complete call after the writer is broken, skip");
1569 return;
1570 }
1571
1572 if (processedTxid < highestSyncedTxid.get()) {
1573 return;
1574 }
1575 highestSyncedTxid.set(processedTxid);
1576 for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
1577 FSWALEntry entry = iter.next();
1578 if (entry.getTxid() <= processedTxid) {
1579 entry.release();
1580 iter.remove();
1581 } else {
1582 break;
1583 }
1584 }
1585 postSync(System.nanoTime() - startTimeNs, finishSync());
1586 /**
1587 * This call is here to stay compatible with the original logic of {@link FSHLog}.
1588 */
1589 checkSlowSyncCount();
1590 if (trySetReadyForRolling()) {
1591 // we have just finished a roll, so we do not need to check for log rolling; the writer will be
1592 // closed soon.
1593 return;
1594 }
1595 // If we haven't already requested a roll, check if we have exceeded logrollsize
1596 if (!isLogRollRequested() && writer.getLength() > logrollsize) {
1597 if (LOG.isDebugEnabled()) {
1598 LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
1599 + ", logrollsize=" + logrollsize);
1600 }
1601 requestLogRoll(SIZE);
1602 }
1603 }
1604
1605 // find all the sync futures between these two txids to see if we need to issue an hsync; if
1606 // there are no sync futures then just use the default.
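// Worked example (hypothetical txids, added for illustration): with pending sync futures for
// txid 5 (forceSync=false) and txid 7 (forceSync=true), isHsync(4, 8) returns true because at
// least one future in the range requires an hsync; if the range contains no futures at all, the
// configured default (useHsync) is returned.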
1607 private boolean isHsync(long beginTxid, long endTxid) {
1608 SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
1609 new SyncFuture().reset(endTxid + 1, false));
1610 if (futures.isEmpty()) {
1611 return useHsync;
1612 }
1613 for (SyncFuture future : futures) {
1614 if (future.isForceSync()) {
1615 return true;
1616 }
1617 }
1618 return false;
1619 }
1620
1621 private void sync(W writer) {
1622 fileLengthAtLastSync = writer.getLength();
1623 long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
1624 boolean shouldUseHsync =
1625 isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
1626 highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
1627 final long startTimeNs = System.nanoTime();
1628 final long epoch = (long) epochAndState >>> 2L;
1629 addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),
1630 (result, error) -> {
1631 if (error != null) {
1632 syncFailed(epoch, error);
1633 } else {
1634 long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);
1635 syncCompleted(epoch, writer, syncedTxid, startTimeNs);
1636 }
1637 }, consumeExecutor);
1638 }
1639
1640 /**
1641 * This method adapts {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use
1642 * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we call the
1643 * {@link AsyncFSWAL#doWriterSync} method as the successful syncedTxid. For {@link FSHLog},
1644 * because we use multi-threaded {@code SyncRunner}s, we use the result of the
1645 * {@link CompletableFuture} as the successful syncedTxid.
1646 */
1647 protected long getSyncedTxid(long processedTxid, long completableFutureResult) {
1648 return processedTxid;
1649 }
1650
1651 protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync,
1652 long txidWhenSync);
1653
1654 private int finishSyncLowerThanTxid(long txid) {
1655 int finished = 0;
1656 for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
1657 SyncFuture sync = iter.next();
1658 if (sync.getTxid() <= txid) {
1659 markFutureDoneAndOffer(sync, txid, null);
1660 iter.remove();
1661 finished++;
1662 } else {
1663 break;
1664 }
1665 }
1666 return finished;
1667 }
1668
1669 // try advancing the highestSyncedTxid as much as possible
1670 private int finishSync() {
1671 if (unackedAppends.isEmpty()) {
1672 // All outstanding appends have been acked.
1673 if (toWriteAppends.isEmpty()) {
1674 // Also no appends are waiting to be written out, so just finish all pending syncs.
1675 long maxSyncTxid = highestSyncedTxid.get();
1676 for (SyncFuture sync : syncFutures) {
1677 maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
1678 markFutureDoneAndOffer(sync, maxSyncTxid, null);
1679 }
1680 highestSyncedTxid.set(maxSyncTxid);
1681 int finished = syncFutures.size();
1682 syncFutures.clear();
1683 return finished;
1684 } else {
1685 // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
1686 // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
1687 // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
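// Worked example (made-up txids, for illustration only): if highestProcessedAppendTxid is 10 and
// the first entry still waiting in toWriteAppends has txid 15, then every sync future with
// txid <= 14 can be completed, so we advance highestSyncedTxid to 14 below.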
1688 long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
1689 assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
1690 long doneTxid = lowestUnprocessedAppendTxid - 1;
1691 highestSyncedTxid.set(doneTxid);
1692 return finishSyncLowerThanTxid(doneTxid);
1693 }
1694 } else {
1695 // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
1696 // first unacked append minus 1.
1697 long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
1698 long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
1699 highestSyncedTxid.set(doneTxid);
1700 return finishSyncLowerThanTxid(doneTxid);
1701 }
1702 }
1703
1704 // confirm non-empty before calling
1705 private static long getLastTxid(Deque<FSWALEntry> queue) {
1706 return queue.peekLast().getTxid();
1707 }
1708
1709 private void appendAndSync() throws IOException {
1710 final W writer = this.writer;
1711 // maybe a sync request is not queued when we issue a sync, so check here to see if we could
1712 // finish some.
1713 finishSync();
1714 long newHighestProcessedAppendTxid = -1L;
1715 // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
1716 // threaded, so this saves us some cycles
1717 boolean addedToUnackedAppends = false;
1718 for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
1719 FSWALEntry entry = iter.next();
1720 /**
1721 * For {@link FSHLog}, this may throw an IOException, but for {@link AsyncFSWAL} it will not
1722 * throw any IOException.
1723 */
1724 boolean appended = appendEntry(writer, entry);
1725 newHighestProcessedAppendTxid = entry.getTxid();
1726 iter.remove();
1727 if (appended) {
1728 // This is possible: when we fail to sync, we will add the unackedAppends back to
1729 // toWriteAppends, so here we may get an entry which is already in unackedAppends.
1730 if (
1731 addedToUnackedAppends || unackedAppends.isEmpty()
1732 || getLastTxid(unackedAppends) < entry.getTxid()
1733 ) {
1734 unackedAppends.addLast(entry);
1735 addedToUnackedAppends = true;
1736 }
1737 // See HBASE-25905: here we need to make sure that we always write out all the entries in
1738 // unackedAppends. The code in the consume method assumes that the entries in
1739 // unackedAppends have all been sent out, so if there is a roll request and unackedAppends is
1740 // not empty, we can just return, as later there will be a syncCompleted call to clear the
1741 // unackedAppends, or a syncFailed to lead us to another state.
1742 // There could be other ways to fix this, such as changing the logic in the consume method,
1743 // but that would break the assumption and then (may) lead to a big refactoring. So here let's
1744 // use this way to fix it first; we can optimize later.
1745 if (
1746 writer.getLength() - fileLengthAtLastSync >= batchSize
1747 && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
1748 ) {
1749 break;
1750 }
1751 }
1752 }
1753 // if we have a newer transaction id, update it.
1754 // otherwise, use the previous transaction id.
1755 if (newHighestProcessedAppendTxid > 0) {
1756 highestProcessedAppendTxid = newHighestProcessedAppendTxid;
1757 } else {
1758 newHighestProcessedAppendTxid = highestProcessedAppendTxid;
1759 }
1760
1761 if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
1762 // sync because we have reached the buffer size limit.
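// For instance (hypothetical value, the real batchSize is configurable): with a batchSize of
// 64 KB, once at least 64 KB has been appended since the last sync we flush immediately instead
// of waiting for more appends, which bounds how much unsynced data can pile up in the writer.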
1763 sync(writer);
1764 return;
1765 }
1766 if (writer.getLength() == fileLengthAtLastSync) {
1767 // we haven't written anything out, just advance the highestSyncedTxid since we may only
1768 // stamp some region sequence id.
1769 if (unackedAppends.isEmpty()) {
1770 highestSyncedTxid.set(highestProcessedAppendTxid);
1771 finishSync();
1772 trySetReadyForRolling();
1773 }
1774 return;
1775 }
1776 // reaching here means that we have some unsynced data but haven't reached the batch size yet,
1777 // but we will not issue a sync directly here even if there are sync requests, because we may
1778 // have some new data in the ringbuffer, so let's just return here and delay the decision of
1779 // whether to issue a sync to the caller method.
1780 }
1781
1782 private void consume() {
1783 consumeLock.lock();
1784 try {
1785 int currentEpochAndState = epochAndState;
1786 if (writerBroken(currentEpochAndState)) {
1787 return;
1788 }
1789 if (waitingRoll(currentEpochAndState)) {
1790 if (writer.getLength() > fileLengthAtLastSync) {
1791 // issue a sync
1792 sync(writer);
1793 } else {
1794 if (unackedAppends.isEmpty()) {
1795 readyForRolling = true;
1796 readyForRollingCond.signalAll();
1797 }
1798 }
1799 return;
1800 }
1801 } finally {
1802 consumeLock.unlock();
1803 }
1804 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
1805 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
1806 <= cursorBound; nextCursor++) {
1807 if (!waitingConsumePayloads.isPublished(nextCursor)) {
1808 break;
1809 }
1810 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
1811 switch (truck.type()) {
1812 case APPEND:
1813 toWriteAppends.addLast(truck.unloadAppend());
1814 break;
1815 case SYNC:
1816 syncFutures.add(truck.unloadSync());
1817 break;
1818 default:
1819 LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
1820 break;
1821 }
1822 waitingConsumePayloadsGatingSequence.set(nextCursor);
1823 }
1824
1825 /**
1826 * This block is here to stay compatible with the original logic of {@link AsyncFSWAL}.
1827 */
1828 if (markerEditOnly) {
1829 drainNonMarkerEditsAndFailSyncs();
1830 }
1831 try {
1832 appendAndSync();
1833 } catch (IOException exception) {
1834 /**
1835 * For {@link FSHLog}, we may catch an IOException here, but for {@link AsyncFSWAL} the code
1836 * does not go in here.
1837 */
1838 LOG.error("appendAndSync throws IOException.", exception);
1839 onAppendEntryFailed(exception);
1840 return;
1841 }
1842 if (hasConsumerTask.get()) {
1843 return;
1844 }
1845 if (toWriteAppends.isEmpty()) {
1846 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1847 consumerScheduled.set(false);
1848 // recheck here since in appendAndSync we do not hold the consumeLock. Things may
1849 // happen like:
1850 // 1. we check the cursor, no new entry
1851 // 2. someone publishes a new entry to the ringbuffer, sees consumerScheduled is true, and
1852 // gives up scheduling the consumer task.
1853 // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
1854 if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1855 // we will give up consuming, so if there is some unsynced data we need to issue a sync.
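// Descriptive note (added for clarity): the check below only issues this final sync when (a)
// bytes have been appended since the last sync and (b) at least one pending sync future is
// waiting for a txid beyond what that last sync covered; otherwise there is nothing new to
// persist for the waiters.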
1856 if (
1857 writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
1858 && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
1859 ) {
1860 // no new data in the ringbuffer and we have at least one sync request
1861 sync(writer);
1862 }
1863 return;
1864 } else {
1865 // maybe someone has grabbed this before us
1866 if (!consumerScheduled.compareAndSet(false, true)) {
1867 return;
1868 }
1869 }
1870 }
1871 }
1872 // reschedule if we still have something to write.
1873 consumeExecutor.execute(consumer);
1874 }
1875
1876 private boolean shouldScheduleConsumer() {
1877 int currentEpochAndState = epochAndState;
1878 if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
1879 return false;
1880 }
1881 return consumerScheduled.compareAndSet(false, true);
1882 }
1883
1884 /**
1885 * Append a set of edits to the WAL.
1886 * <p/>
1887 * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1888 * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1889 * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1890 * <p/>
1891 * NOTE: This append, usually at a time after this call returns, starts an mvcc
1892 * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
1893 * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
1894 * 'complete' this mvcc transaction by calling
1895 * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it
1896 * in the finally of a try/finally block within which this append lives and any subsequent
1897 * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
1898 * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
1899 * immediately available on return from this method. It WILL be available subsequent to a sync of
1900 * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
1901 * @param hri the regioninfo associated with this append
1902 * @param key Modified by this call; we add to it this edit's region edit/sequence id.
1903 * @param edits Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an edit
1904 * sequence id that is after all currently appended edits.
1905 * @param inMemstore Always true except for the case where we are writing a region event meta marker
1906 * edit, for example, a compaction completion record into the WAL or noting a
1907 * Region Open event. In these cases the entry is just so we can finish an
1908 * unfinished compaction after a crash when the new Server reads the WAL on
1909 * recovery, etc. These transition event 'Markers' do not go via the memstore.
1910 * When inMemstore is false, we presume a Marker event edit.
1911 * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1912 * in it.
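 * <p/>
 * A minimal caller sketch (hypothetical, for illustration only; the surrounding variables are
 * assumptions and not part of this class):
 *
 * <pre>
 * long txid = wal.appendData(regionInfo, walKey, edits);
 * wal.sync(txid); // the WriteEntry is available once the append has been synced
 * MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
 * try {
 *   // apply the edits to the memstore, etc.
 * } finally {
 *   walKey.getMvcc().completeAndWait(we); // never skip this, otherwise mvcc gets stuck
 * }
 * </pre>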
1913 */ 1914 protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1915 throws IOException { 1916 if (markerEditOnly && !edits.isMetaEdit()) { 1917 throw new IOException("WAL is closing, only marker edit is allowed"); 1918 } 1919 long txid = 1920 stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); 1921 if (shouldScheduleConsumer()) { 1922 consumeExecutor.execute(consumer); 1923 } 1924 return txid; 1925 } 1926 1927 protected void doSync(boolean forceSync) throws IOException { 1928 long txid = waitingConsumePayloads.next(); 1929 SyncFuture future; 1930 try { 1931 future = getSyncFuture(txid, forceSync); 1932 RingBufferTruck truck = waitingConsumePayloads.get(txid); 1933 truck.load(future); 1934 } finally { 1935 waitingConsumePayloads.publish(txid); 1936 } 1937 if (shouldScheduleConsumer()) { 1938 consumeExecutor.execute(consumer); 1939 } 1940 blockOnSync(future); 1941 } 1942 1943 protected void doSync(long txid, boolean forceSync) throws IOException { 1944 if (highestSyncedTxid.get() >= txid) { 1945 return; 1946 } 1947 // here we do not use ring buffer sequence as txid 1948 long sequence = waitingConsumePayloads.next(); 1949 SyncFuture future; 1950 try { 1951 future = getSyncFuture(txid, forceSync); 1952 RingBufferTruck truck = waitingConsumePayloads.get(sequence); 1953 truck.load(future); 1954 } finally { 1955 waitingConsumePayloads.publish(sequence); 1956 } 1957 if (shouldScheduleConsumer()) { 1958 consumeExecutor.execute(consumer); 1959 } 1960 blockOnSync(future); 1961 } 1962 1963 private void drainNonMarkerEditsAndFailSyncs() { 1964 if (toWriteAppends.isEmpty()) { 1965 return; 1966 } 1967 boolean hasNonMarkerEdits = false; 1968 Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator(); 1969 while (iter.hasNext()) { 1970 FSWALEntry entry = iter.next(); 1971 if (!entry.getEdit().isMetaEdit()) { 1972 entry.release(); 1973 hasNonMarkerEdits = true; 1974 break; 1975 } 1976 } 1977 if (hasNonMarkerEdits) { 1978 for (;;) { 1979 iter.remove(); 1980 if (!iter.hasNext()) { 1981 break; 1982 } 1983 iter.next().release(); 1984 } 1985 for (FSWALEntry entry : unackedAppends) { 1986 entry.release(); 1987 } 1988 unackedAppends.clear(); 1989 // fail the sync futures which are under the txid of the first remaining edit, if none, fail 1990 // all the sync futures. 1991 long txid = toWriteAppends.isEmpty() ? 
Long.MAX_VALUE : toWriteAppends.peek().getTxid(); 1992 IOException error = new IOException("WAL is closing, only marker edit is allowed"); 1993 for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) { 1994 SyncFuture future = syncIter.next(); 1995 if (future.getTxid() < txid) { 1996 markFutureDoneAndOffer(future, future.getTxid(), error); 1997 syncIter.remove(); 1998 } else { 1999 break; 2000 } 2001 } 2002 } 2003 } 2004 2005 protected abstract W createWriterInstance(FileSystem fs, Path path) 2006 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 2007 2008 protected abstract W createCombinedWriter(W localWriter, W remoteWriter); 2009 2010 protected final void waitForSafePoint() { 2011 consumeLock.lock(); 2012 try { 2013 int currentEpochAndState = epochAndState; 2014 if (writerBroken(currentEpochAndState) || this.writer == null) { 2015 return; 2016 } 2017 consumerScheduled.set(true); 2018 epochAndState = currentEpochAndState | 1; 2019 readyForRolling = false; 2020 consumeExecutor.execute(consumer); 2021 while (!readyForRolling) { 2022 readyForRollingCond.awaitUninterruptibly(); 2023 } 2024 } finally { 2025 consumeLock.unlock(); 2026 } 2027 } 2028 2029 private void recoverLease(FileSystem fs, Path p, Configuration conf) { 2030 try { 2031 RecoverLeaseFSUtils.recoverFileLease(fs, p, conf, null); 2032 } catch (IOException ex) { 2033 LOG.error("Unable to recover lease after several attempts. Give up.", ex); 2034 } 2035 } 2036 2037 protected final void closeWriter(W writer, Path path) { 2038 inflightWALClosures.put(path.getName(), writer); 2039 closeExecutor.execute(() -> { 2040 try { 2041 writer.close(); 2042 } catch (IOException e) { 2043 LOG.warn("close old writer failed.", e); 2044 recoverLease(this.fs, path, conf); 2045 } finally { 2046 // call this even if the above close fails, as there is no other chance we can set closed to 2047 // true, it will not cause big problems. 2048 markClosedAndClean(path); 2049 inflightWALClosures.remove(path.getName()); 2050 } 2051 }); 2052 } 2053 2054 /** 2055 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 2056 * will begin to work before returning from this method. If we clear the flag after returning from 2057 * this call, we may miss a roll request. The implementation class should choose a proper place to 2058 * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you 2059 * start writing to the new writer. 2060 */ 2061 protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 2062 Preconditions.checkNotNull(nextWriter); 2063 waitForSafePoint(); 2064 /** 2065 * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}. 2066 */ 2067 doCleanUpResources(); 2068 // we will call rollWriter in init method, where we want to create the first writer and 2069 // obviously the previous writer is null, so here we need this null check. 
The reason we must call
2070 // logRollAndSetupWalProps before closeWriter is that markClosedAndClean is called after
2071 // closing the writer asynchronously, so we need to make sure the WALProps has been put into
2072 // walFile2Props before we call markClosedAndClean.
2073 if (writer != null) {
2074 long oldFileLen = writer.getLength();
2075 logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
2076 closeWriter(writer, oldPath);
2077 } else {
2078 logRollAndSetupWalProps(oldPath, newPath, 0);
2079 }
2080 this.writer = nextWriter;
2081 /**
2082 * This is used by {@link AsyncFSWAL} and {@link FSHLog} to set the underlying filesystem
2083 * output after the writer is replaced.
2084 */
2085 onWriterReplaced(nextWriter);
2086 this.fileLengthAtLastSync = nextWriter.getLength();
2087 this.highestProcessedAppendTxidAtLastSync = 0L;
2088 consumeLock.lock();
2089 try {
2090 consumerScheduled.set(true);
2091 int currentEpoch = epochAndState >>> 2;
2092 int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
2093 // set a new epoch and also clear waitingRoll and writerBroken
2094 this.epochAndState = nextEpoch << 2;
2095 // Reset rollRequested status
2096 rollRequested.set(false);
2097 consumeExecutor.execute(consumer);
2098 } finally {
2099 consumeLock.unlock();
2100 }
2101 }
2102
2103 protected abstract void onWriterReplaced(W nextWriter);
2104
2105 protected void doShutdown() throws IOException {
2106 waitForSafePoint();
2107 /**
2108 * For {@link FSHLog}, this will shut down the {@link FSHLog.SyncRunner}.
2109 */
2110 doCleanUpResources();
2111 if (this.writer != null) {
2112 closeWriter(this.writer, getOldPath());
2113 this.writer = null;
2114 }
2115 closeExecutor.shutdown();
2116 try {
2117 if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
2118 LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
2119 + " the close of the async writer has not completed."
2120 + " Please check the status of the underlying filesystem"
2121 + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey
2122 + "\"");
2123 }
2124 } catch (InterruptedException e) {
2125 LOG.error("The wait for close of async writer is interrupted");
2126 Thread.currentThread().interrupt();
2127 }
2128 IOException error = new IOException("WAL has been closed");
2129 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
2130 // drain all the pending sync requests
2131 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
2132 <= cursorBound; nextCursor++) {
2133 if (!waitingConsumePayloads.isPublished(nextCursor)) {
2134 break;
2135 }
2136 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
2137 switch (truck.type()) {
2138 case SYNC:
2139 syncFutures.add(truck.unloadSync());
2140 break;
2141 default:
2142 break;
2143 }
2144 }
2145 // and fail them
2146 syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
2147 if (this.shouldShutDownConsumeExecutorWhenClose) {
2148 consumeExecutor.shutdown();
2149 }
2150 }
2151
2152 protected void doCleanUpResources() {
2153 }
2154
2155 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
2156
2157 /**
2158 * This method gets the pipeline for the current WAL.
2159 */
2160 abstract DatanodeInfo[] getPipeline();
2161
2162 /**
2163 * This method gets the datanode replication count for the current WAL.
2164 */
2165 abstract int getLogReplication();
2166
2167 protected abstract boolean doCheckLogLowReplication();
2168
2169 protected boolean isWriterBroken() {
2170 return writerBroken(epochAndState);
2171 }
2172
2173 private void onAppendEntryFailed(IOException exception) {
2174 LOG.warn("append entry failed", exception);
2175 final long currentEpoch = (long) epochAndState >>> 2L;
2176 this.onException(currentEpoch, exception);
2177 }
2178
2179 protected void checkSlowSyncCount() {
2180 }
2181
2182 /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
2183 protected boolean doCheckSlowSync() {
2184 boolean result = false;
2185 long now = EnvironmentEdgeManager.currentTime();
2186 long elapsedTime = now - lastTimeCheckSlowSync;
2187 if (elapsedTime >= slowSyncCheckInterval) {
2188 if (slowSyncCount.get() >= slowSyncRollThreshold) {
2189 if (elapsedTime >= (2 * slowSyncCheckInterval)) {
2190 // If two or more slowSyncCheckIntervals have elapsed this is a corner case
2191 // where a train of slow syncs almost triggered us but then there was a long
2192 // interval from then until the one more that pushed us over. If so, we
2193 // should do nothing and let the count reset.
2194 if (LOG.isDebugEnabled()) {
2195 LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
2196 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
2197 + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
2198 }
2199 // Fall through to count reset below
2200 } else {
2201 LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
2202 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
2203 + Arrays.toString(getPipeline()));
2204 result = true;
2205 }
2206 }
2207 lastTimeCheckSlowSync = now;
2208 slowSyncCount.set(0);
2209 }
2210 return result;
2211 }
2212
2213 public void checkLogLowReplication(long checkInterval) {
2214 long now = EnvironmentEdgeManager.currentTime();
2215 if (now - lastTimeCheckLowReplication < checkInterval) {
2216 return;
2217 }
2218 // Will return immediately if we are in the middle of a WAL log roll currently.
2219 if (!rollWriterLock.tryLock()) {
2220 return;
2221 }
2222 try {
2223 lastTimeCheckLowReplication = now;
2224 if (doCheckLogLowReplication()) {
2225 requestLogRoll(LOW_REPLICATION);
2226 }
2227 } finally {
2228 rollWriterLock.unlock();
2229 }
2230 }
2231
2232 // Allow temporarily skipping the creation of the remote writer. When failing to write to the
2233 // remote dfs cluster, we need to reopen the regions and switch to using the original wal writer.
2234 // But we need to write a close marker when closing a region, and if that fails, the whole rs
2235 // will abort. So here we need to skip the creation of the remote writer and make it possible to
2236 // write the region close marker.
2237 // Setting markerEditOnly to true is for transitioning from A to S, where we need to give up
2238 // writing any pending wal entries as they will be discarded. The remote cluster will replicate
2239 // the correct data back later. We still need to allow writing marker edits such as the close
2240 // region event to allow closing a region.
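// Hedged usage sketch (the caller side is an assumption, it is not shown in this file): during a
// sync replication state transition (A to S) the region server side would call
// skipRemoteWAL(true) so that pending normal edits are dropped while region close markers can
// still be written locally; skipRemoteWAL(false) only skips creating the remote writer.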
2241 @Override 2242 public void skipRemoteWAL(boolean markerEditOnly) { 2243 if (markerEditOnly) { 2244 this.markerEditOnly = true; 2245 } 2246 this.skipRemoteWAL = true; 2247 } 2248 2249 private static void split(final Configuration conf, final Path p) throws IOException { 2250 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 2251 if (!fs.exists(p)) { 2252 throw new FileNotFoundException(p.toString()); 2253 } 2254 if (!fs.getFileStatus(p).isDirectory()) { 2255 throw new IOException(p + " is not a directory"); 2256 } 2257 2258 final Path baseDir = CommonFSUtils.getWALRootDir(conf); 2259 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 2260 if ( 2261 conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 2262 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR) 2263 ) { 2264 archiveDir = new Path(archiveDir, p.getName()); 2265 } 2266 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 2267 } 2268 2269 W getWriter() { 2270 return this.writer; 2271 } 2272 2273 private static void usage() { 2274 System.err.println("Usage: AbstractFSWAL <ARGS>"); 2275 System.err.println("Arguments:"); 2276 System.err.println(" --dump Dump textual representation of passed one or more files"); 2277 System.err.println(" For example: " 2278 + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 2279 System.err.println(" --split Split the passed directory of WAL logs"); 2280 System.err.println( 2281 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 2282 } 2283 2284 /** 2285 * Pass one or more log file names, and it will either dump out a text version on 2286 * <code>stdout</code> or split the specified log files. 2287 */ 2288 public static void main(String[] args) throws IOException { 2289 if (args.length < 2) { 2290 usage(); 2291 System.exit(-1); 2292 } 2293 // either dump using the WALPrettyPrinter or split, depending on args 2294 if (args[0].compareTo("--dump") == 0) { 2295 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 2296 } else if (args[0].compareTo("--perf") == 0) { 2297 LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); 2298 LOG.error(HBaseMarkers.FATAL, 2299 "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); 2300 System.exit(-1); 2301 } else if (args[0].compareTo("--split") == 0) { 2302 Configuration conf = HBaseConfiguration.create(); 2303 for (int i = 1; i < args.length; i++) { 2304 try { 2305 Path logPath = new Path(args[i]); 2306 CommonFSUtils.setFsDefault(conf, logPath); 2307 split(conf, logPath); 2308 } catch (IOException t) { 2309 t.printStackTrace(System.err); 2310 System.exit(-1); 2311 } 2312 } 2313 } else { 2314 usage(); 2315 System.exit(-1); 2316 } 2317 } 2318}