001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; 021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; 022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; 023import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL; 024import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 025import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 026import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 027 028import com.lmax.disruptor.RingBuffer; 029import io.opentelemetry.api.trace.Span; 030import java.io.FileNotFoundException; 031import java.io.IOException; 032import java.io.InterruptedIOException; 033import java.lang.management.MemoryType; 034import java.net.URLEncoder; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Comparator; 038import java.util.List; 039import java.util.Map; 040import java.util.OptionalLong; 041import java.util.Set; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentNavigableMap; 045import java.util.concurrent.ConcurrentSkipListMap; 046import java.util.concurrent.CopyOnWriteArrayList; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.Future; 051import java.util.concurrent.LinkedBlockingQueue; 052import java.util.concurrent.ThreadPoolExecutor; 053import java.util.concurrent.TimeUnit; 054import java.util.concurrent.TimeoutException; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.concurrent.atomic.AtomicInteger; 057import java.util.concurrent.atomic.AtomicLong; 058import java.util.concurrent.locks.ReentrantLock; 059import org.apache.commons.lang3.mutable.MutableLong; 060import org.apache.hadoop.conf.Configuration; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.fs.PathFilter; 065import org.apache.hadoop.hbase.Abortable; 066import org.apache.hadoop.hbase.Cell; 067import org.apache.hadoop.hbase.HBaseConfiguration; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.PrivateCellUtil; 070import org.apache.hadoop.hbase.client.RegionInfo; 071import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 072import 
org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles),
 * a WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence
 * id. A bunch of the work below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in the WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because the
 * current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and are then unable to sync
 * them, our current semantics are to return an error to the client indicating that the appends
 * failed, but also to abort the current context, usually the hosting server. We need to replay the
 * WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the
 * client that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
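 * <p>
 * A rough, illustrative read sketch; the {@code walFactory}, {@code fs} and {@code walPath} names
 * below are placeholders for this example, not members of this class:
 *
 * <pre>
 * try (WAL.Reader reader = walFactory.createReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // entry.getKey() carries the region/sequence id, entry.getEdit() carries the cells
 *   }
 * }
 * </pre>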
124 */ 125@InterfaceAudience.Private 126public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 127 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 128 129 private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; 130 private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; 131 /** Don't log blocking regions more frequently than this. */ 132 private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); 133 134 protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms"; 135 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 136 protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; 137 protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms 138 protected static final String SLOW_SYNC_ROLL_THRESHOLD = 139 "hbase.regionserver.wal.slowsync.roll.threshold"; 140 protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings 141 protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = 142 "hbase.regionserver.wal.slowsync.roll.interval.ms"; 143 protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute 144 145 public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; 146 protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 147 148 public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier"; 149 150 public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; 151 152 public static final String RING_BUFFER_SLOT_COUNT = 153 "hbase.regionserver.wal.disruptor.event.count"; 154 155 public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; 156 public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; 157 158 /** 159 * file system instance 160 */ 161 protected final FileSystem fs; 162 163 /** 164 * WAL directory, where all WAL files would be placed. 165 */ 166 protected final Path walDir; 167 168 /** 169 * dir path where old logs are kept. 170 */ 171 protected final Path walArchiveDir; 172 173 /** 174 * Matches just those wal files that belong to this wal instance. 175 */ 176 protected final PathFilter ourFiles; 177 178 /** 179 * Prefix of a WAL file, usually the region server name it is hosted on. 180 */ 181 protected final String walFilePrefix; 182 183 /** 184 * Suffix included on generated wal file names 185 */ 186 protected final String walFileSuffix; 187 188 /** 189 * Prefix used when checking for wal membership. 190 */ 191 protected final String prefixPathStr; 192 193 protected final WALCoprocessorHost coprocessorHost; 194 195 /** 196 * conf object 197 */ 198 protected final Configuration conf; 199 200 protected final Abortable abortable; 201 202 /** Listeners that are called on WAL events. */ 203 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 204 205 /** Tracks the logs in the process of being closed. */ 206 protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>(); 207 208 /** 209 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence 210 * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has 211 * facility for answering questions such as "Is it safe to GC a WAL?". 
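   * <p>
   * For example: a rolled WAL file can be archived once, for every region with edits in it, the
   * highest sequence id recorded for that region in the file is lower than the region's oldest
   * sequence id still unflushed in memory; cleanOldLogs below asks exactly this via
   * {@code areAllLower}.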
212 */ 213 protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 214 215 /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */ 216 protected final long slowSyncNs, rollOnSyncNs; 217 protected final int slowSyncRollThreshold; 218 protected final int slowSyncCheckInterval; 219 protected final AtomicInteger slowSyncCount = new AtomicInteger(); 220 221 private final long walSyncTimeoutNs; 222 223 private final long walTooOldNs; 224 225 // If > than this size, roll the log. 226 protected final long logrollsize; 227 228 /** 229 * Block size to use writing files. 230 */ 231 protected final long blocksize; 232 233 /* 234 * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too 235 * many and we crash, then will take forever replaying. Keep the number of logs tidy. 236 */ 237 protected final int maxLogs; 238 239 protected final boolean useHsync; 240 241 /** 242 * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock 243 * is held. We don't just use synchronized because that results in bogus and tedious findbugs 244 * warning when it thinks synchronized controls writer thread safety. It is held when we are 245 * actually rolling the log. It is checked when we are looking to see if we should roll the log or 246 * not. 247 */ 248 protected final ReentrantLock rollWriterLock = new ReentrantLock(true); 249 250 // The timestamp (in ms) when the log file was created. 251 protected final AtomicLong filenum = new AtomicLong(-1); 252 253 // Number of transactions in the current Wal. 254 protected final AtomicInteger numEntries = new AtomicInteger(0); 255 256 /** 257 * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass 258 * WALEdit to background consumer thread, and the transaction id is the sequence number of the 259 * corresponding entry in queue. 260 */ 261 protected volatile long highestUnsyncedTxid = -1; 262 263 /** 264 * Updated to the transaction id of the last successful sync call. This can be less than 265 * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in 266 * for it. 267 */ 268 protected final AtomicLong highestSyncedTxid = new AtomicLong(0); 269 270 /** 271 * The total size of wal 272 */ 273 protected final AtomicLong totalLogSize = new AtomicLong(0); 274 /** 275 * Current log file. 276 */ 277 volatile W writer; 278 279 // Last time to check low replication on hlog's pipeline 280 private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); 281 282 // Last time we asked to roll the log due to a slow sync 283 private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 284 285 protected volatile boolean closed = false; 286 287 protected final AtomicBoolean shutdown = new AtomicBoolean(false); 288 289 protected final long walShutdownTimeout; 290 291 private long nextLogTooOldNs = System.nanoTime(); 292 293 /** 294 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws 295 * an IllegalArgumentException if used to compare paths from different wals. 296 */ 297 final Comparator<Path> LOG_NAME_COMPARATOR = 298 (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); 299 300 private static final class WalProps { 301 302 /** 303 * Map the encoded region name to the highest sequence id. 304 * <p/> 305 * Contains all the regions it has an entry for. 
306 */ 307 public final Map<byte[], Long> encodedName2HighestSequenceId; 308 309 /** 310 * The log file size. Notice that the size may not be accurate if we do asynchronous close in 311 * sub classes. 312 */ 313 public final long logSize; 314 315 /** 316 * The nanoTime of the log rolling, used to determine the time interval that has passed since. 317 */ 318 public final long rollTimeNs; 319 320 public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { 321 this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; 322 this.logSize = logSize; 323 this.rollTimeNs = System.nanoTime(); 324 } 325 } 326 327 /** 328 * Map of WAL log file to properties. The map is sorted by the log file creation timestamp 329 * (contained in the log file name). 330 */ 331 protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = 332 new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); 333 334 /** 335 * A cache of sync futures reused by threads. 336 */ 337 protected final SyncFutureCache syncFutureCache; 338 339 /** 340 * The class name of the runtime implementation, used as prefix for logging/tracing. 341 * <p> 342 * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, 343 * refer to HBASE-17676 for more details 344 * </p> 345 */ 346 protected final String implClassName; 347 348 protected final AtomicBoolean rollRequested = new AtomicBoolean(false); 349 350 // Run in caller if we get reject execution exception, to avoid aborting region server when we get 351 // reject execution exception. Usually this should not happen but let's make it more robust. 352 private final ExecutorService logArchiveExecutor = 353 new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), 354 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), 355 new ThreadPoolExecutor.CallerRunsPolicy()); 356 357 private final int archiveRetries; 358 359 public long getFilenum() { 360 return this.filenum.get(); 361 } 362 363 /** 364 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper 365 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming 366 * the filename is created with the {@link #computeFilename(long filenum)} method. 367 * @return timestamp, as in the log file name. 368 */ 369 protected long getFileNumFromFileName(Path fileName) { 370 checkNotNull(fileName, "file name can't be null"); 371 if (!ourFiles.accept(fileName)) { 372 throw new IllegalArgumentException( 373 "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); 374 } 375 final String fileNameString = fileName.toString(); 376 String chompedPath = fileNameString.substring(prefixPathStr.length(), 377 (fileNameString.length() - walFileSuffix.length())); 378 return Long.parseLong(chompedPath); 379 } 380 381 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 382 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 383 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 384 } 385 386 // must be power of 2 387 protected final int getPreallocatedEventCount() { 388 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 389 // be stuck and make no progress if the buffer is filled with appends only and there is no 390 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 391 // before they return. 
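    // Worked example (hypothetical config value): with RING_BUFFER_SLOT_COUNT set to 20000,
    // Integer.highestOneBit(20000) is 16384, which is not equal to 20000 and is below 1 << 29,
    // so the count is rounded up to the next power of two, 32768. Values whose floor is at or
    // above 1 << 29 are capped at 1 << 30, the ring buffer maximum.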
392 int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16); 393 checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0"); 394 int floor = Integer.highestOneBit(preallocatedEventCount); 395 if (floor == preallocatedEventCount) { 396 return floor; 397 } 398 // max capacity is 1 << 30 399 if (floor >= 1 << 29) { 400 return 1 << 30; 401 } 402 return floor << 1; 403 } 404 405 protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, 406 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 407 final boolean failIfWALExists, final String prefix, final String suffix) 408 throws FailedLogCloseException, IOException { 409 this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 410 } 411 412 protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, 413 final String logDir, final String archiveDir, final Configuration conf, 414 final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, 415 final String suffix) throws FailedLogCloseException, IOException { 416 this.fs = fs; 417 this.walDir = new Path(rootDir, logDir); 418 this.walArchiveDir = new Path(rootDir, archiveDir); 419 this.conf = conf; 420 this.abortable = abortable; 421 422 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 423 throw new IOException("Unable to mkdir " + walDir); 424 } 425 426 if (!fs.exists(this.walArchiveDir)) { 427 if (!fs.mkdirs(this.walArchiveDir)) { 428 throw new IOException("Unable to mkdir " + this.walArchiveDir); 429 } 430 } 431 432 // If prefix is null||empty then just name it wal 433 this.walFilePrefix = 434 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); 435 // we only correctly differentiate suffices when numeric ones start with '.' 436 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 437 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER 438 + "' but instead was '" + suffix + "'"); 439 } 440 // Now that it exists, set the storage policy for the entire directory of wal files related to 441 // this FSHLog instance 442 String storagePolicy = 443 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 444 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 445 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 446 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 447 448 this.ourFiles = new PathFilter() { 449 @Override 450 public boolean accept(final Path fileName) { 451 // The path should start with dir/<prefix> and end with our suffix 452 final String fileNameString = fileName.toString(); 453 if (!fileNameString.startsWith(prefixPathStr)) { 454 return false; 455 } 456 if (walFileSuffix.isEmpty()) { 457 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 
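        // For example (hypothetical names): with a prefix path string ending in
        // "rs1%2C16020%2C1690000000000.", the file "rs1%2C16020%2C1690000000000.1690000012345"
        // leaves the purely numeric remainder "1690000012345" and is accepted, while a name
        // ending in ".meta" would leave a non-numeric remainder and be rejected.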
458 return org.apache.commons.lang3.StringUtils 459 .isNumeric(fileNameString.substring(prefixPathStr.length())); 460 } else if (!fileNameString.endsWith(walFileSuffix)) { 461 return false; 462 } 463 return true; 464 } 465 }; 466 467 if (failIfWALExists) { 468 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 469 if (null != walFiles && 0 != walFiles.length) { 470 throw new IOException("Target WAL already exists within directory " + walDir); 471 } 472 } 473 474 // Register listeners. TODO: Should this exist anymore? We have CPs? 475 if (listeners != null) { 476 for (WALActionsListener i : listeners) { 477 registerWALActionsListener(i); 478 } 479 } 480 this.coprocessorHost = new WALCoprocessorHost(this, conf); 481 482 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block 483 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 484 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 485 // the block size but experience from the field has it that this was not enough time for the 486 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 487 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 488 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 489 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 490 this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); 491 float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f); 492 this.logrollsize = (long) (this.blocksize * multiplier); 493 this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize))); 494 495 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" 496 + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" 497 + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir 498 + ", maxLogs=" + this.maxLogs); 499 this.slowSyncNs = 500 TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS)); 501 this.rollOnSyncNs = TimeUnit.MILLISECONDS 502 .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS)); 503 this.slowSyncRollThreshold = 504 conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD); 505 this.slowSyncCheckInterval = 506 conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS); 507 this.walSyncTimeoutNs = 508 TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS)); 509 this.syncFutureCache = new SyncFutureCache(conf); 510 this.implClassName = getClass().getSimpleName(); 511 this.walTooOldNs = TimeUnit.SECONDS 512 .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT)); 513 this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); 514 archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); 515 this.walShutdownTimeout = 516 conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS); 517 } 518 519 /** 520 * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 
521 */ 522 public void init() throws IOException { 523 rollWriter(); 524 } 525 526 @Override 527 public void registerWALActionsListener(WALActionsListener listener) { 528 this.listeners.add(listener); 529 } 530 531 @Override 532 public boolean unregisterWALActionsListener(WALActionsListener listener) { 533 return this.listeners.remove(listener); 534 } 535 536 @Override 537 public WALCoprocessorHost getCoprocessorHost() { 538 return coprocessorHost; 539 } 540 541 @Override 542 public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { 543 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 544 } 545 546 @Override 547 public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { 548 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); 549 } 550 551 @Override 552 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 553 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 554 } 555 556 @Override 557 public void abortCacheFlush(byte[] encodedRegionName) { 558 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 559 } 560 561 @Override 562 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 563 // Used by tests. Deprecated as too subtle for general usage. 564 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 565 } 566 567 @Override 568 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 569 // This method is used by tests and for figuring if we should flush or not because our 570 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 571 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 572 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 573 // currently flushing sequence ids, and if anything found there, it is returning these. This is 574 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 575 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 576 // id is old even though we are currently flushing. This may mean we do too much flushing. 577 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 578 } 579 580 @Override 581 public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException { 582 return rollWriter(false); 583 } 584 585 @Override 586 public final void sync() throws IOException { 587 sync(useHsync); 588 } 589 590 @Override 591 public final void sync(long txid) throws IOException { 592 sync(txid, useHsync); 593 } 594 595 @Override 596 public final void sync(boolean forceSync) throws IOException { 597 TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync")); 598 } 599 600 @Override 601 public final void sync(long txid, boolean forceSync) throws IOException { 602 TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync")); 603 } 604 605 protected abstract void doSync(boolean forceSync) throws IOException; 606 607 protected abstract void doSync(long txid, boolean forceSync) throws IOException; 608 609 /** 610 * This is a convenience method that computes a new filename with a given file-number. 
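   * <p>
   * The generated name is
   * {@code walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix}, for example
   * (hypothetical values) {@code rs1%2C16020%2C1690000000000.1690000012345}.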
611 * @param filenum to use 612 */ 613 protected Path computeFilename(final long filenum) { 614 if (filenum < 0) { 615 throw new RuntimeException("WAL file number can't be < 0"); 616 } 617 String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; 618 return new Path(walDir, child); 619 } 620 621 /** 622 * This is a convenience method that computes a new filename with a given using the current WAL 623 * file-number 624 */ 625 public Path getCurrentFileName() { 626 return computeFilename(this.filenum.get()); 627 } 628 629 /** 630 * retrieve the next path to use for writing. Increments the internal filenum. 631 */ 632 private Path getNewPath() throws IOException { 633 this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime())); 634 Path newPath = getCurrentFileName(); 635 return newPath; 636 } 637 638 public Path getOldPath() { 639 long currentFilenum = this.filenum.get(); 640 Path oldPath = null; 641 if (currentFilenum > 0) { 642 // ComputeFilename will take care of meta wal filename 643 oldPath = computeFilename(currentFilenum); 644 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 645 return oldPath; 646 } 647 648 /** 649 * Tell listeners about pre log roll. 650 */ 651 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 652 throws IOException { 653 coprocessorHost.preWALRoll(oldPath, newPath); 654 655 if (!this.listeners.isEmpty()) { 656 for (WALActionsListener i : this.listeners) { 657 i.preLogRoll(oldPath, newPath); 658 } 659 } 660 } 661 662 /** 663 * Tell listeners about post log roll. 664 */ 665 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 666 throws IOException { 667 if (!this.listeners.isEmpty()) { 668 for (WALActionsListener i : this.listeners) { 669 i.postLogRoll(oldPath, newPath); 670 } 671 } 672 673 coprocessorHost.postWALRoll(oldPath, newPath); 674 } 675 676 // public only until class moves to o.a.h.h.wal 677 /** Returns the number of rolled log files */ 678 public int getNumRolledLogFiles() { 679 return walFile2Props.size(); 680 } 681 682 // public only until class moves to o.a.h.h.wal 683 /** Returns the number of log files in use */ 684 public int getNumLogFiles() { 685 // +1 for current use log 686 return getNumRolledLogFiles() + 1; 687 } 688 689 /** 690 * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the 691 * first (oldest) WAL, and return those regions which should be flushed so that it can be 692 * let-go/'archived'. 693 * @return stores of regions (encodedRegionNames) to flush in order to archive oldest WAL file. 
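   *         May be {@code null} when the number of rolled WAL files is within bounds and no
   *         forced flush is needed.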
694 */ 695 Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException { 696 Map<byte[], List<byte[]>> regions = null; 697 int logCount = getNumRolledLogFiles(); 698 if (logCount > this.maxLogs && logCount > 0) { 699 Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry(); 700 regions = 701 this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); 702 } 703 if (regions != null) { 704 List<String> listForPrint = new ArrayList<>(); 705 for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) { 706 StringBuilder families = new StringBuilder(); 707 for (int i = 0; i < r.getValue().size(); i++) { 708 if (i > 0) { 709 families.append(","); 710 } 711 families.append(Bytes.toString(r.getValue().get(i))); 712 } 713 listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]"); 714 } 715 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs 716 + "; forcing (partial) flush of " + regions.size() + " region(s): " 717 + StringUtils.join(",", listForPrint)); 718 } 719 return regions; 720 } 721 722 /** 723 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. 724 */ 725 private void cleanOldLogs() throws IOException { 726 List<Pair<Path, Long>> logsToArchive = null; 727 long now = System.nanoTime(); 728 boolean mayLogTooOld = nextLogTooOldNs <= now; 729 ArrayList<byte[]> regionsBlockingWal = null; 730 // For each log file, look at its Map of regions to highest sequence id; if all sequence ids 731 // are older than what is currently in memory, the WAL can be GC'd. 732 for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) { 733 Path log = e.getKey(); 734 ArrayList<byte[]> regionsBlockingThisWal = null; 735 long ageNs = now - e.getValue().rollTimeNs; 736 if (ageNs > walTooOldNs) { 737 if (mayLogTooOld && regionsBlockingWal == null) { 738 regionsBlockingWal = new ArrayList<>(); 739 } 740 regionsBlockingThisWal = regionsBlockingWal; 741 } 742 Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; 743 if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) { 744 if (logsToArchive == null) { 745 logsToArchive = new ArrayList<>(); 746 } 747 logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); 748 if (LOG.isTraceEnabled()) { 749 LOG.trace("WAL file ready for archiving " + log); 750 } 751 } else if (regionsBlockingThisWal != null) { 752 StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ") 753 .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: "); 754 boolean isFirst = true; 755 for (byte[] region : regionsBlockingThisWal) { 756 if (!isFirst) { 757 sb.append("; "); 758 } 759 isFirst = false; 760 sb.append(Bytes.toString(region)); 761 } 762 LOG.warn(sb.toString()); 763 nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS; 764 regionsBlockingThisWal.clear(); 765 } 766 } 767 768 if (logsToArchive != null) { 769 final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; 770 // make it async 771 for (Pair<Path, Long> log : localLogsToArchive) { 772 logArchiveExecutor.execute(() -> { 773 archive(log); 774 }); 775 this.walFile2Props.remove(log.getFirst()); 776 } 777 } 778 } 779 780 protected void archive(final Pair<Path, Long> log) { 781 totalLogSize.addAndGet(-log.getSecond()); 782 int retry = 1; 783 while (true) { 784 try { 785 archiveLogFile(log.getFirst()); 786 // successful 787 break; 788 } catch (Throwable e) { 789 if (retry > 
archiveRetries) { 790 LOG.error("Failed log archiving for the log {},", log.getFirst(), e); 791 if (this.abortable != null) { 792 this.abortable.abort("Failed log archiving", e); 793 break; 794 } 795 } else { 796 LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e); 797 } 798 retry++; 799 } 800 } 801 } 802 803 /* 804 * only public so WALSplitter can use. 805 * @return archived location of a WAL file with the given path p 806 */ 807 public static Path getWALArchivePath(Path archiveDir, Path p) { 808 return new Path(archiveDir, p.getName()); 809 } 810 811 protected void archiveLogFile(final Path p) throws IOException { 812 Path newPath = getWALArchivePath(this.walArchiveDir, p); 813 // Tell our listeners that a log is going to be archived. 814 if (!this.listeners.isEmpty()) { 815 for (WALActionsListener i : this.listeners) { 816 i.preLogArchive(p, newPath); 817 } 818 } 819 LOG.info("Archiving " + p + " to " + newPath); 820 if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 821 throw new IOException("Unable to rename " + p + " to " + newPath); 822 } 823 // Tell our listeners that a log has been archived. 824 if (!this.listeners.isEmpty()) { 825 for (WALActionsListener i : this.listeners) { 826 i.postLogArchive(p, newPath); 827 } 828 } 829 } 830 831 protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { 832 int oldNumEntries = this.numEntries.getAndSet(0); 833 String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; 834 if (oldPath != null) { 835 this.walFile2Props.put(oldPath, 836 new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); 837 this.totalLogSize.addAndGet(oldFileLen); 838 LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", 839 CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), 840 newPathString); 841 } else { 842 LOG.info("New WAL {}", newPathString); 843 } 844 } 845 846 private Span createSpan(String name) { 847 return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName); 848 } 849 850 /** 851 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. 852 * <p/> 853 * <ul> 854 * <li>In the case of creating a new WAL, oldPath will be null.</li> 855 * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 856 * </li> 857 * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be 858 * null.</li> 859 * </ul> 860 * @param oldPath may be null 861 * @param newPath may be null 862 * @param nextWriter may be null 863 * @return the passed in <code>newPath</code> 864 * @throws IOException if there is a problem flushing or closing the underlying FS 865 */ 866 Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 867 return TraceUtil.trace(() -> { 868 doReplaceWriter(oldPath, newPath, nextWriter); 869 return newPath; 870 }, () -> createSpan("WAL.replaceWriter")); 871 } 872 873 protected final void blockOnSync(SyncFuture syncFuture) throws IOException { 874 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 
875 try { 876 if (syncFuture != null) { 877 if (closed) { 878 throw new IOException("WAL has been closed"); 879 } else { 880 syncFuture.get(walSyncTimeoutNs); 881 } 882 } 883 } catch (TimeoutIOException tioe) { 884 throw new WALSyncTimeoutIOException(tioe); 885 } catch (InterruptedException ie) { 886 LOG.warn("Interrupted", ie); 887 throw convertInterruptedExceptionToIOException(ie); 888 } catch (ExecutionException e) { 889 throw ensureIOException(e.getCause()); 890 } 891 } 892 893 private static IOException ensureIOException(final Throwable t) { 894 return (t instanceof IOException) ? (IOException) t : new IOException(t); 895 } 896 897 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 898 Thread.currentThread().interrupt(); 899 IOException ioe = new InterruptedIOException(); 900 ioe.initCause(ie); 901 return ioe; 902 } 903 904 private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException { 905 rollWriterLock.lock(); 906 try { 907 if (this.closed) { 908 throw new WALClosedException("WAL has been closed"); 909 } 910 // Return if nothing to flush. 911 if (!force && this.writer != null && this.numEntries.get() <= 0) { 912 return null; 913 } 914 Map<byte[], List<byte[]>> regionsToFlush = null; 915 try { 916 Path oldPath = getOldPath(); 917 Path newPath = getNewPath(); 918 // Any exception from here on is catastrophic, non-recoverable so we currently abort. 919 W nextWriter = this.createWriterInstance(newPath); 920 tellListenersAboutPreLogRoll(oldPath, newPath); 921 // NewPath could be equal to oldPath if replaceWriter fails. 922 newPath = replaceWriter(oldPath, newPath, nextWriter); 923 tellListenersAboutPostLogRoll(oldPath, newPath); 924 if (LOG.isDebugEnabled()) { 925 LOG.debug("Create new " + implClassName + " writer with pipeline: " 926 + Arrays.toString(getPipeline())); 927 } 928 // We got a new writer, so reset the slow sync count 929 lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 930 slowSyncCount.set(0); 931 // Can we delete any of the old log files? 932 if (getNumRolledLogFiles() > 0) { 933 cleanOldLogs(); 934 regionsToFlush = findRegionsToForceFlush(); 935 } 936 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 937 // If the underlying FileSystem can't do what we ask, treat as IO failure so 938 // we'll abort. 939 throw new IOException( 940 "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", 941 exception); 942 } 943 return regionsToFlush; 944 } finally { 945 rollWriterLock.unlock(); 946 } 947 } 948 949 @Override 950 public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException { 951 return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter")); 952 } 953 954 // public only until class moves to o.a.h.h.wal 955 /** Returns the size of log files in use */ 956 public long getLogFileSize() { 957 return this.totalLogSize.get(); 958 } 959 960 // public only until class moves to o.a.h.h.wal 961 public void requestLogRoll() { 962 requestLogRoll(ERROR); 963 } 964 965 /** 966 * Get the backing files associated with this WAL. 967 * @return may be null if there are no files. 
968 */ 969 FileStatus[] getFiles() throws IOException { 970 return CommonFSUtils.listStatus(fs, walDir, ourFiles); 971 } 972 973 @Override 974 public void shutdown() throws IOException { 975 if (!shutdown.compareAndSet(false, true)) { 976 return; 977 } 978 closed = true; 979 // Tell our listeners that the log is closing 980 if (!this.listeners.isEmpty()) { 981 for (WALActionsListener i : this.listeners) { 982 i.logCloseRequested(); 983 } 984 } 985 986 ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( 987 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); 988 989 Future<Void> future = shutdownExecutor.submit(new Callable<Void>() { 990 @Override 991 public Void call() throws Exception { 992 if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { 993 try { 994 doShutdown(); 995 if (syncFutureCache != null) { 996 syncFutureCache.clear(); 997 } 998 } finally { 999 rollWriterLock.unlock(); 1000 } 1001 } else { 1002 throw new IOException("Waiting for rollWriterLock timeout"); 1003 } 1004 return null; 1005 } 1006 }); 1007 shutdownExecutor.shutdown(); 1008 1009 try { 1010 future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); 1011 } catch (InterruptedException e) { 1012 throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); 1013 } catch (TimeoutException e) { 1014 throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" 1015 + " the shutdown of WAL doesn't complete! Please check the status of underlying " 1016 + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS 1017 + "\"", e); 1018 } catch (ExecutionException e) { 1019 if (e.getCause() instanceof IOException) { 1020 throw (IOException) e.getCause(); 1021 } else { 1022 throw new IOException(e.getCause()); 1023 } 1024 } finally { 1025 // in shutdown we may call cleanOldLogs so shutdown this executor in the end. 1026 // In sync replication implementation, we may shutdown a WAL without shutting down the whole 1027 // region server, if we shutdown this executor earlier we may get reject execution exception 1028 // and abort the region server 1029 logArchiveExecutor.shutdown(); 1030 } 1031 } 1032 1033 @Override 1034 public void close() throws IOException { 1035 shutdown(); 1036 final FileStatus[] files = getFiles(); 1037 if (null != files && 0 != files.length) { 1038 for (FileStatus file : files) { 1039 Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); 1040 // Tell our listeners that a log is going to be archived. 1041 if (!this.listeners.isEmpty()) { 1042 for (WALActionsListener i : this.listeners) { 1043 i.preLogArchive(file.getPath(), p); 1044 } 1045 } 1046 1047 if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 1048 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 1049 } 1050 // Tell our listeners that a log was archived. 1051 if (!this.listeners.isEmpty()) { 1052 for (WALActionsListener i : this.listeners) { 1053 i.postLogArchive(file.getPath(), p); 1054 } 1055 } 1056 } 1057 LOG.debug( 1058 "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); 1059 } 1060 LOG.info("Closed WAL: " + toString()); 1061 } 1062 1063 /** Returns number of WALs currently in the process of closing. */ 1064 public int getInflightWALCloseCount() { 1065 return inflightWALClosures.size(); 1066 } 1067 1068 /** 1069 * updates the sequence number of a specific store. 
   * Depending on the {@code onlyIfGreater} flag, the current sequence number is replaced only if
   * the given sequence id is greater, or unconditionally, even if it is lower than the existing
   * one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again.
    // Also, only set rollRequested to true when there is a registered listener.
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty; there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
1146 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 1147 numEntries.incrementAndGet(); 1148 return true; 1149 } 1150 1151 private long postAppend(final Entry e, final long elapsedTime) throws IOException { 1152 long len = 0; 1153 if (!listeners.isEmpty()) { 1154 for (Cell cell : e.getEdit().getCells()) { 1155 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 1156 } 1157 for (WALActionsListener listener : listeners) { 1158 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); 1159 } 1160 } 1161 return len; 1162 } 1163 1164 protected final void postSync(long timeInNanos, int handlerSyncs) { 1165 if (timeInNanos > this.slowSyncNs) { 1166 String msg = new StringBuilder().append("Slow sync cost: ") 1167 .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ") 1168 .append(Arrays.toString(getPipeline())).toString(); 1169 LOG.info(msg); 1170 if (timeInNanos > this.rollOnSyncNs) { 1171 // A single sync took too long. 1172 // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative 1173 // effects. Here we have a single data point that indicates we should take immediate 1174 // action, so do so. 1175 LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" 1176 + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" 1177 + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " 1178 + Arrays.toString(getPipeline())); 1179 requestLogRoll(SLOW_SYNC); 1180 } 1181 slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this 1182 } 1183 if (!listeners.isEmpty()) { 1184 for (WALActionsListener listener : listeners) { 1185 listener.postSync(timeInNanos, handlerSyncs); 1186 } 1187 } 1188 } 1189 1190 protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, 1191 WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException { 1192 if (this.closed) { 1193 throw new IOException( 1194 "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); 1195 } 1196 MutableLong txidHolder = new MutableLong(); 1197 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { 1198 txidHolder.setValue(ringBuffer.next()); 1199 }); 1200 long txid = txidHolder.longValue(); 1201 ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null); 1202 try { 1203 FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); 1204 entry.stampRegionSequenceId(we); 1205 ringBuffer.get(txid).load(entry); 1206 } finally { 1207 ringBuffer.publish(txid); 1208 } 1209 return txid; 1210 } 1211 1212 @Override 1213 public String toString() { 1214 return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; 1215 } 1216 1217 /** 1218 * if the given {@code path} is being written currently, then return its length. 1219 * <p> 1220 * This is used by replication to prevent replicating unacked log entries. See 1221 * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 1222 */ 1223 @Override 1224 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 1225 rollWriterLock.lock(); 1226 try { 1227 Path currentPath = getOldPath(); 1228 if (path.equals(currentPath)) { 1229 // Currently active path. 1230 W writer = this.writer; 1231 return writer != null ? 
        OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        W temp = inflightWALClosures.get(path.getName());
        if (temp != null) {
          // In the process of being closed, trailer bytes may or may not be flushed.
          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
          // use cases like replication, see HBASE-25924/HBASE-25932.
          return OptionalLong.of(temp.getSyncedLength());
        }
        // Log rolled successfully.
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, true),
      () -> createSpan("WAL.appendData"));
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, false),
      () -> createSpan("WAL.appendMarker"));
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin' in which we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
   * variant, otherwise mvcc will get stuck. Do it in the finally block of a try/finally that
   * encloses this append and any subsequent operations, like sync or update of memstore. Get the
   * WriteEntry to pass to mvcc out of the passed in WALKey <code>walKey</code> parameter. Be
   * warned that the WriteEntry is not immediately available on return from this method. It WILL
   * be available subsequent to a sync of this append; otherwise, you will just have to wait on
   * the WriteEntry to get filled in.
   * @param info       the regioninfo associated with this append
   * @param key        Modified by this call; we add to it this edit's region edit/sequence id.
   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *                   sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta marker
   *                   edit, for example, a compaction completion record into the WAL or noting a
   *                   Region Open event. In these cases the entry is just so we can finish an
   *                   unfinished compaction after a crash when the new Server reads the WAL on
   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
   *                   When memstore is false, we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
1289 */ 1290 protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1291 throws IOException; 1292 1293 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; 1294 1295 protected abstract W createWriterInstance(Path path) 1296 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 1297 1298 /** 1299 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 1300 * will begin to work before returning from this method. If we clear the flag after returning from 1301 * this call, we may miss a roll request. The implementation class should choose a proper place to 1302 * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you 1303 * start writing to the new writer. 1304 */ 1305 protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) 1306 throws IOException; 1307 1308 protected abstract void doShutdown() throws IOException; 1309 1310 protected abstract boolean doCheckLogLowReplication(); 1311 1312 /** Returns true if we exceeded the slow sync roll threshold over the last check interval */ 1313 protected boolean doCheckSlowSync() { 1314 boolean result = false; 1315 long now = EnvironmentEdgeManager.currentTime(); 1316 long elapsedTime = now - lastTimeCheckSlowSync; 1317 if (elapsedTime >= slowSyncCheckInterval) { 1318 if (slowSyncCount.get() >= slowSyncRollThreshold) { 1319 if (elapsedTime >= (2 * slowSyncCheckInterval)) { 1320 // If two or more slowSyncCheckInterval have elapsed this is a corner case 1321 // where a train of slow syncs almost triggered us but then there was a long 1322 // interval from then until the one more that pushed us over. If so, we 1323 // should do nothing and let the count reset. 1324 if (LOG.isDebugEnabled()) { 1325 LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count=" 1326 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime=" 1327 + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms"); 1328 } 1329 // Fall through to count reset below 1330 } else { 1331 LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" 1332 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: " 1333 + Arrays.toString(getPipeline())); 1334 result = true; 1335 } 1336 } 1337 lastTimeCheckSlowSync = now; 1338 slowSyncCount.set(0); 1339 } 1340 return result; 1341 } 1342 1343 public void checkLogLowReplication(long checkInterval) { 1344 long now = EnvironmentEdgeManager.currentTime(); 1345 if (now - lastTimeCheckLowReplication < checkInterval) { 1346 return; 1347 } 1348 // Will return immediately if we are in the middle of a WAL log roll currently. 1349 if (!rollWriterLock.tryLock()) { 1350 return; 1351 } 1352 try { 1353 lastTimeCheckLowReplication = now; 1354 if (doCheckLogLowReplication()) { 1355 requestLogRoll(LOW_REPLICATION); 1356 } 1357 } finally { 1358 rollWriterLock.unlock(); 1359 } 1360 } 1361 1362 /** 1363 * This method gets the pipeline for the current WAL. 1364 */ 1365 abstract DatanodeInfo[] getPipeline(); 1366 1367 /** 1368 * This method gets the datanode replication count for the current WAL. 
1369 */ 1370 abstract int getLogReplication(); 1371 1372 private static void split(final Configuration conf, final Path p) throws IOException { 1373 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 1374 if (!fs.exists(p)) { 1375 throw new FileNotFoundException(p.toString()); 1376 } 1377 if (!fs.getFileStatus(p).isDirectory()) { 1378 throw new IOException(p + " is not a directory"); 1379 } 1380 1381 final Path baseDir = CommonFSUtils.getWALRootDir(conf); 1382 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 1383 if ( 1384 conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 1385 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR) 1386 ) { 1387 archiveDir = new Path(archiveDir, p.getName()); 1388 } 1389 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 1390 } 1391 1392 private static void usage() { 1393 System.err.println("Usage: AbstractFSWAL <ARGS>"); 1394 System.err.println("Arguments:"); 1395 System.err.println(" --dump Dump textual representation of passed one or more files"); 1396 System.err.println(" For example: " 1397 + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 1398 System.err.println(" --split Split the passed directory of WAL logs"); 1399 System.err.println( 1400 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 1401 } 1402 1403 /** 1404 * Pass one or more log file names and it will either dump out a text version on 1405 * <code>stdout</code> or split the specified log files. 1406 */ 1407 public static void main(String[] args) throws IOException { 1408 if (args.length < 2) { 1409 usage(); 1410 System.exit(-1); 1411 } 1412 // either dump using the WALPrettyPrinter or split, depending on args 1413 if (args[0].compareTo("--dump") == 0) { 1414 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 1415 } else if (args[0].compareTo("--perf") == 0) { 1416 LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); 1417 LOG.error(HBaseMarkers.FATAL, 1418 "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); 1419 System.exit(-1); 1420 } else if (args[0].compareTo("--split") == 0) { 1421 Configuration conf = HBaseConfiguration.create(); 1422 for (int i = 1; i < args.length; i++) { 1423 try { 1424 Path logPath = new Path(args[i]); 1425 CommonFSUtils.setFsDefault(conf, logPath); 1426 split(conf, logPath); 1427 } catch (IOException t) { 1428 t.printStackTrace(System.err); 1429 System.exit(-1); 1430 } 1431 } 1432 } else { 1433 usage(); 1434 System.exit(-1); 1435 } 1436 } 1437}
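// A rough sketch of the append/sync contract described in the append(...) javadoc above. The
// variable names (wal, regionInfo, walKey, edits, mvcc) are illustrative assumptions; real
// callers of this API live in HRegion.
//
//   long txid = wal.appendData(regionInfo, walKey, edits);
//   try {
//     wal.sync(txid); // block until the edit is durable
//   } finally {
//     // Always complete the mvcc transaction begun by the append, otherwise mvcc gets stuck.
//     MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
//     if (we != null) {
//       mvcc.complete(we);
//     }
//   }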