/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because the
 * current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and we then are unable to sync
 * them, our current semantic is to return error to the client that the appends failed but also to
 * abort the current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the
 * client that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";
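  // A minimal tuning sketch (values are hypothetical, not recommendations): the constants above
  // name plain Configuration entries, so a test or tuning experiment might set them like so:
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(SLOW_SYNC_TIME_MS, 200);       // warn when a single sync exceeds 200 ms
  //   conf.setInt(SLOW_SYNC_ROLL_THRESHOLD, 50); // request a roll after 50 slow syncs per window
  //   conf.setLong(WAL_SYNC_TIMEOUT_MS, 10 * 60 * 1000L); // fail syncs blocked over 10 minutes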
  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /** Tracks the logs in the process of being closed. */
  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  // If bigger than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If we accumulate more than this many logs, force a flush of the region(s) holding the oldest
   * edits so the oldest WAL can be archived. If we let too many WALs pile up and then crash,
   * replay will take forever. Keep the number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
   * for it.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of the rolled WAL files that have not yet been archived.
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time we checked for low replication on the WAL pipeline.
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WalProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    public final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * sub classes.
     */
    public final long logSize;

    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
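  // Illustrative only: because walFile2Props is ordered by the creation timestamp embedded in
  // the file name, firstEntry() is always the oldest live WAL. For example (hypothetical paths):
  //   .../WALs/rs1/rs1%2C16020%2C1700000000000.1700000000123 -> WalProps{..., logSize=...}
  //   .../WALs/rs1/rs1%2C16020%2C1700000000000.1700000005678 -> WalProps{..., logSize=...}
  // Both findRegionsToForceFlush() and cleanOldLogs() below rely on this ordering.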
  /**
   * A cache of sync futures reused by threads.
   */
  protected final SyncFutureCache syncFutureCache;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());

  private final int archiveRetries;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }
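  // Worked example of the formula above (hypothetical numbers): with an 8 GiB global memstore
  // and a 128 MiB roll size, calculateMaxLogFiles returns (8 GiB * 2) / 128 MiB = 128, i.e.
  // enough live WALs to cover twice the data that could be sitting unflushed in memory.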
  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
    int floor = Integer.highestOneBit(preallocatedEventCount);
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }
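  // Illustrative rounding behavior of getPreallocatedEventCount() for a few inputs (the ring
  // buffer requires a power-of-2 size, capped at 1 << 30):
  //   16384         -> 16384   (already a power of 2)
  //   20000         -> 32768   (rounded up to the next power of 2)
  //   (1 << 29) + 1 -> 1 << 30 (clamped at the maximum capacity)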
  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
  }

  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix) throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
        + "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so the WAL rolls before we get to the end-of-block (block transitions
    // cost some latency). In hbase-1 we did this differently: we scheduled a roll when we hit
    // 95% of the block size, but experience from the field has it that this was not enough time
    // for the roll to happen before end-of-block. The new accounting makes WALs of about the
    // same size as those made in hbase-1 (to prevent surprise): the default block size is now 2
    // times the DFS default, and 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
  }
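  // Worked example of the roll sizing above, assuming a 128 MB DFS default block size: the WAL
  // block size defaults to 2 * 128 MB = 256 MB, so with the default multiplier of 0.5 the WAL
  // rolls at ~128 MB -- about the size hbase-1 produced rolling at 95% of a single block.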
  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used for reporting our oldest sequenceid to the master,
    // which figures what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
    // from this.sequenceIdAccounting looks first in flushingOldestStoreSequenceIds, the
    // currently flushing sequence ids, and if anything is found there, returns it. This is the
    // right thing to do when reporting oldest sequenceids to the master; we won't skip edits if
    // we crash during the flush. For figuring what to flush, we might get requeued if our
    // sequence id is old even though we are currently flushing. This may mean we do too much
    // flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  @Override
  public final void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public final void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public final void sync(boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
  }

  @Override
  public final void sync(long txid, boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
  }

  protected abstract void doSync(boolean forceSync) throws IOException;

  protected abstract void doSync(long txid, boolean forceSync) throws IOException;
  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum file number to use
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes the current WAL filename from the current WAL
   * file-number.
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }
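  // Illustrative file naming (hypothetical values): with a walFilePrefix of
  // "rs1%2C16020%2C1700000000000" (the URL-encoded server name), the "." delimiter, and an
  // empty suffix, computeFilename(1700000000123) yields
  //   <walDir>/rs1%2C16020%2C1700000000000.1700000000123
  // getFileNumFromFileName() reverses this: it chops the prefix and suffix and parses the rest.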
  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
    Path newPath = getCurrentFileName();
    return newPath;
  }

  Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // If currentFilenum is <= 0, this is the first file, so a null oldPath is fine.
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of log files in use */
  public int getNumLogFiles() {
    // +1 for the current WAL in use
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that it
   * can be let-go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL
   *         file.
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
        + "; forcing (partial) flush of " + regions.size() + " region(s): "
        + StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   */
  private void cleanOldLogs() throws IOException {
    List<Pair<Path, Long>> logsToArchive = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }
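  // Example of the GC decision above (hypothetical sequence ids): a rolled WAL recorded
  // {regionA: 120, regionB: 340} as the highest sequence ids it contains. If the accounting
  // shows everything through 200 flushed for regionA and through 400 for regionB, then
  // areAllLower() is true and the file can be archived. If regionB had only flushed through
  // 300, the WAL must be kept, and regionB becomes a candidate in findRegionsToForceFlush().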
  protected void archive(final Pair<Path, Long> log) {
    totalLogSize.addAndGet(-log.getSecond());
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {}", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
        }
        retry++;
      }
    }
  }

  /*
   * Only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  private Span createSpan(String name) {
    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
  }

  /**
   * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be
   * null.</li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath may be null
   * @param newPath may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    return TraceUtil.trace(() -> {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }, () -> createSpan("WAL.replaceWriter"));
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      throw tioe;
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }
  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // newPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: "
            + Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    rollWriterLock.lock();
    try {
      doShutdown();
      if (syncFutureCache != null) {
        syncFutureCache.clear();
      }
      if (logArchiveExecutor != null) {
        logArchiveExecutor.shutdownNow();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }
  /** Returns number of WALs currently in the process of closing. */
  public int getInflightWALCloseCount() {
    return inflightWALClosures.size();
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, the current sequence
   * number is replaced only if the given sequence id is greater, or unconditionally even if it is
   * lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again
    // And only set rollRequested to true when there is a registered listener
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }
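  // Illustrative accounting (hypothetical txids): if appends for txids 1..10 have been
  // published (highestUnsyncedTxid = 10) but the last completed sync only covered txid 7
  // (highestSyncedTxid = 7), then getUnflushedEntriesCount() above returns 3 and
  // isUnflushedEntries() is true.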
  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
        .append(Arrays.toString(getPipeline())).toString();
      LOG.info(msg);
      // A single sync took too long.
      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
      // effects. Here we have a single data point that indicates we should take immediate
      // action, so do so.
      if (timeInNanos > this.rollOnSyncNs) {
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
          + Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }
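  // Worked example of the two thresholds above, using the defaults (slowSyncNs = 100 ms,
  // rollOnSyncNs = 10 s): a sync taking 150 ms is logged as slow and bumps slowSyncCount,
  // feeding the cumulative check in doCheckSlowSync(); a sync taking 12 s additionally
  // requests an immediate roll with reason SLOW_SYNC, without waiting for the count.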
  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * If the given {@code path} is being written currently, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        // Currently active path.
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        W temp = inflightWALClosures.get(path.getName());
        if (temp != null) {
          // In the process of being closed, trailer bytes may or may not be flushed.
          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
          // use cases like replication, see HBASE-25924/HBASE-25932.
          return OptionalLong.of(temp.getSyncedLength());
        }
        // Log rolled successfully.
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, true),
      () -> createSpan("WAL.appendData"));
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, false),
      () -> createSpan("WAL.appendMarker"));
  }
  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
   * variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
   * which this append lives and any subsequent operations like sync or update of memstore, etc.
   * Get the WriteEntry to pass mvcc out of the passed in WALKey <code>walKey</code> parameter. Be
   * warned that the WriteEntry is not immediately available on return from this method. It WILL be
   * available subsequent to a sync of this append; otherwise, you will just have to wait on the
   * WriteEntry to get filled in.
   * @param info the regioninfo associated with append
   * @param key Modified by this call; we add to it this edit's region edit/sequence id.
   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *          sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta marker
   *          edit, for example, a compaction completion record into the WAL or noting a Region
   *          Open event. In these cases the entry is just so we can finish an unfinished
   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
   *          transition event 'Markers' do not go via the memstore. When <code>inMemstore</code>
   *          is false, we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException;
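  // A minimal sketch of the calling pattern the append() contract above implies. This is a
  // hypothetical caller, not code from this class; it assumes an mvcc instance and a WALKeyImpl
  // walKey that carries the WriteEntry:
  //   long txid = wal.appendData(regionInfo, walKey, edits);
  //   try {
  //     wal.sync(txid); // the WriteEntry is available once the sync has stamped the sequence id
  //   } finally {
  //     mvcc.complete(walKey.getWriteEntry()); // always complete, or mvcc gets stuck
  //   }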
1213 */ 1214 protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) 1215 throws IOException; 1216 1217 protected abstract void doShutdown() throws IOException; 1218 1219 protected abstract boolean doCheckLogLowReplication(); 1220 1221 /** Returns true if we exceeded the slow sync roll threshold over the last check interval */ 1222 protected boolean doCheckSlowSync() { 1223 boolean result = false; 1224 long now = EnvironmentEdgeManager.currentTime(); 1225 long elapsedTime = now - lastTimeCheckSlowSync; 1226 if (elapsedTime >= slowSyncCheckInterval) { 1227 if (slowSyncCount.get() >= slowSyncRollThreshold) { 1228 if (elapsedTime >= (2 * slowSyncCheckInterval)) { 1229 // If two or more slowSyncCheckInterval have elapsed this is a corner case 1230 // where a train of slow syncs almost triggered us but then there was a long 1231 // interval from then until the one more that pushed us over. If so, we 1232 // should do nothing and let the count reset. 1233 if (LOG.isDebugEnabled()) { 1234 LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count=" 1235 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime=" 1236 + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms"); 1237 } 1238 // Fall through to count reset below 1239 } else { 1240 LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" 1241 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: " 1242 + Arrays.toString(getPipeline())); 1243 result = true; 1244 } 1245 } 1246 lastTimeCheckSlowSync = now; 1247 slowSyncCount.set(0); 1248 } 1249 return result; 1250 } 1251 1252 public void checkLogLowReplication(long checkInterval) { 1253 long now = EnvironmentEdgeManager.currentTime(); 1254 if (now - lastTimeCheckLowReplication < checkInterval) { 1255 return; 1256 } 1257 // Will return immediately if we are in the middle of a WAL log roll currently. 1258 if (!rollWriterLock.tryLock()) { 1259 return; 1260 } 1261 try { 1262 lastTimeCheckLowReplication = now; 1263 if (doCheckLogLowReplication()) { 1264 requestLogRoll(LOW_REPLICATION); 1265 } 1266 } finally { 1267 rollWriterLock.unlock(); 1268 } 1269 } 1270 1271 /** 1272 * This method gets the pipeline for the current WAL. 1273 */ 1274 abstract DatanodeInfo[] getPipeline(); 1275 1276 /** 1277 * This method gets the datanode replication count for the current WAL. 
  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  abstract DatanodeInfo[] getPipeline();

  /**
   * This method gets the datanode replication count for the current WAL.
   */
  abstract int getLogReplication();

  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (
      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
    ) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump a textual representation of one or more passed files");
    System.err.println("         For example: "
      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names, and it will either dump out a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}