001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 023 024import com.lmax.disruptor.RingBuffer; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.lang.management.MemoryType; 029import java.net.URLEncoder; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Comparator; 033import java.util.List; 034import java.util.Map; 035import java.util.OptionalLong; 036import java.util.Set; 037import java.util.concurrent.ConcurrentNavigableMap; 038import java.util.concurrent.ConcurrentSkipListMap; 039import java.util.concurrent.CopyOnWriteArrayList; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicInteger; 044import 
java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.locks.ReentrantLock; 046import org.apache.commons.lang3.mutable.MutableLong; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.PrivateCellUtil; 056import org.apache.hadoop.hbase.client.RegionInfo; 057import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 058import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 059import org.apache.hadoop.hbase.ipc.RpcServer; 060import org.apache.hadoop.hbase.ipc.ServerCall; 061import org.apache.hadoop.hbase.log.HBaseMarkers; 062import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 063import org.apache.hadoop.hbase.trace.TraceUtil; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.CommonFSUtils; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.FSUtils; 068import org.apache.hadoop.hbase.util.Pair; 069import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 070import org.apache.hadoop.hbase.wal.WAL; 071import org.apache.hadoop.hbase.wal.WALEdit; 072import org.apache.hadoop.hbase.wal.WALFactory; 073import org.apache.hadoop.hbase.wal.WALKeyImpl; 074import org.apache.hadoop.hbase.wal.WALPrettyPrinter; 075import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; 076import org.apache.hadoop.hbase.wal.WALSplitter; 077import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 078import org.apache.hadoop.util.StringUtils; 079import org.apache.htrace.core.TraceScope; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 085 086/** 087 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one 088 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. 089 * This is done internal to the implementation. 090 * <p> 091 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a 092 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id. 093 * A bunch of work in the below is done keeping account of these region sequence ids -- what is 094 * flushed out to hfiles, and what is yet in WAL and in memory only. 095 * <p> 096 * It is only practical to delete entire files. Thus, we delete an entire on-disk file 097 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older 098 * (smaller) than the most-recent flush. 099 * <p> 100 * To read an WAL, call 101 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. * 102 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL 103 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If 104 * we have made successful appends to the WAL and we then are unable to sync them, our current 105 * semantic is to return error to the client that the appends failed but also to abort the current 106 * context, usually the hosting server. We need to replay the WALs. <br> 107 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client 108 * that the append failed. <br> 109 * TODO: replication may pick up these last edits though they have been marked as failed append 110 * (Need to keep our own file lengths, not rely on HDFS). 
111 */ 112@InterfaceAudience.Private 113public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 114 115 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 116 117 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 118 119 private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 120 121 /** 122 * file system instance 123 */ 124 protected final FileSystem fs; 125 126 /** 127 * WAL directory, where all WAL files would be placed. 128 */ 129 protected final Path walDir; 130 131 /** 132 * dir path where old logs are kept. 133 */ 134 protected final Path walArchiveDir; 135 136 /** 137 * Matches just those wal files that belong to this wal instance. 138 */ 139 protected final PathFilter ourFiles; 140 141 /** 142 * Prefix of a WAL file, usually the region server name it is hosted on. 143 */ 144 protected final String walFilePrefix; 145 146 /** 147 * Suffix included on generated wal file names 148 */ 149 protected final String walFileSuffix; 150 151 /** 152 * Prefix used when checking for wal membership. 153 */ 154 protected final String prefixPathStr; 155 156 protected final WALCoprocessorHost coprocessorHost; 157 158 /** 159 * conf object 160 */ 161 protected final Configuration conf; 162 163 /** Listeners that are called on WAL events. */ 164 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 165 166 /** 167 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence 168 * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has 169 * facility for answering questions such as "Is it safe to GC a WAL?". 170 */ 171 protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 172 173 protected final long slowSyncNs; 174 175 private final long walSyncTimeoutNs; 176 177 // If > than this size, roll the log. 
178 protected final long logrollsize; 179 180 /** 181 * Block size to use writing files. 182 */ 183 protected final long blocksize; 184 185 /* 186 * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too 187 * many and we crash, then will take forever replaying. Keep the number of logs tidy. 188 */ 189 protected final int maxLogs; 190 191 /** 192 * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock 193 * is held. We don't just use synchronized because that results in bogus and tedious findbugs 194 * warning when it thinks synchronized controls writer thread safety. It is held when we are 195 * actually rolling the log. It is checked when we are looking to see if we should roll the log or 196 * not. 197 */ 198 protected final ReentrantLock rollWriterLock = new ReentrantLock(true); 199 200 // The timestamp (in ms) when the log file was created. 201 protected final AtomicLong filenum = new AtomicLong(-1); 202 203 // Number of transactions in the current Wal. 204 protected final AtomicInteger numEntries = new AtomicInteger(0); 205 206 /** 207 * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass 208 * WALEdit to background consumer thread, and the transaction id is the sequence number of the 209 * corresponding entry in queue. 210 */ 211 protected volatile long highestUnsyncedTxid = -1; 212 213 /** 214 * Updated to the transaction id of the last successful sync call. This can be less than 215 * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in 216 * for it. 217 */ 218 protected final AtomicLong highestSyncedTxid = new AtomicLong(0); 219 220 /** 221 * The total size of wal 222 */ 223 protected final AtomicLong totalLogSize = new AtomicLong(0); 224 /** 225 * Current log file. 
226 */ 227 volatile W writer; 228 229 // Last time to check low replication on hlog's pipeline 230 private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); 231 232 protected volatile boolean closed = false; 233 234 protected final AtomicBoolean shutdown = new AtomicBoolean(false); 235 /** 236 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws 237 * an IllegalArgumentException if used to compare paths from different wals. 238 */ 239 final Comparator<Path> LOG_NAME_COMPARATOR = 240 (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); 241 242 private static final class WalProps { 243 244 /** 245 * Map the encoded region name to the highest sequence id. Contain all the regions it has 246 * entries of 247 */ 248 public final Map<byte[], Long> encodedName2HighestSequenceId; 249 250 /** 251 * The log file size. Notice that the size may not be accurate if we do asynchronous close in 252 * sub classes. 253 */ 254 public final long logSize; 255 256 public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { 257 this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; 258 this.logSize = logSize; 259 } 260 } 261 262 /** 263 * Map of WAL log file to properties. The map is sorted by the log file creation timestamp 264 * (contained in the log file name). 265 */ 266 protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = 267 new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); 268 269 /** 270 * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures. 271 * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228 272 * <p> 273 */ 274 private final ThreadLocal<SyncFuture> cachedSyncFutures; 275 276 /** 277 * The class name of the runtime implementation, used as prefix for logging/tracing. 
278 * <p> 279 * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, 280 * refer to HBASE-17676 for more details 281 * </p> 282 */ 283 protected final String implClassName; 284 285 protected final AtomicBoolean rollRequested = new AtomicBoolean(false); 286 287 public long getFilenum() { 288 return this.filenum.get(); 289 } 290 291 /** 292 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper 293 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming 294 * the filename is created with the {@link #computeFilename(long filenum)} method. 295 * @return timestamp, as in the log file name. 296 */ 297 protected long getFileNumFromFileName(Path fileName) { 298 checkNotNull(fileName, "file name can't be null"); 299 if (!ourFiles.accept(fileName)) { 300 throw new IllegalArgumentException( 301 "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); 302 } 303 final String fileNameString = fileName.toString(); 304 String chompedPath = fileNameString.substring(prefixPathStr.length(), 305 (fileNameString.length() - walFileSuffix.length())); 306 return Long.parseLong(chompedPath); 307 } 308 309 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 310 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 311 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 312 } 313 314 // must be power of 2 315 protected final int getPreallocatedEventCount() { 316 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 317 // be stuck and make no progress if the buffer is filled with appends only and there is no 318 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 319 // before they return. 
320 int preallocatedEventCount = 321 this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 322 checkArgument(preallocatedEventCount >= 0, 323 "hbase.regionserver.wal.disruptor.event.count must > 0"); 324 int floor = Integer.highestOneBit(preallocatedEventCount); 325 if (floor == preallocatedEventCount) { 326 return floor; 327 } 328 // max capacity is 1 << 30 329 if (floor >= 1 << 29) { 330 return 1 << 30; 331 } 332 return floor << 1; 333 } 334 335 protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, 336 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 337 final boolean failIfWALExists, final String prefix, final String suffix) 338 throws FailedLogCloseException, IOException { 339 this.fs = fs; 340 this.walDir = new Path(rootDir, logDir); 341 this.walArchiveDir = new Path(rootDir, archiveDir); 342 this.conf = conf; 343 344 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 345 throw new IOException("Unable to mkdir " + walDir); 346 } 347 348 if (!fs.exists(this.walArchiveDir)) { 349 if (!fs.mkdirs(this.walArchiveDir)) { 350 throw new IOException("Unable to mkdir " + this.walArchiveDir); 351 } 352 } 353 354 // If prefix is null||empty then just name it wal 355 this.walFilePrefix = 356 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); 357 // we only correctly differentiate suffices when numeric ones start with '.' 
358 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 359 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + 360 "' but instead was '" + suffix + "'"); 361 } 362 // Now that it exists, set the storage policy for the entire directory of wal files related to 363 // this FSHLog instance 364 String storagePolicy = 365 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 366 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 367 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 368 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 369 370 this.ourFiles = new PathFilter() { 371 @Override 372 public boolean accept(final Path fileName) { 373 // The path should start with dir/<prefix> and end with our suffix 374 final String fileNameString = fileName.toString(); 375 if (!fileNameString.startsWith(prefixPathStr)) { 376 return false; 377 } 378 if (walFileSuffix.isEmpty()) { 379 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 380 return org.apache.commons.lang3.StringUtils 381 .isNumeric(fileNameString.substring(prefixPathStr.length())); 382 } else if (!fileNameString.endsWith(walFileSuffix)) { 383 return false; 384 } 385 return true; 386 } 387 }; 388 389 if (failIfWALExists) { 390 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 391 if (null != walFiles && 0 != walFiles.length) { 392 throw new IOException("Target WAL already exists within directory " + walDir); 393 } 394 } 395 396 // Register listeners. TODO: Should this exist anymore? We have CPs? 397 if (listeners != null) { 398 for (WALActionsListener i : listeners) { 399 registerWALActionsListener(i); 400 } 401 } 402 this.coprocessorHost = new WALCoprocessorHost(this, conf); 403 404 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. 
Scheduling at 50% of block 405 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 406 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 407 // the block size but experience from the field has it that this was not enough time for the 408 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 409 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 410 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 411 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 412 this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); 413 float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f); 414 this.logrollsize = (long)(this.blocksize * multiplier); 415 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 416 Math.max(32, calculateMaxLogFiles(conf, logrollsize))); 417 418 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + 419 StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + 420 walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); 421 this.slowSyncNs = TimeUnit.MILLISECONDS 422 .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); 423 this.walSyncTimeoutNs = TimeUnit.MILLISECONDS 424 .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); 425 this.cachedSyncFutures = new ThreadLocal<SyncFuture>() { 426 @Override 427 protected SyncFuture initialValue() { 428 return new SyncFuture(); 429 } 430 }; 431 this.implClassName = getClass().getSimpleName(); 432 } 433 434 @Override 435 public void registerWALActionsListener(WALActionsListener listener) { 436 this.listeners.add(listener); 437 } 438 439 @Override 440 public 
boolean unregisterWALActionsListener(WALActionsListener listener) { 441 return this.listeners.remove(listener); 442 } 443 444 @Override 445 public WALCoprocessorHost getCoprocessorHost() { 446 return coprocessorHost; 447 } 448 449 @Override 450 public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { 451 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 452 } 453 454 @Override 455 public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { 456 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); 457 } 458 459 @Override 460 public void completeCacheFlush(byte[] encodedRegionName) { 461 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); 462 } 463 464 @Override 465 public void abortCacheFlush(byte[] encodedRegionName) { 466 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 467 } 468 469 @Override 470 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 471 // Used by tests. Deprecated as too subtle for general usage. 472 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 473 } 474 475 @Override 476 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 477 // This method is used by tests and for figuring if we should flush or not because our 478 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 479 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 480 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 481 // currently flushing sequence ids, and if anything found there, it is returning these. This is 482 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 483 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 484 // id is old even though we are currently flushing. 
This may mean we do too much flushing. 485 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 486 } 487 488 @Override 489 public byte[][] rollWriter() throws FailedLogCloseException, IOException { 490 return rollWriter(false); 491 } 492 493 /** 494 * This is a convenience method that computes a new filename with a given file-number. 495 * @param filenum to use 496 * @return Path 497 */ 498 protected Path computeFilename(final long filenum) { 499 if (filenum < 0) { 500 throw new RuntimeException("WAL file number can't be < 0"); 501 } 502 String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; 503 return new Path(walDir, child); 504 } 505 506 /** 507 * This is a convenience method that computes a new filename with a given using the current WAL 508 * file-number 509 * @return Path 510 */ 511 public Path getCurrentFileName() { 512 return computeFilename(this.filenum.get()); 513 } 514 515 /** 516 * retrieve the next path to use for writing. Increments the internal filenum. 517 */ 518 private Path getNewPath() throws IOException { 519 this.filenum.set(System.currentTimeMillis()); 520 Path newPath = getCurrentFileName(); 521 while (fs.exists(newPath)) { 522 this.filenum.incrementAndGet(); 523 newPath = getCurrentFileName(); 524 } 525 return newPath; 526 } 527 528 @VisibleForTesting 529 Path getOldPath() { 530 long currentFilenum = this.filenum.get(); 531 Path oldPath = null; 532 if (currentFilenum > 0) { 533 // ComputeFilename will take care of meta wal filename 534 oldPath = computeFilename(currentFilenum); 535 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 536 return oldPath; 537 } 538 539 /** 540 * Tell listeners about pre log roll. 
541 */ 542 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 543 throws IOException { 544 coprocessorHost.preWALRoll(oldPath, newPath); 545 546 if (!this.listeners.isEmpty()) { 547 for (WALActionsListener i : this.listeners) { 548 i.preLogRoll(oldPath, newPath); 549 } 550 } 551 } 552 553 /** 554 * Tell listeners about post log roll. 555 */ 556 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 557 throws IOException { 558 if (!this.listeners.isEmpty()) { 559 for (WALActionsListener i : this.listeners) { 560 i.postLogRoll(oldPath, newPath); 561 } 562 } 563 564 coprocessorHost.postWALRoll(oldPath, newPath); 565 } 566 567 // public only until class moves to o.a.h.h.wal 568 /** @return the number of rolled log files */ 569 public int getNumRolledLogFiles() { 570 return walFile2Props.size(); 571 } 572 573 // public only until class moves to o.a.h.h.wal 574 /** @return the number of log files in use */ 575 public int getNumLogFiles() { 576 // +1 for current use log 577 return getNumRolledLogFiles() + 1; 578 } 579 580 /** 581 * If the number of un-archived WAL files is greater than maximum allowed, check the first 582 * (oldest) WAL file, and returns those regions which should be flushed so that it can be 583 * archived. 584 * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. 
585 */ 586 byte[][] findRegionsToForceFlush() throws IOException { 587 byte[][] regions = null; 588 int logCount = getNumRolledLogFiles(); 589 if (logCount > this.maxLogs && logCount > 0) { 590 Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry(); 591 regions = 592 this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); 593 } 594 if (regions != null) { 595 StringBuilder sb = new StringBuilder(); 596 for (int i = 0; i < regions.length; i++) { 597 if (i > 0) { 598 sb.append(", "); 599 } 600 sb.append(Bytes.toStringBinary(regions[i])); 601 } 602 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + 603 "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); 604 } 605 return regions; 606 } 607 608 /** 609 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. 610 */ 611 private void cleanOldLogs() throws IOException { 612 List<Pair<Path, Long>> logsToArchive = null; 613 // For each log file, look at its Map of regions to highest sequence id; if all sequence ids 614 // are older than what is currently in memory, the WAL can be GC'd. 615 for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) { 616 Path log = e.getKey(); 617 Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; 618 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { 619 if (logsToArchive == null) { 620 logsToArchive = new ArrayList<>(); 621 } 622 logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); 623 if (LOG.isTraceEnabled()) { 624 LOG.trace("WAL file ready for archiving " + log); 625 } 626 } 627 } 628 if (logsToArchive != null) { 629 for (Pair<Path, Long> logAndSize : logsToArchive) { 630 this.totalLogSize.addAndGet(-logAndSize.getSecond()); 631 archiveLogFile(logAndSize.getFirst()); 632 this.walFile2Props.remove(logAndSize.getFirst()); 633 } 634 } 635 } 636 637 /* 638 * only public so WALSplitter can use. 
639 * @return archived location of a WAL file with the given path p 640 */ 641 public static Path getWALArchivePath(Path archiveDir, Path p) { 642 return new Path(archiveDir, p.getName()); 643 } 644 645 private void archiveLogFile(final Path p) throws IOException { 646 Path newPath = getWALArchivePath(this.walArchiveDir, p); 647 // Tell our listeners that a log is going to be archived. 648 if (!this.listeners.isEmpty()) { 649 for (WALActionsListener i : this.listeners) { 650 i.preLogArchive(p, newPath); 651 } 652 } 653 LOG.info("Archiving " + p + " to " + newPath); 654 if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 655 throw new IOException("Unable to rename " + p + " to " + newPath); 656 } 657 // Tell our listeners that a log has been archived. 658 if (!this.listeners.isEmpty()) { 659 for (WALActionsListener i : this.listeners) { 660 i.postLogArchive(p, newPath); 661 } 662 } 663 } 664 665 protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { 666 int oldNumEntries = this.numEntries.getAndSet(0); 667 String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; 668 if (oldPath != null) { 669 this.walFile2Props.put(oldPath, 670 new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); 671 this.totalLogSize.addAndGet(oldFileLen); 672 LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", 673 CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), 674 newPathString); 675 } else { 676 LOG.info("New WAL {}", newPathString); 677 } 678 } 679 680 /** 681 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. 682 * <p/> 683 * <ul> 684 * <li>In the case of creating a new WAL, oldPath will be null.</li> 685 * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 
686 * </li> 687 * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be 688 * null.</li> 689 * </ul> 690 * @param oldPath may be null 691 * @param newPath may be null 692 * @param nextWriter may be null 693 * @return the passed in <code>newPath</code> 694 * @throws IOException if there is a problem flushing or closing the underlying FS 695 */ 696 @VisibleForTesting 697 Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 698 try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { 699 doReplaceWriter(oldPath, newPath, nextWriter); 700 return newPath; 701 } 702 } 703 704 protected final void blockOnSync(SyncFuture syncFuture) throws IOException { 705 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 706 try { 707 if (syncFuture != null) { 708 if (closed) { 709 throw new IOException("WAL has been closed"); 710 } else { 711 syncFuture.get(walSyncTimeoutNs); 712 } 713 } 714 } catch (TimeoutIOException tioe) { 715 // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer 716 // still refer to it, so if this thread use it next time may get a wrong 717 // result. 718 this.cachedSyncFutures.remove(); 719 throw tioe; 720 } catch (InterruptedException ie) { 721 LOG.warn("Interrupted", ie); 722 throw convertInterruptedExceptionToIOException(ie); 723 } catch (ExecutionException e) { 724 throw ensureIOException(e.getCause()); 725 } 726 } 727 728 private static IOException ensureIOException(final Throwable t) { 729 return (t instanceof IOException) ? 
(IOException) t : new IOException(t); 730 } 731 732 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 733 Thread.currentThread().interrupt(); 734 IOException ioe = new InterruptedIOException(); 735 ioe.initCause(ie); 736 return ioe; 737 } 738 739 @Override 740 public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { 741 rollWriterLock.lock(); 742 try { 743 // Return if nothing to flush. 744 if (!force && this.writer != null && this.numEntries.get() <= 0) { 745 return null; 746 } 747 byte[][] regionsToFlush = null; 748 if (this.closed) { 749 LOG.debug("WAL closed. Skipping rolling of writer"); 750 return regionsToFlush; 751 } 752 try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { 753 Path oldPath = getOldPath(); 754 Path newPath = getNewPath(); 755 // Any exception from here on is catastrophic, non-recoverable so we currently abort. 756 W nextWriter = this.createWriterInstance(newPath); 757 tellListenersAboutPreLogRoll(oldPath, newPath); 758 // NewPath could be equal to oldPath if replaceWriter fails. 759 newPath = replaceWriter(oldPath, newPath, nextWriter); 760 tellListenersAboutPostLogRoll(oldPath, newPath); 761 if (LOG.isDebugEnabled()) { 762 LOG.debug("Create new " + implClassName + " writer with pipeline: " + 763 Arrays.toString(getPipeline())); 764 } 765 // Can we delete any of the old log files? 766 if (getNumRolledLogFiles() > 0) { 767 cleanOldLogs(); 768 regionsToFlush = findRegionsToForceFlush(); 769 } 770 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 771 // If the underlying FileSystem can't do what we ask, treat as IO failure so 772 // we'll abort. 773 throw new IOException( 774 "Underlying FileSystem can't meet stream requirements. 
See RS log " + "for details.", 775 exception); 776 } 777 return regionsToFlush; 778 } finally { 779 rollWriterLock.unlock(); 780 } 781 } 782 783 // public only until class moves to o.a.h.h.wal 784 /** @return the size of log files in use */ 785 public long getLogFileSize() { 786 return this.totalLogSize.get(); 787 } 788 789 // public only until class moves to o.a.h.h.wal 790 public void requestLogRoll() { 791 requestLogRoll(false); 792 } 793 794 /** 795 * Get the backing files associated with this WAL. 796 * @return may be null if there are no files. 797 */ 798 @VisibleForTesting 799 FileStatus[] getFiles() throws IOException { 800 return CommonFSUtils.listStatus(fs, walDir, ourFiles); 801 } 802 803 @Override 804 public void shutdown() throws IOException { 805 if (!shutdown.compareAndSet(false, true)) { 806 return; 807 } 808 closed = true; 809 // Tell our listeners that the log is closing 810 if (!this.listeners.isEmpty()) { 811 for (WALActionsListener i : this.listeners) { 812 i.logCloseRequested(); 813 } 814 } 815 rollWriterLock.lock(); 816 try { 817 doShutdown(); 818 } finally { 819 rollWriterLock.unlock(); 820 } 821 } 822 823 @Override 824 public void close() throws IOException { 825 shutdown(); 826 final FileStatus[] files = getFiles(); 827 if (null != files && 0 != files.length) { 828 for (FileStatus file : files) { 829 Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); 830 // Tell our listeners that a log is going to be archived. 831 if (!this.listeners.isEmpty()) { 832 for (WALActionsListener i : this.listeners) { 833 i.preLogArchive(file.getPath(), p); 834 } 835 } 836 837 if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 838 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 839 } 840 // Tell our listeners that a log was archived. 
841 if (!this.listeners.isEmpty()) { 842 for (WALActionsListener i : this.listeners) { 843 i.postLogArchive(file.getPath(), p); 844 } 845 } 846 } 847 LOG.debug( 848 "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); 849 } 850 LOG.info("Closed WAL: " + toString()); 851 } 852 853 /** 854 * updates the sequence number of a specific store. depending on the flag: replaces current seq 855 * number if the given seq id is bigger, or even if it is lower than existing one 856 * @param encodedRegionName 857 * @param familyName 858 * @param sequenceid 859 * @param onlyIfGreater 860 */ 861 @Override 862 public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, 863 boolean onlyIfGreater) { 864 sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); 865 } 866 867 protected final SyncFuture getSyncFuture(long sequence) { 868 return cachedSyncFutures.get().reset(sequence); 869 } 870 871 protected boolean isLogRollRequested() { 872 return rollRequested.get(); 873 } 874 875 protected final void requestLogRoll(boolean tooFewReplicas) { 876 // If we have already requested a roll, don't do it again 877 // And only set rollRequested to true when there is a registered listener 878 if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) { 879 for (WALActionsListener i : this.listeners) { 880 i.logRollRequested(tooFewReplicas); 881 } 882 } 883 } 884 885 long getUnflushedEntriesCount() { 886 long highestSynced = this.highestSyncedTxid.get(); 887 long highestUnsynced = this.highestUnsyncedTxid; 888 return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; 889 } 890 891 boolean isUnflushedEntries() { 892 return getUnflushedEntriesCount() > 0; 893 } 894 895 /** 896 * Exposed for testing only. Use to tricks like halt the ring buffer appending. 
897 */ 898 @VisibleForTesting 899 protected void atHeadOfRingBufferEventHandlerAppend() { 900 // Noop 901 } 902 903 protected final boolean append(W writer, FSWALEntry entry) throws IOException { 904 // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 905 atHeadOfRingBufferEventHandlerAppend(); 906 long start = EnvironmentEdgeManager.currentTime(); 907 byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); 908 long regionSequenceId = entry.getKey().getSequenceId(); 909 910 // Edits are empty, there is nothing to append. Maybe empty when we are looking for a 911 // region sequence id only, a region edit/sequence id that is not associated with an actual 912 // edit. It has to go through all the rigmarole to be sure we have the right ordering. 913 if (entry.getEdit().isEmpty()) { 914 return false; 915 } 916 917 // Coprocessor hook. 918 coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 919 if (!listeners.isEmpty()) { 920 for (WALActionsListener i : listeners) { 921 i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); 922 } 923 } 924 doAppend(writer, entry); 925 assert highestUnsyncedTxid < entry.getTxid(); 926 highestUnsyncedTxid = entry.getTxid(); 927 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, 928 entry.isInMemStore()); 929 coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 930 // Update metrics. 
931 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 932 numEntries.incrementAndGet(); 933 return true; 934 } 935 936 private long postAppend(final Entry e, final long elapsedTime) throws IOException { 937 long len = 0; 938 if (!listeners.isEmpty()) { 939 for (Cell cell : e.getEdit().getCells()) { 940 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 941 } 942 for (WALActionsListener listener : listeners) { 943 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); 944 } 945 } 946 return len; 947 } 948 949 protected final void postSync(final long timeInNanos, final int handlerSyncs) { 950 if (timeInNanos > this.slowSyncNs) { 951 String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) 952 .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); 953 TraceUtil.addTimelineAnnotation(msg); 954 LOG.info(msg); 955 } 956 if (!listeners.isEmpty()) { 957 for (WALActionsListener listener : listeners) { 958 listener.postSync(timeInNanos, handlerSyncs); 959 } 960 } 961 } 962 963 protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, 964 WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) 965 throws IOException { 966 if (this.closed) { 967 throw new IOException( 968 "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); 969 } 970 MutableLong txidHolder = new MutableLong(); 971 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { 972 txidHolder.setValue(ringBuffer.next()); 973 }); 974 long txid = txidHolder.longValue(); 975 ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) 976 .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); 977 try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { 978 FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); 979 
entry.stampRegionSequenceId(we); 980 ringBuffer.get(txid).load(entry); 981 } finally { 982 ringBuffer.publish(txid); 983 } 984 return txid; 985 } 986 987 @Override 988 public String toString() { 989 return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; 990 } 991 992 /** 993 * if the given {@code path} is being written currently, then return its length. 994 * <p> 995 * This is used by replication to prevent replicating unacked log entries. See 996 * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 997 */ 998 @Override 999 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 1000 rollWriterLock.lock(); 1001 try { 1002 Path currentPath = getOldPath(); 1003 if (path.equals(currentPath)) { 1004 W writer = this.writer; 1005 return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); 1006 } else { 1007 return OptionalLong.empty(); 1008 } 1009 } finally { 1010 rollWriterLock.unlock(); 1011 } 1012 } 1013 1014 /** 1015 * NOTE: This append, at a time that is usually after this call returns, starts an mvcc 1016 * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment 1017 * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must 1018 * 'complete' the transaction this mvcc transaction by calling 1019 * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it 1020 * in the finally of a try/finally block within which this append lives and any subsequent 1021 * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the 1022 * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not 1023 * immediately available on return from this method. It WILL be available subsequent to a sync of 1024 * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. 
1025 */ 1026 @Override 1027 public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1028 throws IOException; 1029 1030 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; 1031 1032 protected abstract W createWriterInstance(Path path) 1033 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 1034 1035 /** 1036 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 1037 * will begin to work before returning from this method. If we clear the flag after returning from 1038 * this call, we may miss a roll request. The implementation class should choose a proper place to 1039 * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you 1040 * start writing to the new writer. 1041 */ 1042 protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) 1043 throws IOException; 1044 1045 protected abstract void doShutdown() throws IOException; 1046 1047 protected abstract boolean doCheckLogLowReplication(); 1048 1049 public void checkLogLowReplication(long checkInterval) { 1050 long now = EnvironmentEdgeManager.currentTime(); 1051 if (now - lastTimeCheckLowReplication < checkInterval) { 1052 return; 1053 } 1054 // Will return immediately if we are in the middle of a WAL log roll currently. 1055 if (!rollWriterLock.tryLock()) { 1056 return; 1057 } 1058 try { 1059 lastTimeCheckLowReplication = now; 1060 if (doCheckLogLowReplication()) { 1061 requestLogRoll(true); 1062 } 1063 } finally { 1064 rollWriterLock.unlock(); 1065 } 1066 } 1067 1068 /** 1069 * This method gets the pipeline for the current WAL. 1070 */ 1071 @VisibleForTesting 1072 abstract DatanodeInfo[] getPipeline(); 1073 1074 /** 1075 * This method gets the datanode replication count for the current WAL. 
1076 */ 1077 @VisibleForTesting 1078 abstract int getLogReplication(); 1079 1080 private static void split(final Configuration conf, final Path p) throws IOException { 1081 FileSystem fs = FSUtils.getWALFileSystem(conf); 1082 if (!fs.exists(p)) { 1083 throw new FileNotFoundException(p.toString()); 1084 } 1085 if (!fs.getFileStatus(p).isDirectory()) { 1086 throw new IOException(p + " is not a directory"); 1087 } 1088 1089 final Path baseDir = FSUtils.getWALRootDir(conf); 1090 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 1091 if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 1092 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) { 1093 archiveDir = new Path(archiveDir, p.getName()); 1094 } 1095 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 1096 } 1097 1098 private static void usage() { 1099 System.err.println("Usage: AbstractFSWAL <ARGS>"); 1100 System.err.println("Arguments:"); 1101 System.err.println(" --dump Dump textual representation of passed one or more files"); 1102 System.err.println(" For example: " + 1103 "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 1104 System.err.println(" --split Split the passed directory of WAL logs"); 1105 System.err.println( 1106 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 1107 } 1108 1109 /** 1110 * Pass one or more log file names and it will either dump out a text version on 1111 * <code>stdout</code> or split the specified log files. 
*/
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // The first argument selects the mode; everything after it is that mode's input.
    final String mode = args[0];
    switch (mode) {
      case "--dump":
        // Dump using the WALPrettyPrinter.
        WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
        break;
      case "--perf":
        LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
        LOG.error(HBaseMarkers.FATAL,
          "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
        System.exit(-1);
        break;
      case "--split":
        final Configuration conf = HBaseConfiguration.create();
        for (String dir : Arrays.asList(args).subList(1, args.length)) {
          try {
            final Path logPath = new Path(dir);
            FSUtils.setFsDefault(conf, logPath);
            split(conf, logPath);
          } catch (IOException t) {
            // Report the failure and stop at the first directory that cannot be split.
            t.printStackTrace(System.err);
            System.exit(-1);
          }
        }
        break;
      default:
        usage();
        System.exit(-1);
    }
  }
}