001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 021import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 022import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 023 024import com.lmax.disruptor.RingBuffer; 025import java.io.FileNotFoundException; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.lang.management.MemoryType; 029import java.net.URLEncoder; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Comparator; 033import java.util.List; 034import java.util.Map; 035import java.util.OptionalLong; 036import java.util.Set; 037import java.util.concurrent.ConcurrentNavigableMap; 038import java.util.concurrent.ConcurrentSkipListMap; 039import java.util.concurrent.CopyOnWriteArrayList; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicBoolean; 043import java.util.concurrent.atomic.AtomicInteger; 044import java.util.concurrent.atomic.AtomicLong; 045import java.util.concurrent.locks.ReentrantLock; 046import org.apache.commons.lang3.mutable.MutableLong; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FileStatus; 049import org.apache.hadoop.fs.FileSystem; 050import org.apache.hadoop.fs.Path; 051import org.apache.hadoop.fs.PathFilter; 052import org.apache.hadoop.hbase.Cell; 053import org.apache.hadoop.hbase.HBaseConfiguration; 054import org.apache.hadoop.hbase.HConstants; 055import org.apache.hadoop.hbase.PrivateCellUtil; 056import org.apache.hadoop.hbase.client.RegionInfo; 057import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 058import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 059import org.apache.hadoop.hbase.ipc.RpcServer; 060import org.apache.hadoop.hbase.ipc.ServerCall; 061import org.apache.hadoop.hbase.log.HBaseMarkers; 062import org.apache.hadoop.hbase.regionserver.HRegion; 063import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 064import org.apache.hadoop.hbase.trace.TraceUtil; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.CommonFSUtils; 067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 068import org.apache.hadoop.hbase.util.FSUtils; 069import org.apache.hadoop.hbase.util.Pair; 070import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 071import org.apache.hadoop.hbase.wal.WAL; 072import org.apache.hadoop.hbase.wal.WALEdit; 073import org.apache.hadoop.hbase.wal.WALFactory; 074import org.apache.hadoop.hbase.wal.WALKeyImpl; 075import org.apache.hadoop.hbase.wal.WALPrettyPrinter; 076import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; 077import org.apache.hadoop.hbase.wal.WALSplitter; 078import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 079import org.apache.hadoop.util.StringUtils; 080import org.apache.htrace.core.TraceScope; 081import org.apache.yetus.audience.InterfaceAudience; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 086 087/** 088 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one 089 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. 090 * This is done internal to the implementation. 091 * <p> 092 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a 093 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id. 094 * A bunch of work in the below is done keeping account of these region sequence ids -- what is 095 * flushed out to hfiles, and what is yet in WAL and in memory only. 096 * <p> 097 * It is only practical to delete entire files. Thus, we delete an entire on-disk file 098 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older 099 * (smaller) than the most-recent flush. 100 * <p> 101 * To read an WAL, call 102 * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. * 103 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL 104 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If 105 * we have made successful appends to the WAL and we then are unable to sync them, our current 106 * semantic is to return error to the client that the appends failed but also to abort the current 107 * context, usually the hosting server. We need to replay the WALs. <br> 108 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client 109 * that the append failed. <br> 110 * TODO: replication may pick up these last edits though they have been marked as failed append 111 * (Need to keep our own file lengths, not rely on HDFS). 112 */ 113@InterfaceAudience.Private 114public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 115 116 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 117 118 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 119 120 private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 121 122 /** 123 * file system instance 124 */ 125 protected final FileSystem fs; 126 127 /** 128 * WAL directory, where all WAL files would be placed. 129 */ 130 protected final Path walDir; 131 132 /** 133 * dir path where old logs are kept. 134 */ 135 protected final Path walArchiveDir; 136 137 /** 138 * Matches just those wal files that belong to this wal instance. 139 */ 140 protected final PathFilter ourFiles; 141 142 /** 143 * Prefix of a WAL file, usually the region server name it is hosted on. 144 */ 145 protected final String walFilePrefix; 146 147 /** 148 * Suffix included on generated wal file names 149 */ 150 protected final String walFileSuffix; 151 152 /** 153 * Prefix used when checking for wal membership. 154 */ 155 protected final String prefixPathStr; 156 157 protected final WALCoprocessorHost coprocessorHost; 158 159 /** 160 * conf object 161 */ 162 protected final Configuration conf; 163 164 /** Listeners that are called on WAL events. */ 165 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 166 167 /** 168 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence 169 * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has 170 * facility for answering questions such as "Is it safe to GC a WAL?". 171 */ 172 protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 173 174 protected final long slowSyncNs; 175 176 private final long walSyncTimeoutNs; 177 178 // If > than this size, roll the log. 179 protected final long logrollsize; 180 181 /** 182 * Block size to use writing files. 183 */ 184 protected final long blocksize; 185 186 /* 187 * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too 188 * many and we crash, then will take forever replaying. Keep the number of logs tidy. 189 */ 190 protected final int maxLogs; 191 192 protected final boolean useHsync; 193 194 /** 195 * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock 196 * is held. We don't just use synchronized because that results in bogus and tedious findbugs 197 * warning when it thinks synchronized controls writer thread safety. It is held when we are 198 * actually rolling the log. It is checked when we are looking to see if we should roll the log or 199 * not. 200 */ 201 protected final ReentrantLock rollWriterLock = new ReentrantLock(true); 202 203 // The timestamp (in ms) when the log file was created. 204 protected final AtomicLong filenum = new AtomicLong(-1); 205 206 // Number of transactions in the current Wal. 207 protected final AtomicInteger numEntries = new AtomicInteger(0); 208 209 /** 210 * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass 211 * WALEdit to background consumer thread, and the transaction id is the sequence number of the 212 * corresponding entry in queue. 213 */ 214 protected volatile long highestUnsyncedTxid = -1; 215 216 /** 217 * Updated to the transaction id of the last successful sync call. This can be less than 218 * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in 219 * for it. 220 */ 221 protected final AtomicLong highestSyncedTxid = new AtomicLong(0); 222 223 /** 224 * The total size of wal 225 */ 226 protected final AtomicLong totalLogSize = new AtomicLong(0); 227 /** 228 * Current log file. 229 */ 230 volatile W writer; 231 232 // Last time to check low replication on hlog's pipeline 233 private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); 234 235 protected volatile boolean closed = false; 236 237 protected final AtomicBoolean shutdown = new AtomicBoolean(false); 238 /** 239 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws 240 * an IllegalArgumentException if used to compare paths from different wals. 241 */ 242 final Comparator<Path> LOG_NAME_COMPARATOR = 243 (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); 244 245 private static final class WalProps { 246 247 /** 248 * Map the encoded region name to the highest sequence id. 249 * <p/>Contains all the regions it has an entry for. 250 */ 251 public final Map<byte[], Long> encodedName2HighestSequenceId; 252 253 /** 254 * The log file size. Notice that the size may not be accurate if we do asynchronous close in 255 * sub classes. 256 */ 257 public final long logSize; 258 259 public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { 260 this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; 261 this.logSize = logSize; 262 } 263 } 264 265 /** 266 * Map of WAL log file to properties. The map is sorted by the log file creation timestamp 267 * (contained in the log file name). 268 */ 269 protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = 270 new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); 271 272 /** 273 * Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures. 274 * Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228 275 * <p> 276 */ 277 private final ThreadLocal<SyncFuture> cachedSyncFutures; 278 279 /** 280 * The class name of the runtime implementation, used as prefix for logging/tracing. 281 * <p> 282 * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, 283 * refer to HBASE-17676 for more details 284 * </p> 285 */ 286 protected final String implClassName; 287 288 protected final AtomicBoolean rollRequested = new AtomicBoolean(false); 289 290 public long getFilenum() { 291 return this.filenum.get(); 292 } 293 294 /** 295 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper 296 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming 297 * the filename is created with the {@link #computeFilename(long filenum)} method. 298 * @return timestamp, as in the log file name. 299 */ 300 protected long getFileNumFromFileName(Path fileName) { 301 checkNotNull(fileName, "file name can't be null"); 302 if (!ourFiles.accept(fileName)) { 303 throw new IllegalArgumentException( 304 "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); 305 } 306 final String fileNameString = fileName.toString(); 307 String chompedPath = fileNameString.substring(prefixPathStr.length(), 308 (fileNameString.length() - walFileSuffix.length())); 309 return Long.parseLong(chompedPath); 310 } 311 312 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 313 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 314 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 315 } 316 317 // must be power of 2 318 protected final int getPreallocatedEventCount() { 319 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 320 // be stuck and make no progress if the buffer is filled with appends only and there is no 321 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 322 // before they return. 323 int preallocatedEventCount = 324 this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); 325 checkArgument(preallocatedEventCount >= 0, 326 "hbase.regionserver.wal.disruptor.event.count must > 0"); 327 int floor = Integer.highestOneBit(preallocatedEventCount); 328 if (floor == preallocatedEventCount) { 329 return floor; 330 } 331 // max capacity is 1 << 30 332 if (floor >= 1 << 29) { 333 return 1 << 30; 334 } 335 return floor << 1; 336 } 337 338 protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, 339 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 340 final boolean failIfWALExists, final String prefix, final String suffix) 341 throws FailedLogCloseException, IOException { 342 this.fs = fs; 343 this.walDir = new Path(rootDir, logDir); 344 this.walArchiveDir = new Path(rootDir, archiveDir); 345 this.conf = conf; 346 347 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 348 throw new IOException("Unable to mkdir " + walDir); 349 } 350 351 if (!fs.exists(this.walArchiveDir)) { 352 if (!fs.mkdirs(this.walArchiveDir)) { 353 throw new IOException("Unable to mkdir " + this.walArchiveDir); 354 } 355 } 356 357 // If prefix is null||empty then just name it wal 358 this.walFilePrefix = 359 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); 360 // we only correctly differentiate suffices when numeric ones start with '.' 361 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 362 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + 363 "' but instead was '" + suffix + "'"); 364 } 365 // Now that it exists, set the storage policy for the entire directory of wal files related to 366 // this FSHLog instance 367 String storagePolicy = 368 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 369 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 370 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 371 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 372 373 this.ourFiles = new PathFilter() { 374 @Override 375 public boolean accept(final Path fileName) { 376 // The path should start with dir/<prefix> and end with our suffix 377 final String fileNameString = fileName.toString(); 378 if (!fileNameString.startsWith(prefixPathStr)) { 379 return false; 380 } 381 if (walFileSuffix.isEmpty()) { 382 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 383 return org.apache.commons.lang3.StringUtils 384 .isNumeric(fileNameString.substring(prefixPathStr.length())); 385 } else if (!fileNameString.endsWith(walFileSuffix)) { 386 return false; 387 } 388 return true; 389 } 390 }; 391 392 if (failIfWALExists) { 393 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 394 if (null != walFiles && 0 != walFiles.length) { 395 throw new IOException("Target WAL already exists within directory " + walDir); 396 } 397 } 398 399 // Register listeners. TODO: Should this exist anymore? We have CPs? 400 if (listeners != null) { 401 for (WALActionsListener i : listeners) { 402 registerWALActionsListener(i); 403 } 404 } 405 this.coprocessorHost = new WALCoprocessorHost(this, conf); 406 407 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block 408 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 409 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 410 // the block size but experience from the field has it that this was not enough time for the 411 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 412 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 413 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 414 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 415 this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); 416 float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f); 417 this.logrollsize = (long)(this.blocksize * multiplier); 418 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 419 Math.max(32, calculateMaxLogFiles(conf, logrollsize))); 420 421 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + 422 StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + 423 walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); 424 this.slowSyncNs = TimeUnit.MILLISECONDS 425 .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); 426 this.walSyncTimeoutNs = TimeUnit.MILLISECONDS 427 .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); 428 this.cachedSyncFutures = new ThreadLocal<SyncFuture>() { 429 @Override 430 protected SyncFuture initialValue() { 431 return new SyncFuture(); 432 } 433 }; 434 this.implClassName = getClass().getSimpleName(); 435 this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); 436 } 437 438 /** 439 * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 440 */ 441 public void init() throws IOException { 442 rollWriter(); 443 } 444 445 @Override 446 public void registerWALActionsListener(WALActionsListener listener) { 447 this.listeners.add(listener); 448 } 449 450 @Override 451 public boolean unregisterWALActionsListener(WALActionsListener listener) { 452 return this.listeners.remove(listener); 453 } 454 455 @Override 456 public WALCoprocessorHost getCoprocessorHost() { 457 return coprocessorHost; 458 } 459 460 @Override 461 public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { 462 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 463 } 464 465 @Override 466 public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { 467 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); 468 } 469 470 @Override 471 public void completeCacheFlush(byte[] encodedRegionName) { 472 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); 473 } 474 475 @Override 476 public void abortCacheFlush(byte[] encodedRegionName) { 477 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 478 } 479 480 @Override 481 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 482 // Used by tests. Deprecated as too subtle for general usage. 483 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 484 } 485 486 @Override 487 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 488 // This method is used by tests and for figuring if we should flush or not because our 489 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 490 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 491 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 492 // currently flushing sequence ids, and if anything found there, it is returning these. This is 493 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 494 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 495 // id is old even though we are currently flushing. This may mean we do too much flushing. 496 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 497 } 498 499 @Override 500 public byte[][] rollWriter() throws FailedLogCloseException, IOException { 501 return rollWriter(false); 502 } 503 504 /** 505 * This is a convenience method that computes a new filename with a given file-number. 506 * @param filenum to use 507 * @return Path 508 */ 509 protected Path computeFilename(final long filenum) { 510 if (filenum < 0) { 511 throw new RuntimeException("WAL file number can't be < 0"); 512 } 513 String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; 514 return new Path(walDir, child); 515 } 516 517 /** 518 * This is a convenience method that computes a new filename with a given using the current WAL 519 * file-number 520 * @return Path 521 */ 522 public Path getCurrentFileName() { 523 return computeFilename(this.filenum.get()); 524 } 525 526 /** 527 * retrieve the next path to use for writing. Increments the internal filenum. 528 */ 529 private Path getNewPath() throws IOException { 530 this.filenum.set(System.currentTimeMillis()); 531 Path newPath = getCurrentFileName(); 532 while (fs.exists(newPath)) { 533 this.filenum.incrementAndGet(); 534 newPath = getCurrentFileName(); 535 } 536 return newPath; 537 } 538 539 @VisibleForTesting 540 Path getOldPath() { 541 long currentFilenum = this.filenum.get(); 542 Path oldPath = null; 543 if (currentFilenum > 0) { 544 // ComputeFilename will take care of meta wal filename 545 oldPath = computeFilename(currentFilenum); 546 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 547 return oldPath; 548 } 549 550 /** 551 * Tell listeners about pre log roll. 552 */ 553 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 554 throws IOException { 555 coprocessorHost.preWALRoll(oldPath, newPath); 556 557 if (!this.listeners.isEmpty()) { 558 for (WALActionsListener i : this.listeners) { 559 i.preLogRoll(oldPath, newPath); 560 } 561 } 562 } 563 564 /** 565 * Tell listeners about post log roll. 566 */ 567 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 568 throws IOException { 569 if (!this.listeners.isEmpty()) { 570 for (WALActionsListener i : this.listeners) { 571 i.postLogRoll(oldPath, newPath); 572 } 573 } 574 575 coprocessorHost.postWALRoll(oldPath, newPath); 576 } 577 578 // public only until class moves to o.a.h.h.wal 579 /** @return the number of rolled log files */ 580 public int getNumRolledLogFiles() { 581 return walFile2Props.size(); 582 } 583 584 // public only until class moves to o.a.h.h.wal 585 /** @return the number of log files in use */ 586 public int getNumLogFiles() { 587 // +1 for current use log 588 return getNumRolledLogFiles() + 1; 589 } 590 591 /** 592 * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, 593 * check the first (oldest) WAL, and return those regions which should be flushed so that 594 * it can be let-go/'archived'. 595 * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file. 596 */ 597 byte[][] findRegionsToForceFlush() throws IOException { 598 byte[][] regions = null; 599 int logCount = getNumRolledLogFiles(); 600 if (logCount > this.maxLogs && logCount > 0) { 601 Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry(); 602 regions = 603 this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); 604 } 605 if (regions != null) { 606 StringBuilder sb = new StringBuilder(); 607 for (int i = 0; i < regions.length; i++) { 608 if (i > 0) { 609 sb.append(", "); 610 } 611 sb.append(Bytes.toStringBinary(regions[i])); 612 } 613 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + 614 "; forcing flush of " + regions.length + " regions(s): " + sb.toString()); 615 } 616 return regions; 617 } 618 619 /** 620 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. 621 */ 622 private void cleanOldLogs() throws IOException { 623 List<Pair<Path, Long>> logsToArchive = null; 624 // For each log file, look at its Map of regions to highest sequence id; if all sequence ids 625 // are older than what is currently in memory, the WAL can be GC'd. 626 for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) { 627 Path log = e.getKey(); 628 Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; 629 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { 630 if (logsToArchive == null) { 631 logsToArchive = new ArrayList<>(); 632 } 633 logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); 634 if (LOG.isTraceEnabled()) { 635 LOG.trace("WAL file ready for archiving " + log); 636 } 637 } 638 } 639 if (logsToArchive != null) { 640 for (Pair<Path, Long> logAndSize : logsToArchive) { 641 this.totalLogSize.addAndGet(-logAndSize.getSecond()); 642 archiveLogFile(logAndSize.getFirst()); 643 this.walFile2Props.remove(logAndSize.getFirst()); 644 } 645 } 646 } 647 648 /* 649 * only public so WALSplitter can use. 650 * @return archived location of a WAL file with the given path p 651 */ 652 public static Path getWALArchivePath(Path archiveDir, Path p) { 653 return new Path(archiveDir, p.getName()); 654 } 655 656 private void archiveLogFile(final Path p) throws IOException { 657 Path newPath = getWALArchivePath(this.walArchiveDir, p); 658 // Tell our listeners that a log is going to be archived. 659 if (!this.listeners.isEmpty()) { 660 for (WALActionsListener i : this.listeners) { 661 i.preLogArchive(p, newPath); 662 } 663 } 664 LOG.info("Archiving " + p + " to " + newPath); 665 if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 666 throw new IOException("Unable to rename " + p + " to " + newPath); 667 } 668 // Tell our listeners that a log has been archived. 669 if (!this.listeners.isEmpty()) { 670 for (WALActionsListener i : this.listeners) { 671 i.postLogArchive(p, newPath); 672 } 673 } 674 } 675 676 protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { 677 int oldNumEntries = this.numEntries.getAndSet(0); 678 String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; 679 if (oldPath != null) { 680 this.walFile2Props.put(oldPath, 681 new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); 682 this.totalLogSize.addAndGet(oldFileLen); 683 LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", 684 CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), 685 newPathString); 686 } else { 687 LOG.info("New WAL {}", newPathString); 688 } 689 } 690 691 /** 692 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. 693 * <p/> 694 * <ul> 695 * <li>In the case of creating a new WAL, oldPath will be null.</li> 696 * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 697 * </li> 698 * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be 699 * null.</li> 700 * </ul> 701 * @param oldPath may be null 702 * @param newPath may be null 703 * @param nextWriter may be null 704 * @return the passed in <code>newPath</code> 705 * @throws IOException if there is a problem flushing or closing the underlying FS 706 */ 707 @VisibleForTesting 708 Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 709 try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { 710 doReplaceWriter(oldPath, newPath, nextWriter); 711 return newPath; 712 } 713 } 714 715 protected final void blockOnSync(SyncFuture syncFuture) throws IOException { 716 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 717 try { 718 if (syncFuture != null) { 719 if (closed) { 720 throw new IOException("WAL has been closed"); 721 } else { 722 syncFuture.get(walSyncTimeoutNs); 723 } 724 } 725 } catch (TimeoutIOException tioe) { 726 // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer 727 // still refer to it, so if this thread use it next time may get a wrong 728 // result. 729 this.cachedSyncFutures.remove(); 730 throw tioe; 731 } catch (InterruptedException ie) { 732 LOG.warn("Interrupted", ie); 733 throw convertInterruptedExceptionToIOException(ie); 734 } catch (ExecutionException e) { 735 throw ensureIOException(e.getCause()); 736 } 737 } 738 739 private static IOException ensureIOException(final Throwable t) { 740 return (t instanceof IOException) ? (IOException) t : new IOException(t); 741 } 742 743 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 744 Thread.currentThread().interrupt(); 745 IOException ioe = new InterruptedIOException(); 746 ioe.initCause(ie); 747 return ioe; 748 } 749 750 @Override 751 public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { 752 rollWriterLock.lock(); 753 try { 754 // Return if nothing to flush. 755 if (!force && this.writer != null && this.numEntries.get() <= 0) { 756 return null; 757 } 758 byte[][] regionsToFlush = null; 759 if (this.closed) { 760 LOG.debug("WAL closed. Skipping rolling of writer"); 761 return regionsToFlush; 762 } 763 try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { 764 Path oldPath = getOldPath(); 765 Path newPath = getNewPath(); 766 // Any exception from here on is catastrophic, non-recoverable so we currently abort. 767 W nextWriter = this.createWriterInstance(newPath); 768 tellListenersAboutPreLogRoll(oldPath, newPath); 769 // NewPath could be equal to oldPath if replaceWriter fails. 770 newPath = replaceWriter(oldPath, newPath, nextWriter); 771 tellListenersAboutPostLogRoll(oldPath, newPath); 772 if (LOG.isDebugEnabled()) { 773 LOG.debug("Create new " + implClassName + " writer with pipeline: " + 774 Arrays.toString(getPipeline())); 775 } 776 // Can we delete any of the old log files? 777 if (getNumRolledLogFiles() > 0) { 778 cleanOldLogs(); 779 regionsToFlush = findRegionsToForceFlush(); 780 } 781 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 782 // If the underlying FileSystem can't do what we ask, treat as IO failure so 783 // we'll abort. 784 throw new IOException( 785 "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", 786 exception); 787 } 788 return regionsToFlush; 789 } finally { 790 rollWriterLock.unlock(); 791 } 792 } 793 794 // public only until class moves to o.a.h.h.wal 795 /** @return the size of log files in use */ 796 public long getLogFileSize() { 797 return this.totalLogSize.get(); 798 } 799 800 // public only until class moves to o.a.h.h.wal 801 public void requestLogRoll() { 802 requestLogRoll(false); 803 } 804 805 /** 806 * Get the backing files associated with this WAL. 807 * @return may be null if there are no files. 808 */ 809 @VisibleForTesting 810 FileStatus[] getFiles() throws IOException { 811 return CommonFSUtils.listStatus(fs, walDir, ourFiles); 812 } 813 814 @Override 815 public void shutdown() throws IOException { 816 if (!shutdown.compareAndSet(false, true)) { 817 return; 818 } 819 closed = true; 820 // Tell our listeners that the log is closing 821 if (!this.listeners.isEmpty()) { 822 for (WALActionsListener i : this.listeners) { 823 i.logCloseRequested(); 824 } 825 } 826 rollWriterLock.lock(); 827 try { 828 doShutdown(); 829 } finally { 830 rollWriterLock.unlock(); 831 } 832 } 833 834 @Override 835 public void close() throws IOException { 836 shutdown(); 837 final FileStatus[] files = getFiles(); 838 if (null != files && 0 != files.length) { 839 for (FileStatus file : files) { 840 Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); 841 // Tell our listeners that a log is going to be archived. 842 if (!this.listeners.isEmpty()) { 843 for (WALActionsListener i : this.listeners) { 844 i.preLogArchive(file.getPath(), p); 845 } 846 } 847 848 if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 849 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 850 } 851 // Tell our listeners that a log was archived. 852 if (!this.listeners.isEmpty()) { 853 for (WALActionsListener i : this.listeners) { 854 i.postLogArchive(file.getPath(), p); 855 } 856 } 857 } 858 LOG.debug( 859 "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); 860 } 861 LOG.info("Closed WAL: " + toString()); 862 } 863 864 /** 865 * updates the sequence number of a specific store. depending on the flag: replaces current seq 866 * number if the given seq id is bigger, or even if it is lower than existing one 867 */ 868 @Override 869 public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, 870 boolean onlyIfGreater) { 871 sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); 872 } 873 874 protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { 875 return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync); 876 } 877 878 protected boolean isLogRollRequested() { 879 return rollRequested.get(); 880 } 881 882 protected final void requestLogRoll(boolean tooFewReplicas) { 883 // If we have already requested a roll, don't do it again 884 // And only set rollRequested to true when there is a registered listener 885 if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) { 886 for (WALActionsListener i : this.listeners) { 887 i.logRollRequested(tooFewReplicas); 888 } 889 } 890 } 891 892 long getUnflushedEntriesCount() { 893 long highestSynced = this.highestSyncedTxid.get(); 894 long highestUnsynced = this.highestUnsyncedTxid; 895 return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; 896 } 897 898 boolean isUnflushedEntries() { 899 return getUnflushedEntriesCount() > 0; 900 } 901 902 /** 903 * Exposed for testing only. Use to tricks like halt the ring buffer appending. 904 */ 905 @VisibleForTesting 906 protected void atHeadOfRingBufferEventHandlerAppend() { 907 // Noop 908 } 909 910 protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException { 911 // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 912 atHeadOfRingBufferEventHandlerAppend(); 913 long start = EnvironmentEdgeManager.currentTime(); 914 byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); 915 long regionSequenceId = entry.getKey().getSequenceId(); 916 917 // Edits are empty, there is nothing to append. Maybe empty when we are looking for a 918 // region sequence id only, a region edit/sequence id that is not associated with an actual 919 // edit. It has to go through all the rigmarole to be sure we have the right ordering. 920 if (entry.getEdit().isEmpty()) { 921 return false; 922 } 923 924 // Coprocessor hook. 925 coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 926 if (!listeners.isEmpty()) { 927 for (WALActionsListener i : listeners) { 928 i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); 929 } 930 } 931 doAppend(writer, entry); 932 assert highestUnsyncedTxid < entry.getTxid(); 933 highestUnsyncedTxid = entry.getTxid(); 934 if (entry.isCloseRegion()) { 935 // let's clean all the records of this region 936 sequenceIdAccounting.onRegionClose(encodedRegionName); 937 } else { 938 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, 939 entry.isInMemStore()); 940 } 941 coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 942 // Update metrics. 943 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 944 numEntries.incrementAndGet(); 945 return true; 946 } 947 948 private long postAppend(final Entry e, final long elapsedTime) throws IOException { 949 long len = 0; 950 if (!listeners.isEmpty()) { 951 for (Cell cell : e.getEdit().getCells()) { 952 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 953 } 954 for (WALActionsListener listener : listeners) { 955 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); 956 } 957 } 958 return len; 959 } 960 961 protected final void postSync(final long timeInNanos, final int handlerSyncs) { 962 if (timeInNanos > this.slowSyncNs) { 963 String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) 964 .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); 965 TraceUtil.addTimelineAnnotation(msg); 966 LOG.info(msg); 967 } 968 if (!listeners.isEmpty()) { 969 for (WALActionsListener listener : listeners) { 970 listener.postSync(timeInNanos, handlerSyncs); 971 } 972 } 973 } 974 975 protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, 976 WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) 977 throws IOException { 978 if (this.closed) { 979 throw new IOException( 980 "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); 981 } 982 MutableLong txidHolder = new MutableLong(); 983 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { 984 txidHolder.setValue(ringBuffer.next()); 985 }); 986 long txid = txidHolder.longValue(); 987 ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) 988 .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); 989 try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { 990 FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); 991 entry.stampRegionSequenceId(we); 992 ringBuffer.get(txid).load(entry); 993 } finally { 994 ringBuffer.publish(txid); 995 } 996 return txid; 997 } 998 999 @Override 1000 public String toString() { 1001 return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; 1002 } 1003 1004 /** 1005 * if the given {@code path} is being written currently, then return its length. 1006 * <p> 1007 * This is used by replication to prevent replicating unacked log entries. See 1008 * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 1009 */ 1010 @Override 1011 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 1012 rollWriterLock.lock(); 1013 try { 1014 Path currentPath = getOldPath(); 1015 if (path.equals(currentPath)) { 1016 W writer = this.writer; 1017 return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); 1018 } else { 1019 return OptionalLong.empty(); 1020 } 1021 } finally { 1022 rollWriterLock.unlock(); 1023 } 1024 } 1025 1026 @Override 1027 public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 1028 return append(info, key, edits, true); 1029 } 1030 1031 @Override 1032 public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) 1033 throws IOException { 1034 return append(info, key, edits, false); 1035 } 1036 1037 /** 1038 * Append a set of edits to the WAL. 1039 * <p/> 1040 * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must 1041 * have its region edit/sequence id assigned else it messes up our unification of mvcc and 1042 * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in. 1043 * <p/> 1044 * NOTE: This append, at a time that is usually after this call returns, starts an mvcc 1045 * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment 1046 * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must 1047 * 'complete' the transaction this mvcc transaction by calling 1048 * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it 1049 * in the finally of a try/finally block within which this append lives and any subsequent 1050 * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the 1051 * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not 1052 * immediately available on return from this method. It WILL be available subsequent to a sync of 1053 * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. 1054 * @param info the regioninfo associated with append 1055 * @param key Modified by this call; we add to it this edits region edit/sequence id. 1056 * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit 1057 * sequence id that is after all currently appended edits. 1058 * @param inMemstore Always true except for case where we are writing a region event meta 1059 * marker edit, for example, a compaction completion record into the WAL or noting a 1060 * Region Open event. In these cases the entry is just so we can finish an unfinished 1061 * compaction after a crash when the new Server reads the WAL on recovery, etc. These 1062 * transition event 'Markers' do not go via the memstore. When memstore is false, 1063 * we presume a Marker event edit. 1064 * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id 1065 * in it. 1066 */ 1067 protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1068 throws IOException; 1069 1070 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; 1071 1072 protected abstract W createWriterInstance(Path path) 1073 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 1074 1075 /** 1076 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 1077 * will begin to work before returning from this method. If we clear the flag after returning from 1078 * this call, we may miss a roll request. The implementation class should choose a proper place to 1079 * clear the {@link #rollRequested} flag so we do not miss a roll request, typically before you 1080 * start writing to the new writer. 1081 */ 1082 protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) 1083 throws IOException; 1084 1085 protected abstract void doShutdown() throws IOException; 1086 1087 protected abstract boolean doCheckLogLowReplication(); 1088 1089 public void checkLogLowReplication(long checkInterval) { 1090 long now = EnvironmentEdgeManager.currentTime(); 1091 if (now - lastTimeCheckLowReplication < checkInterval) { 1092 return; 1093 } 1094 // Will return immediately if we are in the middle of a WAL log roll currently. 1095 if (!rollWriterLock.tryLock()) { 1096 return; 1097 } 1098 try { 1099 lastTimeCheckLowReplication = now; 1100 if (doCheckLogLowReplication()) { 1101 requestLogRoll(true); 1102 } 1103 } finally { 1104 rollWriterLock.unlock(); 1105 } 1106 } 1107 1108 /** 1109 * This method gets the pipeline for the current WAL. 1110 */ 1111 @VisibleForTesting 1112 abstract DatanodeInfo[] getPipeline(); 1113 1114 /** 1115 * This method gets the datanode replication count for the current WAL. 1116 */ 1117 @VisibleForTesting 1118 abstract int getLogReplication(); 1119 1120 private static void split(final Configuration conf, final Path p) throws IOException { 1121 FileSystem fs = FSUtils.getWALFileSystem(conf); 1122 if (!fs.exists(p)) { 1123 throw new FileNotFoundException(p.toString()); 1124 } 1125 if (!fs.getFileStatus(p).isDirectory()) { 1126 throw new IOException(p + " is not a directory"); 1127 } 1128 1129 final Path baseDir = FSUtils.getWALRootDir(conf); 1130 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 1131 if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 1132 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) { 1133 archiveDir = new Path(archiveDir, p.getName()); 1134 } 1135 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 1136 } 1137 1138 private static void usage() { 1139 System.err.println("Usage: AbstractFSWAL <ARGS>"); 1140 System.err.println("Arguments:"); 1141 System.err.println(" --dump Dump textual representation of passed one or more files"); 1142 System.err.println(" For example: " + 1143 "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 1144 System.err.println(" --split Split the passed directory of WAL logs"); 1145 System.err.println( 1146 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 1147 } 1148 1149 /** 1150 * Pass one or more log file names and it will either dump out a text version on 1151 * <code>stdout</code> or split the specified log files. 1152 */ 1153 public static void main(String[] args) throws IOException { 1154 if (args.length < 2) { 1155 usage(); 1156 System.exit(-1); 1157 } 1158 // either dump using the WALPrettyPrinter or split, depending on args 1159 if (args[0].compareTo("--dump") == 0) { 1160 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 1161 } else if (args[0].compareTo("--perf") == 0) { 1162 LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); 1163 LOG.error(HBaseMarkers.FATAL, 1164 "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); 1165 System.exit(-1); 1166 } else if (args[0].compareTo("--split") == 0) { 1167 Configuration conf = HBaseConfiguration.create(); 1168 for (int i = 1; i < args.length; i++) { 1169 try { 1170 Path logPath = new Path(args[i]); 1171 FSUtils.setFsDefault(conf, logPath); 1172 split(conf, logPath); 1173 } catch (IOException t) { 1174 t.printStackTrace(System.err); 1175 System.exit(-1); 1176 } 1177 } 1178 } else { 1179 usage(); 1180 System.exit(-1); 1181 } 1182 } 1183}