001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; 021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; 022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE; 023import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; 024import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL; 025import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 026import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 027import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 028import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 029 030import com.lmax.disruptor.RingBuffer; 031import com.lmax.disruptor.Sequence; 032import com.lmax.disruptor.Sequencer; 033import io.opentelemetry.api.trace.Span; 034import java.io.FileNotFoundException; 035import java.io.IOException; 036import java.io.InterruptedIOException; 037import java.lang.management.MemoryType; 038import java.net.URLEncoder; 039import java.nio.charset.StandardCharsets; 040import java.util.ArrayDeque; 041import java.util.ArrayList; 042import java.util.Arrays; 043import java.util.Comparator; 044import java.util.Deque; 045import java.util.Iterator; 046import java.util.List; 047import java.util.Map; 048import java.util.OptionalLong; 049import java.util.Set; 050import java.util.SortedSet; 051import java.util.TreeSet; 052import java.util.concurrent.Callable; 053import java.util.concurrent.CompletableFuture; 054import java.util.concurrent.ConcurrentHashMap; 055import java.util.concurrent.ConcurrentNavigableMap; 056import java.util.concurrent.ConcurrentSkipListMap; 057import java.util.concurrent.CopyOnWriteArrayList; 058import java.util.concurrent.ExecutionException; 059import java.util.concurrent.ExecutorService; 060import java.util.concurrent.Executors; 061import java.util.concurrent.Future; 062import java.util.concurrent.LinkedBlockingQueue; 063import java.util.concurrent.ThreadPoolExecutor; 064import java.util.concurrent.TimeUnit; 065import java.util.concurrent.TimeoutException; 066import java.util.concurrent.atomic.AtomicBoolean; 067import java.util.concurrent.atomic.AtomicInteger; 068import java.util.concurrent.atomic.AtomicLong; 069import java.util.concurrent.locks.Condition; 070import java.util.concurrent.locks.Lock; 071import java.util.concurrent.locks.ReentrantLock; 072import java.util.function.Supplier; 073import 
org.apache.commons.lang3.mutable.MutableLong;
074import org.apache.hadoop.conf.Configuration;
075import org.apache.hadoop.fs.FileStatus;
076import org.apache.hadoop.fs.FileSystem;
077import org.apache.hadoop.fs.Path;
078import org.apache.hadoop.fs.PathFilter;
079import org.apache.hadoop.hbase.Abortable;
080import org.apache.hadoop.hbase.Cell;
081import org.apache.hadoop.hbase.HBaseConfiguration;
082import org.apache.hadoop.hbase.HConstants;
083import org.apache.hadoop.hbase.PrivateCellUtil;
084import org.apache.hadoop.hbase.client.ConnectionUtils;
085import org.apache.hadoop.hbase.client.RegionInfo;
086import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
087import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
088import org.apache.hadoop.hbase.ipc.RpcServer;
089import org.apache.hadoop.hbase.ipc.ServerCall;
090import org.apache.hadoop.hbase.log.HBaseMarkers;
091import org.apache.hadoop.hbase.regionserver.HRegion;
092import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
093import org.apache.hadoop.hbase.trace.TraceUtil;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.CommonFSUtils;
096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
097import org.apache.hadoop.hbase.util.Pair;
098import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
099import org.apache.hadoop.hbase.wal.WAL;
100import org.apache.hadoop.hbase.wal.WALEdit;
101import org.apache.hadoop.hbase.wal.WALFactory;
102import org.apache.hadoop.hbase.wal.WALKeyImpl;
103import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
104import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
105import org.apache.hadoop.hbase.wal.WALSplitter;
106import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
107import org.apache.hadoop.util.StringUtils;
108import org.apache.yetus.audience.InterfaceAudience;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111
112import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
113import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
114import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
115
116/**
117 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
118 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
119 * This is done internally by the implementation.
120 * <p>
121 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
122 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
123 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
124 * flushed out to hfiles, and what is yet in WAL and in memory only.
125 * <p>
126 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
127 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
128 * (smaller) than the most-recent flush.
129 * <p>
130 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
131 * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
132 * replication, where we may want to tail the active WAL file.
133 * <h2>Failure Semantics</h2> If there is an exception on append or sync, we roll the WAL because the current WAL
134 * is now a lame duck; any further appends or syncs will also fail with the same original exception.
If 135 * we have made successful appends to the WAL and we then are unable to sync them, our current 136 * semantic is to return error to the client that the appends failed but also to abort the current 137 * context, usually the hosting server. We need to replay the WALs. <br> 138 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client 139 * that the append failed. <br> 140 * TODO: replication may pick up these last edits though they have been marked as failed append 141 * (Need to keep our own file lengths, not rely on HDFS). 142 */ 143@InterfaceAudience.Private 144public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 145 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 146 147 private static final Comparator<SyncFuture> SEQ_COMPARATOR = 148 Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode); 149 150 private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; 151 private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; 152 /** Don't log blocking regions more frequently than this. */ 153 private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); 154 155 protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms"; 156 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 157 protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; 158 protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms 159 protected static final String SLOW_SYNC_ROLL_THRESHOLD = 160 "hbase.regionserver.wal.slowsync.roll.threshold"; 161 protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings 162 protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = 163 "hbase.regionserver.wal.slowsync.roll.interval.ms"; 164 protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute 165 166 public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; 167 protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 168 169 public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier"; 170 171 public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; 172 173 public static final String RING_BUFFER_SLOT_COUNT = 174 "hbase.regionserver.wal.disruptor.event.count"; 175 176 public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; 177 public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; 178 179 public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; 180 public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; 181 182 public static final String WAL_AVOID_LOCAL_WRITES_KEY = 183 "hbase.regionserver.wal.avoid-local-writes"; 184 public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; 185 186 /** 187 * file system instance 188 */ 189 protected final FileSystem fs; 190 191 /** 192 * WAL directory, where all WAL files would be placed. 193 */ 194 protected final Path walDir; 195 196 private final FileSystem remoteFs; 197 198 private final Path remoteWALDir; 199 200 /** 201 * dir path where old logs are kept. 202 */ 203 protected final Path walArchiveDir; 204 205 /** 206 * Matches just those wal files that belong to this wal instance. 
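 * <p>
 * For illustration (the names here are made up): with prefix {@code server1%2C16020%2C1692000000000} and an
 * empty suffix, a file named {@code server1%2C16020%2C1692000000000.1692000012345} is accepted, while files
 * written by a different WAL instance, or whose tail after the prefix is not numeric, are rejected.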
207 */ 208 protected final PathFilter ourFiles; 209 210 /** 211 * Prefix of a WAL file, usually the region server name it is hosted on. 212 */ 213 protected final String walFilePrefix; 214 215 /** 216 * Suffix included on generated wal file names 217 */ 218 protected final String walFileSuffix; 219 220 /** 221 * Prefix used when checking for wal membership. 222 */ 223 protected final String prefixPathStr; 224 225 protected final WALCoprocessorHost coprocessorHost; 226 227 /** 228 * conf object 229 */ 230 protected final Configuration conf; 231 232 protected final Abortable abortable; 233 234 /** Listeners that are called on WAL events. */ 235 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 236 237 /** Tracks the logs in the process of being closed. */ 238 protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>(); 239 240 /** 241 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence 242 * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has 243 * facility for answering questions such as "Is it safe to GC a WAL?". 244 */ 245 protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 246 247 /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */ 248 protected final long slowSyncNs, rollOnSyncNs; 249 protected final int slowSyncRollThreshold; 250 protected final int slowSyncCheckInterval; 251 protected final AtomicInteger slowSyncCount = new AtomicInteger(); 252 253 private final long walSyncTimeoutNs; 254 255 private final long walTooOldNs; 256 257 // If > than this size, roll the log. 258 protected final long logrollsize; 259 260 /** 261 * Block size to use writing files. 262 */ 263 protected final long blocksize; 264 265 /* 266 * If more than this many logs, force flush of oldest region to the oldest edit goes to disk. If 267 * too many and we crash, then will take forever replaying. Keep the number of logs tidy. 268 */ 269 protected final int maxLogs; 270 271 protected final boolean useHsync; 272 273 /** 274 * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock 275 * is held. We don't just use synchronized because that results in bogus and tedious findbugs 276 * warning when it thinks synchronized controls writer thread safety. It is held when we are 277 * actually rolling the log. It is checked when we are looking to see if we should roll the log or 278 * not. 279 */ 280 protected final ReentrantLock rollWriterLock = new ReentrantLock(true); 281 282 // The timestamp (in ms) when the log file was created. 283 protected final AtomicLong filenum = new AtomicLong(-1); 284 285 // Number of transactions in the current Wal. 286 protected final AtomicInteger numEntries = new AtomicInteger(0); 287 288 /** 289 * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass 290 * WALEdit to background consumer thread, and the transaction id is the sequence number of the 291 * corresponding entry in queue. 292 */ 293 protected volatile long highestUnsyncedTxid = -1; 294 295 /** 296 * Updated to the transaction id of the last successful sync call. This can be less than 297 * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in 298 * for it. 
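 * <p>
 * For example, if {@link #highestUnsyncedTxid} is 10 and this value is 7, the appends with txids 8 through 10
 * have been handed to the writer but are not yet known to be durable.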
299 */ 300 protected final AtomicLong highestSyncedTxid = new AtomicLong(0); 301 302 /** 303 * The total size of wal 304 */ 305 protected final AtomicLong totalLogSize = new AtomicLong(0); 306 /** 307 * Current log file. 308 */ 309 volatile W writer; 310 311 // Last time to check low replication on hlog's pipeline 312 private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); 313 314 // Last time we asked to roll the log due to a slow sync 315 private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 316 317 protected volatile boolean closed = false; 318 319 protected final AtomicBoolean shutdown = new AtomicBoolean(false); 320 321 protected final long walShutdownTimeout; 322 323 private long nextLogTooOldNs = System.nanoTime(); 324 325 /** 326 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws 327 * an IllegalArgumentException if used to compare paths from different wals. 328 */ 329 final Comparator<Path> LOG_NAME_COMPARATOR = 330 (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); 331 332 private static final class WALProps { 333 334 /** 335 * Map the encoded region name to the highest sequence id. 336 * <p/> 337 * Contains all the regions it has an entry for. 338 */ 339 private final Map<byte[], Long> encodedName2HighestSequenceId; 340 341 /** 342 * The log file size. Notice that the size may not be accurate if we do asynchronous close in 343 * subclasses. 344 */ 345 private final long logSize; 346 347 /** 348 * The nanoTime of the log rolling, used to determine the time interval that has passed since. 349 */ 350 private final long rollTimeNs; 351 352 /** 353 * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the 354 * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file, 355 * for safety. 356 */ 357 private volatile boolean closed = false; 358 359 WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { 360 this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; 361 this.logSize = logSize; 362 this.rollTimeNs = System.nanoTime(); 363 } 364 } 365 366 /** 367 * Map of WAL log file to properties. The map is sorted by the log file creation timestamp 368 * (contained in the log file name). 369 */ 370 protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props = 371 new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); 372 373 /** 374 * A cache of sync futures reused by threads. 375 */ 376 protected final SyncFutureCache syncFutureCache; 377 378 /** 379 * The class name of the runtime implementation, used as prefix for logging/tracing. 380 * <p> 381 * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, 382 * refer to HBASE-17676 for more details 383 * </p> 384 */ 385 protected final String implClassName; 386 387 protected final AtomicBoolean rollRequested = new AtomicBoolean(false); 388 389 protected final ExecutorService closeExecutor = Executors.newCachedThreadPool( 390 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); 391 392 // Run in caller if we get reject execution exception, to avoid aborting region server when we get 393 // reject execution exception. Usually this should not happen but let's make it more robust. 
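 // Note: ThreadPoolExecutor.CallerRunsPolicy runs a rejected task directly in the submitting thread
 // instead of throwing RejectedExecutionException (and if the executor has already been shut down,
 // the rejected task is silently discarded).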
394 private final ExecutorService logArchiveExecutor = 395 new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), 396 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), 397 new ThreadPoolExecutor.CallerRunsPolicy()); 398 399 private final int archiveRetries; 400 401 protected ExecutorService consumeExecutor; 402 403 private final Lock consumeLock = new ReentrantLock(); 404 405 protected final Runnable consumer = this::consume; 406 407 // check if there is already a consumer task in the event loop's task queue 408 protected Supplier<Boolean> hasConsumerTask; 409 410 private static final int MAX_EPOCH = 0x3FFFFFFF; 411 // the lowest bit is waitingRoll, which means new writer is created, and we are waiting for old 412 // writer to be closed. 413 // the second-lowest bit is writerBroken which means the current writer is broken and rollWriter 414 // is needed. 415 // all other bits are the epoch number of the current writer, this is used to detect whether the 416 // writer is still the one when you issue the sync. 417 // notice that, modification to this field is only allowed under the protection of consumeLock. 418 private volatile int epochAndState; 419 420 private boolean readyForRolling; 421 422 private final Condition readyForRollingCond = consumeLock.newCondition(); 423 424 private final RingBuffer<RingBufferTruck> waitingConsumePayloads; 425 426 private final Sequence waitingConsumePayloadsGatingSequence; 427 428 private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); 429 430 private final long batchSize; 431 432 protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>(); 433 434 protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>(); 435 436 protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR); 437 438 // the highest txid of WAL entries being processed 439 protected long highestProcessedAppendTxid; 440 441 // file length when we issue last sync request on the writer 442 private long fileLengthAtLastSync; 443 444 private long highestProcessedAppendTxidAtLastSync; 445 446 private int waitOnShutdownInSeconds; 447 448 private String waitOnShutdownInSecondsConfigKey; 449 450 protected boolean shouldShutDownConsumeExecutorWhenClose = true; 451 452 private volatile boolean skipRemoteWAL = false; 453 454 private volatile boolean markerEditOnly = false; 455 456 public long getFilenum() { 457 return this.filenum.get(); 458 } 459 460 /** 461 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper 462 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming 463 * the filename is created with the {@link #computeFilename(long filenum)} method. 464 * @return timestamp, as in the log file name. 465 */ 466 protected long getFileNumFromFileName(Path fileName) { 467 checkNotNull(fileName, "file name can't be null"); 468 if (!ourFiles.accept(fileName)) { 469 throw new IllegalArgumentException( 470 "The log file " + fileName + " doesn't belong to this WAL. 
(" + toString() + ")"); 471 } 472 final String fileNameString = fileName.toString(); 473 String chompedPath = fileNameString.substring(prefixPathStr.length(), 474 (fileNameString.length() - walFileSuffix.length())); 475 return Long.parseLong(chompedPath); 476 } 477 478 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 479 checkArgument(logRollSize > 0, 480 "The log roll size cannot be zero or negative when calculating max log files, " 481 + "current value is " + logRollSize); 482 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 483 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 484 } 485 486 // must be power of 2 487 protected final int getPreallocatedEventCount() { 488 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 489 // be stuck and make no progress if the buffer is filled with appends only and there is no 490 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 491 // before they return. 492 int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16); 493 checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0"); 494 int floor = Integer.highestOneBit(preallocatedEventCount); 495 if (floor == preallocatedEventCount) { 496 return floor; 497 } 498 // max capacity is 1 << 30 499 if (floor >= 1 << 29) { 500 return 1 << 30; 501 } 502 return floor << 1; 503 } 504 505 protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds, 506 String waitOnShutdownInSecondsConfigKey) { 507 this.waitOnShutdownInSeconds = waitOnShutdownInSeconds; 508 this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey; 509 } 510 511 protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir, 512 final String prefix) { 513 ThreadPoolExecutor threadPool = 514 new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 515 new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:" 516 + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build()); 517 hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; 518 consumeExecutor = threadPool; 519 this.shouldShutDownConsumeExecutorWhenClose = true; 520 } 521 522 protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, 523 final String logDir, final String archiveDir, final Configuration conf, 524 final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, 525 final String suffix, FileSystem remoteFs, Path remoteWALDir) 526 throws FailedLogCloseException, IOException { 527 this.fs = fs; 528 this.walDir = new Path(rootDir, logDir); 529 this.walArchiveDir = new Path(rootDir, archiveDir); 530 this.conf = conf; 531 this.abortable = abortable; 532 this.remoteFs = remoteFs; 533 this.remoteWALDir = remoteWALDir; 534 535 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 536 throw new IOException("Unable to mkdir " + walDir); 537 } 538 539 if (!fs.exists(this.walArchiveDir)) { 540 if (!fs.mkdirs(this.walArchiveDir)) { 541 throw new IOException("Unable to mkdir " + this.walArchiveDir); 542 } 543 } 544 545 // If prefix is null||empty then just name it wal 546 this.walFilePrefix = prefix == null || prefix.isEmpty() 547 ? 
"wal" 548 : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name()); 549 // we only correctly differentiate suffices when numeric ones start with '.' 550 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 551 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER 552 + "' but instead was '" + suffix + "'"); 553 } 554 // Now that it exists, set the storage policy for the entire directory of wal files related to 555 // this FSHLog instance 556 String storagePolicy = 557 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 558 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 559 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 560 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 561 562 this.ourFiles = new PathFilter() { 563 @Override 564 public boolean accept(final Path fileName) { 565 // The path should start with dir/<prefix> and end with our suffix 566 final String fileNameString = fileName.toString(); 567 if (!fileNameString.startsWith(prefixPathStr)) { 568 return false; 569 } 570 if (walFileSuffix.isEmpty()) { 571 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 572 return org.apache.commons.lang3.StringUtils 573 .isNumeric(fileNameString.substring(prefixPathStr.length())); 574 } else if (!fileNameString.endsWith(walFileSuffix)) { 575 return false; 576 } 577 return true; 578 } 579 }; 580 581 if (failIfWALExists) { 582 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 583 if (null != walFiles && 0 != walFiles.length) { 584 throw new IOException("Target WAL already exists within directory " + walDir); 585 } 586 } 587 588 // Register listeners. TODO: Should this exist anymore? We have CPs? 589 if (listeners != null) { 590 for (WALActionsListener i : listeners) { 591 registerWALActionsListener(i); 592 } 593 } 594 this.coprocessorHost = new WALCoprocessorHost(this, conf); 595 596 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block 597 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 598 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 599 // the block size but experience from the field has it that this was not enough time for the 600 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 601 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 602 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 603 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 
604 this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
605 float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
606 this.logrollsize = (long) (this.blocksize * multiplier);
607 this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
608
609 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
610 + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
611 + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
612 + ", maxLogs=" + this.maxLogs);
613 this.slowSyncNs =
614 TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
615 this.rollOnSyncNs = TimeUnit.MILLISECONDS
616 .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
617 this.slowSyncRollThreshold =
618 conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
619 this.slowSyncCheckInterval =
620 conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
621 this.walSyncTimeoutNs =
622 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
623 this.syncFutureCache = new SyncFutureCache(conf);
624 this.implClassName = getClass().getSimpleName();
625 this.walTooOldNs = TimeUnit.SECONDS
626 .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
627 this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
628 archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
629 this.walShutdownTimeout =
630 conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
631
632 int preallocatedEventCount =
633 conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
634 waitingConsumePayloads =
635 RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
636 waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
637 waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
638
639 // increase the ring buffer sequence so our txid starts from 1
640 waitingConsumePayloads.publish(waitingConsumePayloads.next());
641 waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
642
643 batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
644 }
645
646 /**
647 * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
648 */ 649 @Override 650 public void init() throws IOException { 651 rollWriter(); 652 } 653 654 @Override 655 public void registerWALActionsListener(WALActionsListener listener) { 656 this.listeners.add(listener); 657 } 658 659 @Override 660 public boolean unregisterWALActionsListener(WALActionsListener listener) { 661 return this.listeners.remove(listener); 662 } 663 664 @Override 665 public WALCoprocessorHost getCoprocessorHost() { 666 return coprocessorHost; 667 } 668 669 @Override 670 public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { 671 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 672 } 673 674 @Override 675 public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { 676 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); 677 } 678 679 @Override 680 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 681 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 682 } 683 684 @Override 685 public void abortCacheFlush(byte[] encodedRegionName) { 686 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 687 } 688 689 @Override 690 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 691 // Used by tests. Deprecated as too subtle for general usage. 692 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 693 } 694 695 @Override 696 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 697 // This method is used by tests and for figuring if we should flush or not because our 698 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 699 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 700 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 701 // currently flushing sequence ids, and if anything found there, it is returning these. This is 702 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 703 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 704 // id is old even though we are currently flushing. This may mean we do too much flushing. 705 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 706 } 707 708 @Override 709 public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException { 710 return rollWriter(false); 711 } 712 713 @Override 714 public final void sync() throws IOException { 715 sync(useHsync); 716 } 717 718 @Override 719 public final void sync(long txid) throws IOException { 720 sync(txid, useHsync); 721 } 722 723 @Override 724 public final void sync(boolean forceSync) throws IOException { 725 TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync")); 726 } 727 728 @Override 729 public final void sync(long txid, boolean forceSync) throws IOException { 730 TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync")); 731 } 732 733 /** 734 * This is a convenience method that computes a new filename with a given file-number. 
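 * <p>
 * For example (illustrative names), with walFilePrefix {@code server1%2C16020%2C1692000000000}, filenum
 * {@code 1692000012345} and an empty suffix, the result is
 * {@code <walDir>/server1%2C16020%2C1692000000000.1692000012345}.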
735 * @param filenum to use 736 */ 737 protected Path computeFilename(final long filenum) { 738 if (filenum < 0) { 739 throw new RuntimeException("WAL file number can't be < 0"); 740 } 741 String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; 742 return new Path(walDir, child); 743 } 744 745 /** 746 * This is a convenience method that computes a new filename with a given using the current WAL 747 * file-number 748 */ 749 public Path getCurrentFileName() { 750 return computeFilename(this.filenum.get()); 751 } 752 753 /** 754 * retrieve the next path to use for writing. Increments the internal filenum. 755 */ 756 private Path getNewPath() throws IOException { 757 this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime())); 758 Path newPath = getCurrentFileName(); 759 return newPath; 760 } 761 762 public Path getOldPath() { 763 long currentFilenum = this.filenum.get(); 764 Path oldPath = null; 765 if (currentFilenum > 0) { 766 // ComputeFilename will take care of meta wal filename 767 oldPath = computeFilename(currentFilenum); 768 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 769 return oldPath; 770 } 771 772 /** 773 * Tell listeners about pre log roll. 774 */ 775 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 776 throws IOException { 777 coprocessorHost.preWALRoll(oldPath, newPath); 778 779 if (!this.listeners.isEmpty()) { 780 for (WALActionsListener i : this.listeners) { 781 i.preLogRoll(oldPath, newPath); 782 } 783 } 784 } 785 786 /** 787 * Tell listeners about post log roll. 788 */ 789 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 790 throws IOException { 791 if (!this.listeners.isEmpty()) { 792 for (WALActionsListener i : this.listeners) { 793 i.postLogRoll(oldPath, newPath); 794 } 795 } 796 797 coprocessorHost.postWALRoll(oldPath, newPath); 798 } 799 800 // public only until class moves to o.a.h.h.wal 801 /** Returns the number of rolled log files */ 802 public int getNumRolledLogFiles() { 803 return walFile2Props.size(); 804 } 805 806 // public only until class moves to o.a.h.h.wal 807 /** Returns the number of log files in use */ 808 public int getNumLogFiles() { 809 // +1 for current use log 810 return getNumRolledLogFiles() + 1; 811 } 812 813 /** 814 * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the 815 * first (oldest) WAL, and return those regions which should be flushed so that it can be 816 * let-go/'archived'. 
817 * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
818 */
819 Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
820 Map<byte[], List<byte[]>> regions = null;
821 int logCount = getNumRolledLogFiles();
822 if (logCount > this.maxLogs && logCount > 0) {
823 Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
824 regions =
825 this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
826 }
827 if (regions != null) {
828 List<String> listForPrint = new ArrayList<>();
829 for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
830 StringBuilder families = new StringBuilder();
831 for (int i = 0; i < r.getValue().size(); i++) {
832 if (i > 0) {
833 families.append(",");
834 }
835 families.append(Bytes.toString(r.getValue().get(i)));
836 }
837 listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
838 }
839 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
840 + "; forcing (partial) flush of " + regions.size() + " region(s): "
841 + StringUtils.join(",", listForPrint));
842 }
843 return regions;
844 }
845
846 /**
847 * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
848 */
849 private void markClosedAndClean(Path path) {
850 WALProps props = walFile2Props.get(path);
851 // typically this should not be null, but there is no big issue if it is already null, so
852 // let's make the code more robust
853 if (props != null) {
854 props.closed = true;
855 cleanOldLogs();
856 }
857 }
858
859 /**
860 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
861 * <p/>
862 * Use synchronized because we may call this method in different threads, normally when replacing
863 * the writer, and since closing the writer may now be asynchronous, we will also call this method in the
864 * closeExecutor, right after we actually close a WAL writer.
865 */
866 private synchronized void cleanOldLogs() {
867 List<Pair<Path, Long>> logsToArchive = null;
868 long now = System.nanoTime();
869 boolean mayLogTooOld = nextLogTooOldNs <= now;
870 ArrayList<byte[]> regionsBlockingWal = null;
871 // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
872 // are older than what is currently in memory, the WAL can be GC'd.
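 // Example of the check below (made-up names and numbers): if a rolled WAL recorded {regionA=5, regionB=9}
 // as its highest sequence ids, and sequenceIdAccounting now says the lowest unflushed sequence ids are
 // regionA=7 and regionB=12, everything in that WAL has been flushed and it can be archived; if regionB
 // were still at 8, regionB would be reported as blocking this WAL.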
873 for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) { 874 if (!e.getValue().closed) { 875 LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey()); 876 continue; 877 } 878 Path log = e.getKey(); 879 ArrayList<byte[]> regionsBlockingThisWal = null; 880 long ageNs = now - e.getValue().rollTimeNs; 881 if (ageNs > walTooOldNs) { 882 if (mayLogTooOld && regionsBlockingWal == null) { 883 regionsBlockingWal = new ArrayList<>(); 884 } 885 regionsBlockingThisWal = regionsBlockingWal; 886 } 887 Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; 888 if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) { 889 if (logsToArchive == null) { 890 logsToArchive = new ArrayList<>(); 891 } 892 logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); 893 if (LOG.isTraceEnabled()) { 894 LOG.trace("WAL file ready for archiving " + log); 895 } 896 } else if (regionsBlockingThisWal != null) { 897 StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ") 898 .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: "); 899 boolean isFirst = true; 900 for (byte[] region : regionsBlockingThisWal) { 901 if (!isFirst) { 902 sb.append("; "); 903 } 904 isFirst = false; 905 sb.append(Bytes.toString(region)); 906 } 907 LOG.warn(sb.toString()); 908 nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS; 909 regionsBlockingThisWal.clear(); 910 } 911 } 912 913 if (logsToArchive != null) { 914 final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; 915 // make it async 916 for (Pair<Path, Long> log : localLogsToArchive) { 917 logArchiveExecutor.execute(() -> { 918 archive(log); 919 }); 920 this.walFile2Props.remove(log.getFirst()); 921 } 922 } 923 } 924 925 protected void archive(final Pair<Path, Long> log) { 926 totalLogSize.addAndGet(-log.getSecond()); 927 int retry = 1; 928 while (true) { 929 try { 930 archiveLogFile(log.getFirst()); 931 // successful 932 break; 933 } catch (Throwable e) { 934 if (retry > archiveRetries) { 935 LOG.error("Failed log archiving for the log {},", log.getFirst(), e); 936 if (this.abortable != null) { 937 this.abortable.abort("Failed log archiving", e); 938 break; 939 } 940 } else { 941 LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e); 942 } 943 retry++; 944 } 945 } 946 } 947 948 /* 949 * only public so WALSplitter can use. 950 * @return archived location of a WAL file with the given path p 951 */ 952 public static Path getWALArchivePath(Path archiveDir, Path p) { 953 return new Path(archiveDir, p.getName()); 954 } 955 956 protected void archiveLogFile(final Path p) throws IOException { 957 Path newPath = getWALArchivePath(this.walArchiveDir, p); 958 // Tell our listeners that a log is going to be archived. 959 if (!this.listeners.isEmpty()) { 960 for (WALActionsListener i : this.listeners) { 961 i.preLogArchive(p, newPath); 962 } 963 } 964 LOG.info("Archiving " + p + " to " + newPath); 965 if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 966 throw new IOException("Unable to rename " + p + " to " + newPath); 967 } 968 // Tell our listeners that a log has been archived. 
969 if (!this.listeners.isEmpty()) { 970 for (WALActionsListener i : this.listeners) { 971 i.postLogArchive(p, newPath); 972 } 973 } 974 } 975 976 protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { 977 int oldNumEntries = this.numEntries.getAndSet(0); 978 String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; 979 if (oldPath != null) { 980 this.walFile2Props.put(oldPath, 981 new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); 982 this.totalLogSize.addAndGet(oldFileLen); 983 LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", 984 CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), 985 newPathString); 986 } else { 987 LOG.info("New WAL {}", newPathString); 988 } 989 } 990 991 private Span createSpan(String name) { 992 return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName); 993 } 994 995 /** 996 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. 997 * <p/> 998 * <ul> 999 * <li>In the case of creating a new WAL, oldPath will be null.</li> 1000 * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 1001 * </li> 1002 * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be 1003 * null.</li> 1004 * </ul> 1005 * @param oldPath may be null 1006 * @param newPath may be null 1007 * @param nextWriter may be null 1008 * @return the passed in <code>newPath</code> 1009 * @throws IOException if there is a problem flushing or closing the underlying FS 1010 */ 1011 Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 1012 return TraceUtil.trace(() -> { 1013 doReplaceWriter(oldPath, newPath, nextWriter); 1014 return newPath; 1015 }, () -> createSpan("WAL.replaceWriter")); 1016 } 1017 1018 protected final void blockOnSync(SyncFuture syncFuture) throws IOException { 1019 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 1020 try { 1021 if (syncFuture != null) { 1022 if (closed) { 1023 throw new IOException("WAL has been closed"); 1024 } else { 1025 syncFuture.get(walSyncTimeoutNs); 1026 } 1027 } 1028 } catch (TimeoutIOException tioe) { 1029 throw new WALSyncTimeoutIOException(tioe); 1030 } catch (InterruptedException ie) { 1031 LOG.warn("Interrupted", ie); 1032 throw convertInterruptedExceptionToIOException(ie); 1033 } catch (ExecutionException e) { 1034 throw ensureIOException(e.getCause()); 1035 } 1036 } 1037 1038 private static IOException ensureIOException(final Throwable t) { 1039 return (t instanceof IOException) ? (IOException) t : new IOException(t); 1040 } 1041 1042 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 1043 Thread.currentThread().interrupt(); 1044 IOException ioe = new InterruptedIOException(); 1045 ioe.initCause(ie); 1046 return ioe; 1047 } 1048 1049 private W createCombinedWriter(W localWriter, Path localPath) 1050 throws IOException, CommonFSUtils.StreamLacksCapabilityException { 1051 // retry forever if we can not create the remote writer to prevent aborting the RS due to log 1052 // rolling error, unless the skipRemoteWal is set to true. 
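 // The retry loop below sleeps for ConnectionUtils.getPauseTime(100, retry) between attempts, i.e. a
 // roughly exponential backoff starting around 100 ms, so a transient remote filesystem outage does not
 // turn into a tight retry loop.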
1053 // TODO: since for now we only have one thread doing log rolling, this may block the rolling for 1054 // other wals 1055 Path remoteWAL = new Path(remoteWALDir, localPath.getName()); 1056 for (int retry = 0;; retry++) { 1057 if (skipRemoteWAL) { 1058 return localWriter; 1059 } 1060 W remoteWriter; 1061 try { 1062 remoteWriter = createWriterInstance(remoteFs, remoteWAL); 1063 } catch (IOException e) { 1064 LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); 1065 try { 1066 Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); 1067 } catch (InterruptedException ie) { 1068 // restore the interrupt state 1069 Thread.currentThread().interrupt(); 1070 // must close local writer here otherwise no one will close it for us 1071 Closeables.close(localWriter, true); 1072 throw (IOException) new InterruptedIOException().initCause(ie); 1073 } 1074 continue; 1075 } 1076 return createCombinedWriter(localWriter, remoteWriter); 1077 } 1078 } 1079 1080 private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException { 1081 rollWriterLock.lock(); 1082 try { 1083 if (this.closed) { 1084 throw new WALClosedException("WAL has been closed"); 1085 } 1086 // Return if nothing to flush. 1087 if (!force && this.writer != null && this.numEntries.get() <= 0) { 1088 return null; 1089 } 1090 Map<byte[], List<byte[]>> regionsToFlush = null; 1091 try { 1092 Path oldPath = getOldPath(); 1093 Path newPath = getNewPath(); 1094 // Any exception from here on is catastrophic, non-recoverable, so we currently abort. 1095 W nextWriter = this.createWriterInstance(fs, newPath); 1096 if (remoteFs != null) { 1097 // create a remote wal if necessary 1098 nextWriter = createCombinedWriter(nextWriter, newPath); 1099 } 1100 tellListenersAboutPreLogRoll(oldPath, newPath); 1101 // NewPath could be equal to oldPath if replaceWriter fails. 1102 newPath = replaceWriter(oldPath, newPath, nextWriter); 1103 tellListenersAboutPostLogRoll(oldPath, newPath); 1104 if (LOG.isDebugEnabled()) { 1105 LOG.debug("Create new " + implClassName + " writer with pipeline: " 1106 + Arrays.toString(getPipeline())); 1107 } 1108 // We got a new writer, so reset the slow sync count 1109 lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 1110 slowSyncCount.set(0); 1111 // Can we delete any of the old log files? 1112 if (getNumRolledLogFiles() > 0) { 1113 cleanOldLogs(); 1114 regionsToFlush = findRegionsToForceFlush(); 1115 } 1116 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 1117 // If the underlying FileSystem can't do what we ask, treat as IO failure, so 1118 // we'll abort. 1119 throw new IOException( 1120 "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", 1121 exception); 1122 } 1123 return regionsToFlush; 1124 } finally { 1125 rollWriterLock.unlock(); 1126 } 1127 } 1128 1129 @Override 1130 public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException { 1131 return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter")); 1132 } 1133 1134 // public only until class moves to o.a.h.h.wal 1135 /** Returns the size of log files in use */ 1136 public long getLogFileSize() { 1137 return this.totalLogSize.get(); 1138 } 1139 1140 // public only until class moves to o.a.h.h.wal 1141 public void requestLogRoll() { 1142 requestLogRoll(ERROR); 1143 } 1144 1145 /** 1146 * Get the backing files associated with this WAL. 1147 * @return may be null if there are no files. 
1148 */ 1149 FileStatus[] getFiles() throws IOException { 1150 return CommonFSUtils.listStatus(fs, walDir, ourFiles); 1151 } 1152 1153 @Override 1154 public void shutdown() throws IOException { 1155 if (!shutdown.compareAndSet(false, true)) { 1156 return; 1157 } 1158 closed = true; 1159 // Tell our listeners that the log is closing 1160 if (!this.listeners.isEmpty()) { 1161 for (WALActionsListener i : this.listeners) { 1162 i.logCloseRequested(); 1163 } 1164 } 1165 1166 ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( 1167 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); 1168 1169 Future<Void> future = shutdownExecutor.submit(new Callable<Void>() { 1170 @Override 1171 public Void call() throws Exception { 1172 if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { 1173 try { 1174 doShutdown(); 1175 if (syncFutureCache != null) { 1176 syncFutureCache.clear(); 1177 } 1178 } finally { 1179 rollWriterLock.unlock(); 1180 } 1181 } else { 1182 throw new IOException("Waiting for rollWriterLock timeout"); 1183 } 1184 return null; 1185 } 1186 }); 1187 shutdownExecutor.shutdown(); 1188 1189 try { 1190 future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); 1191 } catch (InterruptedException e) { 1192 throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); 1193 } catch (TimeoutException e) { 1194 throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" 1195 + " the shutdown of WAL doesn't complete! Please check the status of underlying " 1196 + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS 1197 + "\"", e); 1198 } catch (ExecutionException e) { 1199 if (e.getCause() instanceof IOException) { 1200 throw (IOException) e.getCause(); 1201 } else { 1202 throw new IOException(e.getCause()); 1203 } 1204 } finally { 1205 // in shutdown, we may call cleanOldLogs so shutdown this executor in the end. 1206 // In sync replication implementation, we may shut down a WAL without shutting down the whole 1207 // region server, if we shut down this executor earlier we may get reject execution exception 1208 // and abort the region server 1209 logArchiveExecutor.shutdown(); 1210 } 1211 // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still 1212 // have some pending archiving tasks not finished yet, and in close we may archive all the 1213 // remaining WAL files, there could be race if we do not wait for the background archive task 1214 // finish 1215 try { 1216 if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) { 1217 throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" 1218 + " the shutdown of WAL doesn't complete! Please check the status of underlying " 1219 + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS 1220 + "\""); 1221 } 1222 } catch (InterruptedException e) { 1223 throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); 1224 } 1225 } 1226 1227 @Override 1228 public void close() throws IOException { 1229 shutdown(); 1230 final FileStatus[] files = getFiles(); 1231 if (null != files && 0 != files.length) { 1232 for (FileStatus file : files) { 1233 Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); 1234 // Tell our listeners that a log is going to be archived. 
1235 if (!this.listeners.isEmpty()) { 1236 for (WALActionsListener i : this.listeners) { 1237 i.preLogArchive(file.getPath(), p); 1238 } 1239 } 1240 1241 if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 1242 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 1243 } 1244 // Tell our listeners that a log was archived. 1245 if (!this.listeners.isEmpty()) { 1246 for (WALActionsListener i : this.listeners) { 1247 i.postLogArchive(file.getPath(), p); 1248 } 1249 } 1250 } 1251 LOG.debug( 1252 "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); 1253 } 1254 LOG.info("Closed WAL: " + toString()); 1255 } 1256 1257 /** Returns number of WALs currently in the process of closing. */ 1258 public int getInflightWALCloseCount() { 1259 return inflightWALClosures.size(); 1260 } 1261 1262 /** 1263 * updates the sequence number of a specific store. depending on the flag: replaces current seq 1264 * number if the given seq id is bigger, or even if it is lower than existing one 1265 */ 1266 @Override 1267 public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, 1268 boolean onlyIfGreater) { 1269 sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); 1270 } 1271 1272 protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { 1273 return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync); 1274 } 1275 1276 protected boolean isLogRollRequested() { 1277 return rollRequested.get(); 1278 } 1279 1280 protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) { 1281 // If we have already requested a roll, don't do it again 1282 // And only set rollRequested to true when there is a registered listener 1283 if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) { 1284 for (WALActionsListener i : this.listeners) { 1285 i.logRollRequested(reason); 1286 } 1287 } 1288 } 1289 1290 long getUnflushedEntriesCount() { 1291 long highestSynced = this.highestSyncedTxid.get(); 1292 long highestUnsynced = this.highestUnsyncedTxid; 1293 return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; 1294 } 1295 1296 boolean isUnflushedEntries() { 1297 return getUnflushedEntriesCount() > 0; 1298 } 1299 1300 /** 1301 * Exposed for testing only. Use to tricks like halt the ring buffer appending. 1302 */ 1303 protected void atHeadOfRingBufferEventHandlerAppend() { 1304 // Noop 1305 } 1306 1307 protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException { 1308 // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 1309 atHeadOfRingBufferEventHandlerAppend(); 1310 long start = EnvironmentEdgeManager.currentTime(); 1311 byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); 1312 long regionSequenceId = entry.getKey().getSequenceId(); 1313 1314 // Edits are empty, there is nothing to append. Maybe empty when we are looking for a 1315 // region sequence id only, a region edit/sequence id that is not associated with an actual 1316 // edit. It has to go through all the rigmarole to be sure we have the right ordering. 1317 if (entry.getEdit().isEmpty()) { 1318 return false; 1319 } 1320 1321 // Coprocessor hook. 
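 // The order below matters: the pre-write coprocessor hook and listeners run first, then the edit is
 // handed to the writer, sequence id accounting is updated (or cleared on region close), and finally the
 // post-write hook and the append metrics run.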
1322 coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1323 if (!listeners.isEmpty()) { 1324 for (WALActionsListener i : listeners) { 1325 i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1326 } 1327 } 1328 doAppend(writer, entry); 1329 assert highestUnsyncedTxid < entry.getTxid(); 1330 highestUnsyncedTxid = entry.getTxid(); 1331 if (entry.isCloseRegion()) { 1332 // let's clean all the records of this region 1333 sequenceIdAccounting.onRegionClose(encodedRegionName); 1334 } else { 1335 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, 1336 entry.isInMemStore()); 1337 } 1338 coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1339 // Update metrics. 1340 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 1341 numEntries.incrementAndGet(); 1342 return true; 1343 } 1344 1345 private long postAppend(final Entry e, final long elapsedTime) throws IOException { 1346 long len = 0; 1347 if (!listeners.isEmpty()) { 1348 for (Cell cell : e.getEdit().getCells()) { 1349 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 1350 } 1351 for (WALActionsListener listener : listeners) { 1352 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); 1353 } 1354 } 1355 return len; 1356 } 1357 1358 protected final void postSync(long timeInNanos, int handlerSyncs) { 1359 if (timeInNanos > this.slowSyncNs) { 1360 String msg = new StringBuilder().append("Slow sync cost: ") 1361 .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ") 1362 .append(Arrays.toString(getPipeline())).toString(); 1363 LOG.info(msg); 1364 if (timeInNanos > this.rollOnSyncNs) { 1365 // A single sync took too long. 1366 // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative 1367 // effects. Here we have a single data point that indicates we should take immediate 1368 // action, so do so. 
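 // With the defaults this means: a sync taking, say, 150 ms (over the 100 ms
 // hbase.regionserver.wal.slowsync.ms default) is only logged and counted, while a sync taking more than
 // the 10 s hbase.regionserver.wal.roll.on.sync.ms default also triggers the roll request below.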
1369 LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" 1370 + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" 1371 + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " 1372 + Arrays.toString(getPipeline())); 1373 requestLogRoll(SLOW_SYNC); 1374 } 1375 slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this 1376 } 1377 if (!listeners.isEmpty()) { 1378 for (WALActionsListener listener : listeners) { 1379 listener.postSync(timeInNanos, handlerSyncs); 1380 } 1381 } 1382 } 1383 1384 protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, 1385 WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException { 1386 if (this.closed) { 1387 throw new IOException( 1388 "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); 1389 } 1390 MutableLong txidHolder = new MutableLong(); 1391 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { 1392 txidHolder.setValue(ringBuffer.next()); 1393 }); 1394 long txid = txidHolder.longValue(); 1395 ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null); 1396 try { 1397 FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); 1398 entry.stampRegionSequenceId(we); 1399 ringBuffer.get(txid).load(entry); 1400 } finally { 1401 ringBuffer.publish(txid); 1402 } 1403 return txid; 1404 } 1405 1406 @Override 1407 public String toString() { 1408 return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; 1409 } 1410 1411 /** 1412 * if the given {@code path} is being written currently, then return its length. 1413 * <p> 1414 * This is used by replication to prevent replicating unacked log entries. See 1415 * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 1416 */ 1417 @Override 1418 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 1419 rollWriterLock.lock(); 1420 try { 1421 Path currentPath = getOldPath(); 1422 if (path.equals(currentPath)) { 1423 // Currently active path. 1424 W writer = this.writer; 1425 return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); 1426 } else { 1427 W temp = inflightWALClosures.get(path.getName()); 1428 if (temp != null) { 1429 // In the process of being closed, trailer bytes may or may not be flushed. 1430 // Ensuring that we read all the bytes in a file is critical for correctness of tailing 1431 // use cases like replication, see HBASE-25924/HBASE-25932. 1432 return OptionalLong.of(temp.getSyncedLength()); 1433 } 1434 // Log rolled successfully. 1435 return OptionalLong.empty(); 1436 } 1437 } finally { 1438 rollWriterLock.unlock(); 1439 } 1440 } 1441 1442 @Override 1443 public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 1444 return TraceUtil.trace(() -> append(info, key, edits, true), 1445 () -> createSpan("WAL.appendData")); 1446 } 1447 1448 @Override 1449 public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 1450 return TraceUtil.trace(() -> append(info, key, edits, false), 1451 () -> createSpan("WAL.appendMarker")); 1452 } 1453 1454 /** 1455 * Helper that marks the future as DONE and offers it back to the cache. 
1456 */ 1457 protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { 1458 future.done(txid, t); 1459 syncFutureCache.offer(future); 1460 } 1461 1462 private static boolean waitingRoll(int epochAndState) { 1463 return (epochAndState & 1) != 0; 1464 } 1465 1466 private static boolean writerBroken(int epochAndState) { 1467 return ((epochAndState >>> 1) & 1) != 0; 1468 } 1469 1470 private static int epoch(int epochAndState) { 1471 return epochAndState >>> 2; 1472 } 1473 1474 // return whether we have successfully set readyForRolling to true. 1475 private boolean trySetReadyForRolling() { 1476 // Check without holding lock first. Usually we will just return here. 1477 // waitingRoll is volatile and unacedEntries is only accessed inside event loop, so it is safe 1478 // to check them outside the consumeLock. 1479 if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { 1480 return false; 1481 } 1482 consumeLock.lock(); 1483 try { 1484 // 1. a roll is requested 1485 // 2. all out-going entries have been acked(we have confirmed above). 1486 if (waitingRoll(epochAndState)) { 1487 readyForRolling = true; 1488 readyForRollingCond.signalAll(); 1489 return true; 1490 } else { 1491 return false; 1492 } 1493 } finally { 1494 consumeLock.unlock(); 1495 } 1496 } 1497 1498 private void syncFailed(long epochWhenSync, Throwable error) { 1499 LOG.warn("sync failed", error); 1500 this.onException(epochWhenSync, error); 1501 } 1502 1503 private void onException(long epochWhenSync, Throwable error) { 1504 boolean shouldRequestLogRoll = true; 1505 consumeLock.lock(); 1506 try { 1507 int currentEpochAndState = epochAndState; 1508 if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { 1509 // this is not the previous writer which means we have already rolled the writer. 1510 // or this is still the current writer, but we have already marked it as broken and request 1511 // a roll. 1512 return; 1513 } 1514 this.epochAndState = currentEpochAndState | 0b10; 1515 if (waitingRoll(currentEpochAndState)) { 1516 readyForRolling = true; 1517 readyForRollingCond.signalAll(); 1518 // this means we have already in the middle of a rollWriter so just tell the roller thread 1519 // that you can continue without requesting an extra log roll. 1520 shouldRequestLogRoll = false; 1521 } 1522 } finally { 1523 consumeLock.unlock(); 1524 } 1525 for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { 1526 toWriteAppends.addFirst(iter.next()); 1527 } 1528 highestUnsyncedTxid = highestSyncedTxid.get(); 1529 if (shouldRequestLogRoll) { 1530 // request a roll. 1531 requestLogRoll(ERROR); 1532 } 1533 } 1534 1535 private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) { 1536 // Please see the last several comments on HBASE-22761, it is possible that we get a 1537 // syncCompleted which acks a previous sync request after we received a syncFailed on the same 1538 // writer. So here we will also check on the epoch and state, if the epoch has already been 1539 // changed, i.e, we have already rolled the writer, or the writer is already broken, we should 1540 // just skip here, to avoid mess up the state or accidentally release some WAL entries and 1541 // cause data corruption. 1542 // The syncCompleted call is on the critical write path, so we should try our best to make it 1543 // fast. So here we do not hold consumeLock, for increasing performance. 
It is safe because 1544 // there are only 3 possible situations: 1545 // 1. For normal case, the only place where we change epochAndState is when rolling the writer. 1546 // Before rolling actually happen, we will only change the state to waitingRoll which is another 1547 // bit than writerBroken, and when we actually change the epoch, we can make sure that there is 1548 // no outgoing sync request. So we will always pass the check here and there is no problem. 1549 // 2. The writer is broken, but we have not called syncFailed yet. In this case, since 1550 // syncFailed and syncCompleted are executed in the same thread, we will just face the same 1551 // situation with #1. 1552 // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are 1553 // only 2 possible situations: 1554 // a. we arrive before we actually roll the writer, then we will find out the writer is broken 1555 // and give up. 1556 // b. we arrive after we actually roll the writer, then we will find out the epoch is changed 1557 // and give up. 1558 // For both #a and #b, we do not need to hold the consumeLock as we will always update the 1559 // epochAndState as a whole. 1560 // So in general, for all the cases above, we do not need to hold the consumeLock. 1561 int epochAndState = this.epochAndState; 1562 if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) { 1563 LOG.warn("Got a sync complete call after the writer is broken, skip"); 1564 return; 1565 } 1566 1567 if (processedTxid < highestSyncedTxid.get()) { 1568 return; 1569 } 1570 highestSyncedTxid.set(processedTxid); 1571 for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { 1572 FSWALEntry entry = iter.next(); 1573 if (entry.getTxid() <= processedTxid) { 1574 entry.release(); 1575 iter.remove(); 1576 } else { 1577 break; 1578 } 1579 } 1580 postSync(System.nanoTime() - startTimeNs, finishSync()); 1581 /** 1582 * This method is used to be compatible with the original logic of {@link FSHLog}. 1583 */ 1584 checkSlowSyncCount(); 1585 if (trySetReadyForRolling()) { 1586 // we have just finished a roll, then do not need to check for log rolling, the writer will be 1587 // closed soon. 1588 return; 1589 } 1590 // If we haven't already requested a roll, check if we have exceeded logrollsize 1591 if (!isLogRollRequested() && writer.getLength() > logrollsize) { 1592 if (LOG.isDebugEnabled()) { 1593 LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength() 1594 + ", logrollsize=" + logrollsize); 1595 } 1596 requestLogRoll(SIZE); 1597 } 1598 } 1599 1600 // find all the sync futures between these two txids to see if we need to issue a hsync, if no 1601 // sync futures then just use the default one. 
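// A minimal, self-contained sketch of the range query used below, with plain txids standing in
// for SyncFuture keys; SortedSet.subSet is half-open, so the inclusive range [beginTxid, endTxid]
// is expressed as subSet(beginTxid, endTxid + 1).
//
//   TreeSet<Long> txids = new TreeSet<>(Arrays.asList(3L, 5L, 9L, 12L));
//   SortedSet<Long> inRange = txids.subSet(4L, 9L + 1); // txids in [4, 9] -> {5, 9}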
1602   private boolean isHsync(long beginTxid, long endTxid) {
1603     SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
1604       new SyncFuture().reset(endTxid + 1, false));
1605     if (futures.isEmpty()) {
1606       return useHsync;
1607     }
1608     for (SyncFuture future : futures) {
1609       if (future.isForceSync()) {
1610         return true;
1611       }
1612     }
1613     return false;
1614   }
1615
1616   private void sync(W writer) {
1617     fileLengthAtLastSync = writer.getLength();
1618     long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
1619     boolean shouldUseHsync =
1620       isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
1621     highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
1622     final long startTimeNs = System.nanoTime();
1623     final long epoch = (long) epochAndState >>> 2L;
1624     addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),
1625       (result, error) -> {
1626         if (error != null) {
1627           syncFailed(epoch, error);
1628         } else {
1629           long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);
1630           syncCompleted(epoch, writer, syncedTxid, startTimeNs);
1631         }
1632       }, consumeExecutor);
1633   }
1634
1635   /**
1636    * This method adapts {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use
1637    * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point where we call the
1638    * {@link AsyncFSWAL#doWriterSync} method as the successful syncedTxid. For {@link FSHLog},
1639    * because we use multi-threaded {@code SyncRunner}s, we use the result of the
1640    * {@link CompletableFuture} as the successful syncedTxid.
1641    */
1642   protected long getSyncedTxid(long processedTxid, long completableFutureResult) {
1643     return processedTxid;
1644   }
1645
1646   protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync,
1647     long txidWhenSync);
1648
1649   private int finishSyncLowerThanTxid(long txid) {
1650     int finished = 0;
1651     for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
1652       SyncFuture sync = iter.next();
1653       if (sync.getTxid() <= txid) {
1654         markFutureDoneAndOffer(sync, txid, null);
1655         iter.remove();
1656         finished++;
1657       } else {
1658         break;
1659       }
1660     }
1661     return finished;
1662   }
1663
1664   // try advancing the highestSyncedTxid as much as possible
1665   private int finishSync() {
1666     if (unackedAppends.isEmpty()) {
1667       // All outstanding appends have been acked.
1668       if (toWriteAppends.isEmpty()) {
1669         // Also no appends are waiting to be written out, so just finish all pending syncs.
1670         long maxSyncTxid = highestSyncedTxid.get();
1671         for (SyncFuture sync : syncFutures) {
1672           maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
1673           markFutureDoneAndOffer(sync, maxSyncTxid, null);
1674         }
1675         highestSyncedTxid.set(maxSyncTxid);
1676         int finished = syncFutures.size();
1677         syncFutures.clear();
1678         return finished;
1679       } else {
1680         // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
1681         // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
1682         // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
1683         long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
1684         assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
1685         long doneTxid = lowestUnprocessedAppendTxid - 1;
1686         highestSyncedTxid.set(doneTxid);
1687         return finishSyncLowerThanTxid(doneTxid);
1688       }
1689     } else {
1690       // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
1691       // first unacked append minus 1.
1692       long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
1693       long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
1694       highestSyncedTxid.set(doneTxid);
1695       return finishSyncLowerThanTxid(doneTxid);
1696     }
1697   }
1698
1699   // confirm non-empty before calling
1700   private static long getLastTxid(Deque<FSWALEntry> queue) {
1701     return queue.peekLast().getTxid();
1702   }
1703
1704   private void appendAndSync() throws IOException {
1705     final W writer = this.writer;
1706     // maybe a sync request is not queued when we issue a sync, so check here to see if we could
1707     // finish some.
1708     finishSync();
1709     long newHighestProcessedAppendTxid = -1L;
1710     // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is
1711     // single threaded, so this can save us some cycles
1712     boolean addedToUnackedAppends = false;
1713     for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
1714       FSWALEntry entry = iter.next();
1715       /**
1716        * For {@link FSHLog}, this may throw an IOException, but for {@link AsyncFSWAL}, it will
1717        * not throw any IOException.
1718        */
1719       boolean appended = appendEntry(writer, entry);
1720       newHighestProcessedAppendTxid = entry.getTxid();
1721       iter.remove();
1722       if (appended) {
1723         // This is possible: when we fail to sync, we will add the unackedAppends back to
1724         // toWriteAppends, so here we may get an entry which is already in unackedAppends.
1725         if (
1726           addedToUnackedAppends || unackedAppends.isEmpty()
1727             || getLastTxid(unackedAppends) < entry.getTxid()
1728         ) {
1729           unackedAppends.addLast(entry);
1730           addedToUnackedAppends = true;
1731         }
1732         // See HBASE-25905: here we need to make sure that we always write out all the entries in
1733         // unackedAppends. The code in the consume method assumes that the entries in
1734         // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
1735         // is not empty, we can just return, as later there will be a syncCompleted call to clear
1736         // unackedAppends, or a syncFailed to lead us to another state.
1737         // There could be other ways to fix this, such as changing the logic in the consume
1738         // method, but that would break the assumption and (may) lead to a big refactoring. So
1739         // let's use this fix first; we can optimize later.
1740         if (
1741           writer.getLength() - fileLengthAtLastSync >= batchSize
1742             && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
1743         ) {
1744           break;
1745         }
1746       }
1747     }
1748     // if we have a newer transaction id, update it.
1749     // otherwise, use the previous transaction id.
1750     if (newHighestProcessedAppendTxid > 0) {
1751       highestProcessedAppendTxid = newHighestProcessedAppendTxid;
1752     } else {
1753       newHighestProcessedAppendTxid = highestProcessedAppendTxid;
1754     }
1755
1756     if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
1757       // sync because we have reached the buffer size limit.
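// A small worked example of the rule just applied, with hypothetical numbers: the unsynced bytes
// are the growth of the file since the last sync, and once they reach batchSize we stop batching
// appends and sync.
//
//   long lengthAtLastSync = 64 * 1024; // file length when the previous sync was issued
//   long currentLength = 64 * 1024 + 70_000; // file length after the appends above
//   long exampleBatchSize = 64 * 1024; // hypothetical batch threshold
//   boolean syncNow = currentLength - lengthAtLastSync >= exampleBatchSize; // true -> sync(writer)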
1758       sync(writer);
1759       return;
1760     }
1761     if (writer.getLength() == fileLengthAtLastSync) {
1762       // we haven't written anything out, just advance highestSyncedTxid since we may only have
1763       // stamped some region sequence ids.
1764       if (unackedAppends.isEmpty()) {
1765         highestSyncedTxid.set(highestProcessedAppendTxid);
1766         finishSync();
1767         trySetReadyForRolling();
1768       }
1769       return;
1770     }
1771     // reaching here means that we have some unsynced data but haven't reached the batch size
1772     // yet, but we will not issue a sync directly here even if there are sync requests because we
1773     // may have some new data in the ringbuffer, so let's just return here and delay the decision
1774     // of whether to issue a sync to the caller method.
1775   }
1776
1777   private void consume() {
1778     consumeLock.lock();
1779     try {
1780       int currentEpochAndState = epochAndState;
1781       if (writerBroken(currentEpochAndState)) {
1782         return;
1783       }
1784       if (waitingRoll(currentEpochAndState)) {
1785         if (writer.getLength() > fileLengthAtLastSync) {
1786           // issue a sync
1787           sync(writer);
1788         } else {
1789           if (unackedAppends.isEmpty()) {
1790             readyForRolling = true;
1791             readyForRollingCond.signalAll();
1792           }
1793         }
1794         return;
1795       }
1796     } finally {
1797       consumeLock.unlock();
1798     }
1799     long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
1800     for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
1801         <= cursorBound; nextCursor++) {
1802       if (!waitingConsumePayloads.isPublished(nextCursor)) {
1803         break;
1804       }
1805       RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
1806       switch (truck.type()) {
1807         case APPEND:
1808           toWriteAppends.addLast(truck.unloadAppend());
1809           break;
1810         case SYNC:
1811           syncFutures.add(truck.unloadSync());
1812           break;
1813         default:
1814           LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
1815           break;
1816       }
1817       waitingConsumePayloadsGatingSequence.set(nextCursor);
1818     }
1819
1820     /**
1821      * This is used to keep compatibility with the original logic of {@link AsyncFSWAL}.
1822      */
1823     if (markerEditOnly) {
1824       drainNonMarkerEditsAndFailSyncs();
1825     }
1826     try {
1827       appendAndSync();
1828     } catch (IOException exception) {
1829       /**
1830        * For {@link FSHLog}, we may catch an IOException here, but for {@link AsyncFSWAL}, the
1831        * code does not go in here.
1832        */
1833       LOG.error("appendAndSync throws IOException.", exception);
1834       onAppendEntryFailed(exception);
1835       return;
1836     }
1837     if (hasConsumerTask.get()) {
1838       return;
1839     }
1840     if (toWriteAppends.isEmpty()) {
1841       if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1842         consumerScheduled.set(false);
1843         // recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
1844         // like:
1845         // 1. we check the cursor, there is no new entry;
1846         // 2. someone publishes a new entry to the ringbuffer, sees that consumerScheduled is
1847         // true, and gives up scheduling the consumer task;
1848         // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
1849         if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1850           // we will give up consuming, so if there is some unsynced data we need to issue a sync.
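// A condensed sketch of the hand-off protocol above (consumer side here, producer side in
// shouldScheduleConsumer / append), using only the pieces already present in this class; the
// helper cursorAdvancedSinceLastCheck() is a hypothetical stand-in for the gating-sequence
// comparison.
//
//   // consumer: nothing left -> unpublish ourselves, then re-check to avoid a lost wakeup
//   consumerScheduled.set(false);
//   if (cursorAdvancedSinceLastCheck()) {
//     if (!consumerScheduled.compareAndSet(false, true)) {
//       return; // a producer already rescheduled the consumer for us
//     }
//     // otherwise fall through and reschedule ourselves
//   }
//   // producer: publish to the ring buffer, then schedule the consumer iff nobody has yet
//   if (consumerScheduled.compareAndSet(false, true)) {
//     consumeExecutor.execute(consumer);
//   }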
1851           if (
1852             writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
1853               && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
1854           ) {
1855             // no new data in the ringbuffer and we have at least one sync request
1856             sync(writer);
1857           }
1858           return;
1859         } else {
1860           // maybe someone has grabbed this before us
1861           if (!consumerScheduled.compareAndSet(false, true)) {
1862             return;
1863           }
1864         }
1865       }
1866     }
1867     // reschedule if we still have something to write.
1868     consumeExecutor.execute(consumer);
1869   }
1870
1871   private boolean shouldScheduleConsumer() {
1872     int currentEpochAndState = epochAndState;
1873     if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
1874       return false;
1875     }
1876     return consumerScheduled.compareAndSet(false, true);
1877   }
1878
1879   /**
1880    * Append a set of edits to the WAL.
1881    * <p/>
1882    * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1883    * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1884    * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1885    * <p/>
1886    * NOTE: This append starts an mvcc transaction, usually at a time after this call returns, by
1887    * calling 'begin' wherein we assign this update a sequenceid. At assignment time, we stamp all
1888    * the passed in Cells inside WALEdit with their sequenceId. You must 'complete' this mvcc
1889    * transaction by calling MultiVersionConcurrencyControl#complete(...) or a variant, otherwise
1890    * mvcc will get stuck. Do it in the finally of a try/finally block within which this append
1891    * lives and any subsequent operations like sync or update of memstore, etc. Get the WriteEntry
1892    * to pass mvcc out of the passed in WALKey <code>key</code> parameter. Be warned that the
1893    * WriteEntry is not immediately available on return from this method. It WILL be available
1894    * subsequent to a sync of this append; otherwise, you will just have to wait on the WriteEntry
1895    * to get filled in.
1896    * @param hri        the regioninfo associated with this append
1897    * @param key        Modified by this call; we add to it this edit's region edit/sequence id.
1898    * @param edits      Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an
1899    *                   edit sequence id that is after all currently appended edits.
1900    * @param inMemstore Always true except for the case where we are writing a region event meta
1901    *                   marker edit, for example, a compaction completion record into the WAL or
1902    *                   noting a Region Open event. In these cases the entry is just so we can
1903    *                   finish an unfinished compaction after a crash when the new Server reads the
1904    *                   WAL on recovery, etc. These transition event 'Markers' do not go via the
1905    *                   memstore. When <code>inMemstore</code> is false, we presume a Marker event
1906    *                   edit.
1907    * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id in it.
1908 */ 1909 protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1910 throws IOException { 1911 if (markerEditOnly && !edits.isMetaEdit()) { 1912 throw new IOException("WAL is closing, only marker edit is allowed"); 1913 } 1914 long txid = 1915 stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); 1916 if (shouldScheduleConsumer()) { 1917 consumeExecutor.execute(consumer); 1918 } 1919 return txid; 1920 } 1921 1922 protected void doSync(boolean forceSync) throws IOException { 1923 long txid = waitingConsumePayloads.next(); 1924 SyncFuture future; 1925 try { 1926 future = getSyncFuture(txid, forceSync); 1927 RingBufferTruck truck = waitingConsumePayloads.get(txid); 1928 truck.load(future); 1929 } finally { 1930 waitingConsumePayloads.publish(txid); 1931 } 1932 if (shouldScheduleConsumer()) { 1933 consumeExecutor.execute(consumer); 1934 } 1935 blockOnSync(future); 1936 } 1937 1938 protected void doSync(long txid, boolean forceSync) throws IOException { 1939 if (highestSyncedTxid.get() >= txid) { 1940 return; 1941 } 1942 // here we do not use ring buffer sequence as txid 1943 long sequence = waitingConsumePayloads.next(); 1944 SyncFuture future; 1945 try { 1946 future = getSyncFuture(txid, forceSync); 1947 RingBufferTruck truck = waitingConsumePayloads.get(sequence); 1948 truck.load(future); 1949 } finally { 1950 waitingConsumePayloads.publish(sequence); 1951 } 1952 if (shouldScheduleConsumer()) { 1953 consumeExecutor.execute(consumer); 1954 } 1955 blockOnSync(future); 1956 } 1957 1958 private void drainNonMarkerEditsAndFailSyncs() { 1959 if (toWriteAppends.isEmpty()) { 1960 return; 1961 } 1962 boolean hasNonMarkerEdits = false; 1963 Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator(); 1964 while (iter.hasNext()) { 1965 FSWALEntry entry = iter.next(); 1966 if (!entry.getEdit().isMetaEdit()) { 1967 entry.release(); 1968 hasNonMarkerEdits = true; 1969 break; 1970 } 1971 } 1972 if (hasNonMarkerEdits) { 1973 for (;;) { 1974 iter.remove(); 1975 if (!iter.hasNext()) { 1976 break; 1977 } 1978 iter.next().release(); 1979 } 1980 for (FSWALEntry entry : unackedAppends) { 1981 entry.release(); 1982 } 1983 unackedAppends.clear(); 1984 // fail the sync futures which are under the txid of the first remaining edit, if none, fail 1985 // all the sync futures. 1986 long txid = toWriteAppends.isEmpty() ? 
Long.MAX_VALUE : toWriteAppends.peek().getTxid(); 1987 IOException error = new IOException("WAL is closing, only marker edit is allowed"); 1988 for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) { 1989 SyncFuture future = syncIter.next(); 1990 if (future.getTxid() < txid) { 1991 markFutureDoneAndOffer(future, future.getTxid(), error); 1992 syncIter.remove(); 1993 } else { 1994 break; 1995 } 1996 } 1997 } 1998 } 1999 2000 protected abstract W createWriterInstance(FileSystem fs, Path path) 2001 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 2002 2003 protected abstract W createCombinedWriter(W localWriter, W remoteWriter); 2004 2005 protected final void waitForSafePoint() { 2006 consumeLock.lock(); 2007 try { 2008 int currentEpochAndState = epochAndState; 2009 if (writerBroken(currentEpochAndState) || this.writer == null) { 2010 return; 2011 } 2012 consumerScheduled.set(true); 2013 epochAndState = currentEpochAndState | 1; 2014 readyForRolling = false; 2015 consumeExecutor.execute(consumer); 2016 while (!readyForRolling) { 2017 readyForRollingCond.awaitUninterruptibly(); 2018 } 2019 } finally { 2020 consumeLock.unlock(); 2021 } 2022 } 2023 2024 protected final void closeWriter(W writer, Path path) { 2025 inflightWALClosures.put(path.getName(), writer); 2026 closeExecutor.execute(() -> { 2027 try { 2028 writer.close(); 2029 } catch (IOException e) { 2030 LOG.warn("close old writer failed", e); 2031 } finally { 2032 // call this even if the above close fails, as there is no other chance we can set closed to 2033 // true, it will not cause big problems. 2034 markClosedAndClean(path); 2035 inflightWALClosures.remove(path.getName()); 2036 } 2037 }); 2038 } 2039 2040 /** 2041 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 2042 * will begin to work before returning from this method. If we clear the flag after returning from 2043 * this call, we may miss a roll request. The implementation class should choose a proper place to 2044 * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you 2045 * start writing to the new writer. 2046 */ 2047 protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 2048 Preconditions.checkNotNull(nextWriter); 2049 waitForSafePoint(); 2050 /** 2051 * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}. 2052 */ 2053 doCleanUpResources(); 2054 // we will call rollWriter in init method, where we want to create the first writer and 2055 // obviously the previous writer is null, so here we need this null check. And why we must call 2056 // logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after 2057 // closing the writer asynchronously, we need to make sure the WALProps is put into 2058 // walFile2Props before we call markClosedAndClean 2059 if (writer != null) { 2060 long oldFileLen = writer.getLength(); 2061 logRollAndSetupWalProps(oldPath, newPath, oldFileLen); 2062 closeWriter(writer, oldPath); 2063 } else { 2064 logRollAndSetupWalProps(oldPath, newPath, 0); 2065 } 2066 this.writer = nextWriter; 2067 /** 2068 * Here is used for {@link AsyncFSWAL} and {@link FSHLog} to set the under layer filesystem 2069 * output after writer is replaced. 
2070 */ 2071 onWriterReplaced(nextWriter); 2072 this.fileLengthAtLastSync = nextWriter.getLength(); 2073 this.highestProcessedAppendTxidAtLastSync = 0L; 2074 consumeLock.lock(); 2075 try { 2076 consumerScheduled.set(true); 2077 int currentEpoch = epochAndState >>> 2; 2078 int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; 2079 // set a new epoch and also clear waitingRoll and writerBroken 2080 this.epochAndState = nextEpoch << 2; 2081 // Reset rollRequested status 2082 rollRequested.set(false); 2083 consumeExecutor.execute(consumer); 2084 } finally { 2085 consumeLock.unlock(); 2086 } 2087 } 2088 2089 protected abstract void onWriterReplaced(W nextWriter); 2090 2091 protected void doShutdown() throws IOException { 2092 waitForSafePoint(); 2093 /** 2094 * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}. 2095 */ 2096 doCleanUpResources(); 2097 if (this.writer != null) { 2098 closeWriter(this.writer, getOldPath()); 2099 this.writer = null; 2100 } 2101 closeExecutor.shutdown(); 2102 try { 2103 if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { 2104 LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" 2105 + " the close of async writer doesn't complete." 2106 + "Please check the status of underlying filesystem" 2107 + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey 2108 + "\""); 2109 } 2110 } catch (InterruptedException e) { 2111 LOG.error("The wait for close of async writer is interrupted"); 2112 Thread.currentThread().interrupt(); 2113 } 2114 IOException error = new IOException("WAL has been closed"); 2115 long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; 2116 // drain all the pending sync requests 2117 for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor 2118 <= cursorBound; nextCursor++) { 2119 if (!waitingConsumePayloads.isPublished(nextCursor)) { 2120 break; 2121 } 2122 RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); 2123 switch (truck.type()) { 2124 case SYNC: 2125 syncFutures.add(truck.unloadSync()); 2126 break; 2127 default: 2128 break; 2129 } 2130 } 2131 // and fail them 2132 syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); 2133 if (this.shouldShutDownConsumeExecutorWhenClose) { 2134 consumeExecutor.shutdown(); 2135 } 2136 } 2137 2138 protected void doCleanUpResources() { 2139 }; 2140 2141 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; 2142 2143 /** 2144 * This method gets the pipeline for the current WAL. 2145 */ 2146 abstract DatanodeInfo[] getPipeline(); 2147 2148 /** 2149 * This method gets the datanode replication count for the current WAL. 
2150    */
2151   abstract int getLogReplication();
2152
2153   protected abstract boolean doCheckLogLowReplication();
2154
2155   protected boolean isWriterBroken() {
2156     return writerBroken(epochAndState);
2157   }
2158
2159   private void onAppendEntryFailed(IOException exception) {
2160     LOG.warn("append entry failed", exception);
2161     final long currentEpoch = (long) epochAndState >>> 2L;
2162     this.onException(currentEpoch, exception);
2163   }
2164
2165   protected void checkSlowSyncCount() {
2166   }
2167
2168   /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
2169   protected boolean doCheckSlowSync() {
2170     boolean result = false;
2171     long now = EnvironmentEdgeManager.currentTime();
2172     long elapsedTime = now - lastTimeCheckSlowSync;
2173     if (elapsedTime >= slowSyncCheckInterval) {
2174       if (slowSyncCount.get() >= slowSyncRollThreshold) {
2175         if (elapsedTime >= (2 * slowSyncCheckInterval)) {
2176           // If two or more slowSyncCheckIntervals have elapsed, this is a corner case
2177           // where a train of slow syncs almost triggered us but then there was a long
2178           // interval from then until the one more that pushed us over. If so, we
2179           // should do nothing and let the count reset.
2180           if (LOG.isDebugEnabled()) {
2181             LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
2182               + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
2183               + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
2184           }
2185           // Fall through to count reset below
2186         } else {
2187           LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
2188             + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
2189             + Arrays.toString(getPipeline()));
2190           result = true;
2191         }
2192       }
2193       lastTimeCheckSlowSync = now;
2194       slowSyncCount.set(0);
2195     }
2196     return result;
2197   }
2198
2199   public void checkLogLowReplication(long checkInterval) {
2200     long now = EnvironmentEdgeManager.currentTime();
2201     if (now - lastTimeCheckLowReplication < checkInterval) {
2202       return;
2203     }
2204     // Will return immediately if we are currently in the middle of a WAL log roll.
2205     if (!rollWriterLock.tryLock()) {
2206       return;
2207     }
2208     try {
2209       lastTimeCheckLowReplication = now;
2210       if (doCheckLogLowReplication()) {
2211         requestLogRoll(LOW_REPLICATION);
2212       }
2213     } finally {
2214       rollWriterLock.unlock();
2215     }
2216   }
2217
2218   // Allow temporarily skipping the creation of the remote writer. When we fail to write to the
2219   // remote dfs cluster, we need to reopen the regions and switch to using the original wal
2220   // writer. But we need to write a close marker when closing a region, and if that fails, the
2221   // whole rs will abort. So here we need to skip the creation of the remote writer and make it
2222   // possible to write the region close marker.
2223   // Setting markerEditOnly to true is for transitioning from A to S, where we need to give up
2224   // writing any pending wal entries as they will be discarded. The remote cluster will replicate
2225   // the correct data back later. We still need to allow writing marker edits, such as the close
2226   // region event, so that a region can still be closed.
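// A hypothetical caller's view of the two flavors described above (the variable name "wal" is
// illustrative only, standing for an instance of this WAL implementation):
//
//   wal.skipRemoteWAL(false); // stop writing to the remote cluster, keep accepting normal edits
//   wal.skipRemoteWAL(true);  // also restrict the WAL to marker edits, e.g. region close markers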
2227 @Override 2228 public void skipRemoteWAL(boolean markerEditOnly) { 2229 if (markerEditOnly) { 2230 this.markerEditOnly = true; 2231 } 2232 this.skipRemoteWAL = true; 2233 } 2234 2235 private static void split(final Configuration conf, final Path p) throws IOException { 2236 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 2237 if (!fs.exists(p)) { 2238 throw new FileNotFoundException(p.toString()); 2239 } 2240 if (!fs.getFileStatus(p).isDirectory()) { 2241 throw new IOException(p + " is not a directory"); 2242 } 2243 2244 final Path baseDir = CommonFSUtils.getWALRootDir(conf); 2245 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 2246 if ( 2247 conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 2248 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR) 2249 ) { 2250 archiveDir = new Path(archiveDir, p.getName()); 2251 } 2252 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 2253 } 2254 2255 W getWriter() { 2256 return this.writer; 2257 } 2258 2259 private static void usage() { 2260 System.err.println("Usage: AbstractFSWAL <ARGS>"); 2261 System.err.println("Arguments:"); 2262 System.err.println(" --dump Dump textual representation of passed one or more files"); 2263 System.err.println(" For example: " 2264 + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 2265 System.err.println(" --split Split the passed directory of WAL logs"); 2266 System.err.println( 2267 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 2268 } 2269 2270 /** 2271 * Pass one or more log file names, and it will either dump out a text version on 2272 * <code>stdout</code> or split the specified log files. 2273 */ 2274 public static void main(String[] args) throws IOException { 2275 if (args.length < 2) { 2276 usage(); 2277 System.exit(-1); 2278 } 2279 // either dump using the WALPrettyPrinter or split, depending on args 2280 if (args[0].compareTo("--dump") == 0) { 2281 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 2282 } else if (args[0].compareTo("--perf") == 0) { 2283 LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); 2284 LOG.error(HBaseMarkers.FATAL, 2285 "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); 2286 System.exit(-1); 2287 } else if (args[0].compareTo("--split") == 0) { 2288 Configuration conf = HBaseConfiguration.create(); 2289 for (int i = 1; i < args.length; i++) { 2290 try { 2291 Path logPath = new Path(args[i]); 2292 CommonFSUtils.setFsDefault(conf, logPath); 2293 split(conf, logPath); 2294 } catch (IOException t) { 2295 t.printStackTrace(System.err); 2296 System.exit(-1); 2297 } 2298 } 2299 } else { 2300 usage(); 2301 System.exit(-1); 2302 } 2303 } 2304}
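// A minimal, hypothetical programmatic equivalent of the "--dump" command-line path above; the
// HDFS path is a placeholder.
//
//   public final class DumpWalExample {
//     public static void main(String[] args) throws java.io.IOException {
//       org.apache.hadoop.hbase.wal.WALPrettyPrinter
//         .run(new String[] { "hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE" });
//     }
//   }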