/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internal to the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call
 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}.
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because the
 * current WAL is now a lame duck; any more appends or syncs will fail also with the same original
 * exception. If we have made successful appends to the WAL and we then are unable to sync them, our
 * current semantic is to return error to the client that the appends failed but also to abort the
 * current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the client
 * that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (Need to keep our own file lengths, not rely on HDFS).
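 * <p>
 * For instance, a reader loop over a WAL file might look like the following sketch. This is
 * illustrative only: {@code fs} and {@code walPath} are assumed to point at an existing WAL file,
 * and {@code process} is a placeholder for caller code.
 *
 * <pre>{@code
 * try (WAL.Reader reader = WALFactory.createReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // each entry carries a WALKey (region, sequence id) and a WALEdit (the cells)
 *     process(entry.getKey(), entry.getEdit());
 *   }
 * }
 * }</pre>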
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
      "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
      "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
      "hbase.regionserver.wal.disruptor.event.count";

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  // If > than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If more than this many logs, force a flush of the oldest region so that its oldest edit goes
   * to disk. If we keep too many logs and then crash, replay will take forever.
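   * The default cap is derived from the global memstore size as roughly (2 * global memstore
   * size) / log roll size, floored at 32; see calculateMaxLogFiles below. Illustrative example:
   * a 4 GB global memstore and a 128 MB roll size give (2 * 4096 MB) / 128 MB = 64 logs.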
   * Keep the number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warning when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current Wal.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to a background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in the queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not yet
   * come in.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of wal
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time to check low replication on hlog's pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
      (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WalProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    public final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * sub classes.
     */
    public final long logSize;

    public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
      new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
   * A thread local is used so the JVM can GC the terminated thread for us.
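   * Each handler thread thereby reuses its own SyncFuture across sync calls rather than
   * allocating a new one per sync, keeping allocation off the hot path.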
   * See HBASE-21228.
   */
  private final ThreadLocal<SyncFuture> cachedSyncFutures;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Log-Archiver-%d").build());

  private final int archiveRetries;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename is created with the {@link #computeFilename(long filenum)} method.
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
          "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
        (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
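    // The disruptor requires the ring size to be a power of 2, so a configured value that is not
    // one is rounded up below. Worked example (illustrative): a configured 5000 has
    // Integer.highestOneBit == 4096; since 4096 != 5000 we return the next power of 2, 8192.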
    int preallocatedEventCount =
        this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must be >= 0");
    int floor = Integer.highestOneBit(preallocatedEventCount);
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
      final boolean failIfWALExists, final String prefix, final String suffix)
      throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
  }

  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
      final String logDir, final String archiveDir, final Configuration conf,
      final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
      final String suffix)
      throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null||empty then just name it wal
    this.walFilePrefix =
        prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
          "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
        conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
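          // e.g. (illustrative) with prefixPathStr ".../WALs/server1/wal.", the file
          // ".../WALs/server1/wal.1612345678901" leaves the numeric remainder "1612345678901".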
          return org.apache.commons.lang3.StringUtils
              .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (block transitions cost
    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
    // the block size but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So that the new accounting makes WALs of about the same
    // size as those made in hbase-1 (to prevent surprise), we now default the block size to
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
        StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
        walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir +
        ", maxLogs=" + this.maxLogs);
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
        conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS,
        DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold = conf.getInt(SLOW_SYNC_ROLL_THRESHOLD,
        DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval = conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS,
        DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
        conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
      @Override
      protected SyncFuture initialValue() {
        return new SyncFuture();
      }
    };
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.logroll.archive.retries", 0);
  }

  /**
   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used for reporting to the master our oldest sequenceid,
    // used in figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
    // from this.sequenceIdAccounting looks first in flushingOldestStoreSequenceIds, the
    // currently flushing sequence ids, and if anything is found there, returns it. This is
    // the right thing to do for reporting oldest sequenceids to the master; we won't skip edits if
    // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
    // id is old even though we are currently flushing. This may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  /**
   * This is a convenience method that computes a new filename with a given file-number.
   * @param filenum to use
   * @return Path
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   * @return Path
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
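   * <p>
   * The name layout is {@code <prefix><delimiter><filenum><suffix>} (see
   * {@link #computeFilename(long)}), e.g. an illustrative {@code wal.1612345700000} for the
   * default "wal" prefix and an empty suffix.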
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(System.currentTimeMillis());
    Path newPath = getCurrentFileName();
    while (fs.exists(newPath)) {
      this.filenum.incrementAndGet();
      newPath = getCurrentFileName();
    }
    return newPath;
  }

  Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // computeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
      throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the number of log files in use */
  public int getNumLogFiles() {
    // +1 for current use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that it can
   * be let go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL
   *         file.
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
          this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
          "; forcing (partial) flush of " + regions.size() + " region(s): " +
          StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
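   * Concretely, a file is eligible once every region sequence id it recorded is lower than the
   * corresponding region's lowest unflushed sequence id, per {@link SequenceIdAccounting}.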
   */
  private void cleanOldLogs() throws IOException {
    List<Pair<Path, Long>> logsToArchive = null;
    // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }

  protected void archive(final Pair<Path, Long> log) {
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        totalLogSize.addAndGet(-log.getSecond());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {}", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry,
              e);
        }
        retry++;
      }
    }
  }

  /*
   * Only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
          new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
          CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
          newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  /**
   * Cleans up the current writer, closing it, and then puts in place the passed in
   * {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be
   * null.</li>
   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath may be null
   * @param newPath may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      // SyncFutures are reused by threads; if a TimeoutIOException happens, the ringbuffer
      // still refers to this one, so if this thread reuses it the next time it may get a wrong
      // result.
      this.cachedSyncFutures.remove();
      throw tioe;
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // newPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: " +
              Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
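        // Only consult the accounting once we actually have rolled files: archive fully-flushed
        // WALs, and if we are still over maxLogs, compute which regions must flush to free the
        // oldest one.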
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
        // we'll abort.
        throw new IOException(
            "Underlying FileSystem can't meet stream requirements. See RS log for details.",
            exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  // public only until class moves to o.a.h.h.wal
  /** @return the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    rollWriterLock.lock();
    try {
      doShutdown();
      if (logArchiveExecutor != null) {
        logArchiveExecutor.shutdownNow();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
          "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /**
   * Updates the sequence number of a specific store.
   * Depending on the flag, the current sequence number is replaced only if the given sequence id
   * is greater, or unconditionally, even if it is lower than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
      boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again.
    // And only set rollRequested to true when there is a registered listener.
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Use for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty; there is nothing to append. May be empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
          entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
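    // postAppend reports the edit's estimated serialized size and the append latency to any
    // registered listeners; its return value is unused here.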
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
          .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos))
          .append(" ms, current pipeline: ")
          .append(Arrays.toString(getPipeline())).toString();
      TraceUtil.addTimelineAnnotation(msg);
      LOG.info(msg);
      // A single sync took too long.
      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
      // effects. Here we have a single data point that indicates we should take immediate
      // action, so do so.
      if (timeInNanos > this.rollOnSyncNs) {
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" +
            TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" +
            TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " +
            Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
      WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
      throws IOException {
    if (this.closed) {
      throw new IOException(
          "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
        .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c).orElse(null);
    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * If the given {@code path} is being written currently, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
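   * <p>
   * Returns empty if the given path is not the file currently being written, or if there is no
   * active writer.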
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return append(info, key, edits, true);
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
      throws IOException {
    return append(info, key, edits, false);
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling
   * MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do
   * it in the finally of a try/finally block within which this append lives and any subsequent
   * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the
   * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not
   * immediately available on return from this method. It WILL be available subsequent to a sync of
   * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
   * @param info the regioninfo associated with the append
   * @param key Modified by this call; we add to it this edit's region edit/sequence id.
   * @param edits Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an edit
   *          sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for the case where we are writing a region event meta
   *          marker edit, for example, a compaction completion record into the WAL or noting a
   *          Region Open event. In these cases the entry is just so we can finish an unfinished
   *          compaction after a crash when the new Server reads the WAL on recovery, etc. These
   *          transition event 'Markers' do not go via the memstore. When memstore is false,
   *          we presume a Marker event edit.
   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
   *         in it.
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
      throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
      throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new
   * writer will begin to work before returning from this method.
   * If we clear the flag after returning from this call, we may miss a roll request. The
   * implementation class should choose a proper place to clear the {@link #rollRequested} flag, so
   * we do not miss a roll request; typically before you start writing to the new writer.
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
      throws IOException;

  protected abstract void doShutdown() throws IOException;

  protected abstract boolean doCheckLogLowReplication();

  /**
   * @return true if we exceeded the slow sync roll threshold over the last check interval
   */
  protected boolean doCheckSlowSync() {
    boolean result = false;
    long now = EnvironmentEdgeManager.currentTime();
    long elapsedTime = now - lastTimeCheckSlowSync;
    if (elapsedTime >= slowSyncCheckInterval) {
      if (slowSyncCount.get() >= slowSyncRollThreshold) {
        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
          // If two or more slowSyncCheckIntervals have elapsed this is a corner case
          // where a train of slow syncs almost triggered us but then there was a long
          // interval from then until the one more that pushed us over. If so, we
          // should do nothing and let the count reset.
          if (LOG.isDebugEnabled()) {
            LOG.debug("checkSlowSync triggered but we decided to ignore it; " +
                "count=" + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
                ", elapsedTime=" + elapsedTime + " ms, slowSyncCheckInterval=" +
                slowSyncCheckInterval + " ms");
          }
          // Fall through to the count reset below
        } else {
          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" +
              slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold +
              ", current pipeline: " + Arrays.toString(getPipeline()));
          result = true;
        }
      }
      lastTimeCheckSlowSync = now;
      slowSyncCount.set(0);
    }
    return result;
  }

  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Will return immediately if we are in the middle of a WAL log roll currently.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * This method gets the pipeline for the current WAL.
   */
  abstract DatanodeInfo[] getPipeline();

  /**
   * This method gets the datanode replication count for the current WAL.
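   * For an HDFS-backed WAL this is typically the number of DataNodes in the current write
   * pipeline.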
   */
  abstract int getLogReplication();

  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of passed one or more files");
    System.err.println("         For example: " +
        "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
        "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names and it will either dump out a text version on
   * <code>stdout</code> or split the specified log files.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].compareTo("--dump") == 0) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--perf") == 0) {
      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
          "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}