/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantics</h2>
 * If an exception occurs on append or sync, roll the WAL because the current WAL is now a lame
 * duck; any more appends or syncs will fail also with the same original exception. If we have made
 * successful appends to the WAL and we then are unable to sync them, our current semantic is to
 * return error to the client that the appends failed but also to abort the current context, usually
 * the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the client
 * that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
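 * <p>
 * For example, a minimal scan of a WAL file (an editor's illustrative sketch only; the path below
 * is hypothetical and error handling is elided):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem fs = FileSystem.get(conf);
 * Path walPath = new Path("/hbase/WALs/rs1%2C16020%2C1/rs1%2C16020%2C1.1588230740");
 * try (WAL.Reader reader = WALFactory.createReader(fs, walPath, conf)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // entry.getKey() carries the region and sequence id; entry.getEdit() carries the cells.
 *   }
 * }
 * </pre>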
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  // If > than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If there are more than this many logs, force a flush of the oldest region so its oldest edit
   * goes to disk. If too many logs accumulate and we crash, replay will take forever. Keep the
   * number of logs tidy.
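   * For example (illustrative numbers only): with a 4 GB global memstore limit and a 128 MB roll
   * size, the default works out to max(32, (2 * 4096 MB) / 128 MB) = 64 files; see
   * calculateMaxLogFiles(Configuration, long). The "hbase.regionserver.maxlogs" setting overrides
   * this.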
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current WAL.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to a background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in the queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not yet
   * come in.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of WAL files.
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time to check low replication on hlog's pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WalProps {

    /**
     * Maps the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    public final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * sub classes.
     */
    public final long logSize;

    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures. A thread
   * local is used so the JVM can GC the terminated thread for us.
   * See HBASE-21228.
   */
  private final ThreadLocal<SyncFuture> cachedSyncFutures;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here;
   * refer to HBASE-17676 for more details.
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount =
      this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
    int floor = Integer.highestOneBit(preallocatedEventCount);
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    // not a power of two, so round up to the next one, e.g. 20000 -> 32768
    return floor << 1;
  }

  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
      final boolean failIfWALExists, final String prefix, final String suffix)
      throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
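    // For example (hypothetical names): "host%2C16020%2C1.1588230740.meta" parses as prefix
    // "host%2C16020%2C1", timestamp 1588230740, and suffix ".meta"; a suffix like "meta" without
    // the leading '.' would make the trailing timestamp digits ambiguous, hence the check below.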
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
        "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (block transitions cost
    // some latency). In hbase-1 we did this differently: we scheduled a roll when we hit 95% of
    // the block size, but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So the new accounting makes WALs of about the same size
    // as those made in hbase-1 (to prevent surprise): we now default the block size to 2 times the
    // DFS default, and 2 * DFS default block size rolling at 50% full will generally make logs of
    // similar size to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
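    // For example (assuming the common 128 MB DFS default block size): the WAL block size becomes
    // 256 MB and, with the default 0.5 multiplier below, the WAL rolls at ~128 MB -- about the
    // same as hbase-1's 128 MB blocks rolled at 95% full (~122 MB).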
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
      StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
      walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
      ", maxLogs=" + this.maxLogs);
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
      DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
      DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
      DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
      @Override
      protected SyncFuture initialValue() {
        return new SyncFuture();
      }
    };
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
  }

  /**
   * Used to initialize the WAL. Usually just calls rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used for reporting to the master our oldest sequenceid,
    // used in figuring what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is
    // found there, returns it. This is the right thing to do when reporting oldest sequenceids to
    // the master; we won't skip edits if we crash during the flush. For figuring what to flush,
    // we might get requeued if our sequence id is old even though we are currently flushing. This
    // may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public byte[][] rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum to use
   * @return Path
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   * @return Path
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(System.currentTimeMillis());
    Path newPath = getCurrentFileName();
    while (fs.exists(newPath)) {
      this.filenum.incrementAndGet();
      newPath = getCurrentFileName();
    }
    return newPath;
  }

  @VisibleForTesting
  Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // computeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of log files in use */
  public int getNumLogFiles() {
    // +1 for current use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that it can
   * be let go/'archived'.
   * @return regions (encodedRegionNames) to flush in order to archive the oldest WAL file.
   */
  byte[][] findRegionsToForceFlush() throws IOException {
    byte[][] regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < regions.length; i++) {
        if (i > 0) {
          sb.append(", ");
        }
        sb.append(Bytes.toStringBinary(regions[i]));
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
        "; forcing flush of " + regions.length + " region(s): " + sb.toString());
    }
    return regions;
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   */
  private void cleanOldLogs() throws IOException {
    List<Pair<Path, Long>> logsToArchive = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      }
    }
    if (logsToArchive != null) {
      for (Pair<Path, Long> logAndSize : logsToArchive) {
        this.totalLogSize.addAndGet(-logAndSize.getSecond());
        archiveLogFile(logAndSize.getFirst());
        this.walFile2Props.remove(logAndSize.getFirst());
      }
    }
  }

  /*
   * only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  private void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
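    // (Archiving here is just a rename into walArchiveDir, typically the oldWALs directory; a
    // cleaner chore on the master eventually deletes files from there once nothing, e.g.
    // replication, still needs them.)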
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  /**
   * Cleans up the current writer, closing it, and then puts in place the passed in
   * {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be
   * null.</li>
   * <li>In the case of closing out this FSHLog with no further use, newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath may be null
   * @param newPath may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  @VisibleForTesting
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now that we have published to the ringbuffer, halt the current thread until we get an
    // answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      // SyncFutures are reused per thread; if a TimeoutIOException happens, the ringbuffer may
      // still refer to this one, so if this thread reuses it next time it may get a wrong
      // result. Drop the cached instance.
      this.cachedSyncFutures.remove();
      throw tioe;
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ?
      (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  @Override
  public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte[][] regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // newPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
            Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  @VisibleForTesting
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    rollWriterLock.lock();
    try {
      doShutdown();
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, it either replaces the
   * current sequence number only if the given one is greater, or replaces it unconditionally, even
   * if the given one is lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
      boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again.
    // And only set rollRequested to true when there is a registered listener.
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  @VisibleForTesting
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
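    // (The full append path below: preWALWrite -> listener callbacks -> doAppend ->
    // sequence-id accounting -> postWALWrite -> postAppend metrics.)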
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
        .append(" ms, current pipeline: ")
        .append(Arrays.toString(getPipeline())).toString();
      TraceUtil.addTimelineAnnotation(msg);
      LOG.info(msg);
      // A single sync took too long.
      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
      // effects. Here we have a single data point that indicates we should take immediate
      // action, so do so.
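      // For example, with the defaults above: a single sync taking over 10 s
      // (DEFAULT_ROLL_ON_SYNC_TIME_MS) requests a roll right here, while syncs in the
      // 100 ms .. 10 s range only increment slowSyncCount for doCheckSlowSync() to evaluate.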
      if (timeInNanos > this.rollOnSyncNs) {
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
          TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
          TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
          Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
      WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
      throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * If the given {@code path} is being written currently, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return append(info, key, edits, true);
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
      throws IOException {
    return append(info, key, edits, false);
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId.
   * You must 'complete' this mvcc transaction by calling
   * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it
   * in the finally of a try/finally block within which this append lives and any subsequent
   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
   * immediately available on return from this method. It WILL be available subsequent to a sync of
   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
   * @param info the regioninfo associated with append
   * @param key Modified by this call; we add to it this edit's region edit/sequence id.
   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
   *          sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for case where we are writing a region event meta
   *          marker edit, for example, a compaction completion record into the WAL or noting a
   *          Region Open event. In these cases the entry is just so we can finish an unfinished
   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
   *          transition event 'Markers' do not go via the memstore. When memstore is false,
   *          we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before returning from this method. If we clear the flag after returning from
   * this call, we may miss a roll request. The implementation class should choose a proper place to
   * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you
   * start writing to the new writer.
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
      throws IOException;

  protected abstract void doShutdown() throws IOException;

  protected abstract boolean doCheckLogLowReplication();

  /**
   * @return true if we exceeded the slow sync roll threshold over the last check interval
   */
  protected boolean doCheckSlowSync() {
    boolean result = false;
    long now = EnvironmentEdgeManager.currentTime();
    long elapsedTime = now - lastTimeCheckSlowSync;
    if (elapsedTime >= slowSyncCheckInterval) {
      if (slowSyncCount.get() >= slowSyncRollThreshold) {
        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
          // If two or more slowSyncCheckIntervals have elapsed, this is a corner case
          // where a train of slow syncs almost triggered us but then there was a long
          // interval from then until the one more that pushed us over. If so, we
          // should do nothing and let the count reset.
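          // For example, with the defaults above: 100 slow syncs observed within a single 60 s
          // check interval request a roll; but if this check runs 120 s or more after the
          // previous one, the accumulated count is considered stale and is simply reset below.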
          if (LOG.isDebugEnabled()) {
            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
              "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
              ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
              slowSyncCheckInterval + " ms");
          }
          // Fall through to count reset below
        } else {
          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
            slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
            ", current pipeline: " + Arrays.toString(getPipeline()));
          result = true;
        }
      }
      lastTimeCheckSlowSync = now;
      slowSyncCount.set(0);
    }
    return result;
  }

  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  @VisibleForTesting
  abstract DatanodeInfo[] getPipeline();

  /**
   * This method gets the datanode replication count for the current WAL.
   */
  @VisibleForTesting
  abstract int getLogReplication();

  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump a textual representation of one or more passed files");
    System.err.println("         For example: " +
      "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names, and it will either dump out a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.error(HBaseMarkers.FATAL,
        "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}