/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;

/**
 * Split RegionServer WAL files. Splits the WAL into new files,
 * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
 * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
 * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
 * LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
 * entry-point.
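 * <p>
 * A minimal usage sketch of the directory-splitting entry point. The paths below are
 * hypothetical; a real caller derives them from the cluster configuration and from the name of
 * the server whose WALs are being split, and supplies its own {@code WALFactory}:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Path walRootDir = CommonFSUtils.getWALRootDir(conf);
 * FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
 * Path logDir = new Path(walRootDir, "WALs/example-host,16020,1600000000000-splitting");
 * Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
 * WALFactory factory = ...; // the WALFactory owned by the surrounding server or tool
 * List<Path> recoveredEdits =
 *   WALSplitter.split(walRootDir, logDir, oldLogDir, walFS, conf, factory);
 * }</pre>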
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;
  final Path rootDir;
  final FileSystem rootFS;
  final RegionServerServices rsServices;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  /**
   * Coordinator for split log. Used by the zk-based log splitter.
   * Not used by the procedure v2-based log splitter.
   */
  private SplitLogWorkerCoordination splitLogWorkerCoordination;

  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // the file being split currently
  private FileStatus fileBeingSplit;

  private final String tmpDirName;

  /**
   * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
   */
  public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
  public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;

  /**
   * True if we are to run with a bounded number of writers rather than letting the count blossom.
   * Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that
   * is always bounded. Only applies when you are doing recovery to 'recovered.edits'
   * files (the old default). Bounded writing tends to have higher throughput.
   */
  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
  public final static String SPLIT_WAL_WRITER_THREADS =
    "hbase.regionserver.hlog.splitlog.writer.threads";

  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
      Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName =
      conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.rootDir = rootDir;
    this.rootFS = rootFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;
    this.rsServices = rsServices;
    this.walFactory = factory;
    PipelineController controller = new PipelineController();
    this.tmpDirName =
      conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);

    // if we limit the number of writers opened for sinking recovered edits
    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
    boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
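    // Pick the output sink implementation: split straight to HFiles (always a bounded writer
    // pool), split to recovered.edits with a bounded writer pool, or the historical default of
    // one recovered.edits writer per region with no bound on the writer count.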
    if (splitToHFile) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
        new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else if (splitWriterCreationBounded) {
      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
      outputSink =
        new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    } else {
      entryBuffers = new EntryBuffers(controller, bufferSize);
      outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
    }
  }

  WALFactory getWalFactory() {
    return this.walFactory;
  }

  FileStatus getFileBeingSplit() {
    return fileBeingSplit;
  }

  String getTmpDirName() {
    return this.tmpDirName;
  }

  Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
    return regionMaxSeqIdInStores;
  }

  /**
   * Splits a WAL file.
   * @return false if splitting was interrupted by the progress reporter.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
      RegionServerServices rsServices) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
      splitLogWorkerCoordination, rsServices);
    return s.splitLogFile(logfile, reporter);
  }

  /**
   * Split a folder of WAL files. Delete the directory when done. Used by tools and unit tests.
   * It should be package private, but it is public only because TestWALObserver is in a
   * different package, which uses this method to do log splitting.
   * @return List of output files created by the split.
   */
  @VisibleForTesting
  public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
      Configuration conf, final WALFactory factory) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem rootFS = rootDir.getFileSystem(conf);
    final FileStatus[] logfiles =
      SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile : logfiles) {
        WALSplitter s =
          new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * WAL splitting implementation, splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
      "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progressFailed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
      "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    status.enableStatusJournal(true);
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    long startTS = EnvironmentEdgeManager.currentTime();
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
        logLength);
      status.setStatus("Opening log file " + logPath);
      if (reporter != null && !reporter.progress()) {
        progressFailed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.setStatus(status);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      startTS = EnvironmentEdgeManager.currentTime();
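      // Read the WAL entry by entry; appendEntry() below groups the edits per region in the
      // entry buffers, from which the output sink's writer threads drain and persist them.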
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
              encodedRegionNameAsStr))) {
            // The region directory itself is not present in the FS. This indicates that
            // the region/table is already removed. We can just skip all the edits for this
            // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
            // will get skipped by the seqId check below.
            // See more details at https://issues.apache.org/jira/browse/HBASE-24189
            LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
              encodedRegionNameAsStr);
            lastFlushedSequenceId = Long.MAX_VALUE;
          } else {
            if (sequenceIdChecker != null) {
              RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
              Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
              for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
                maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                  storeSeqId.getSequenceId());
              }
              regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
              if (LOG.isDebugEnabled()) {
                LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
                  + TextFormat.shortDebugString(ids));
              }
            }
            if (lastFlushedSequenceId == null) {
              lastFlushedSequenceId = -1L;
            }
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
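        // Skip edits that were already flushed to the store files before the server died.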
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
            + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progressFailed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a csm of null.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // for tests only
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      final String log = "Finishing writing output logs and closing down";
      LOG.debug(log);
      status.setStatus(log);
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progressFailed to true before closing: if close() throws, the assignment below
          // never runs and progressFailed keeps the right value.
          progressFailed = true;
          progressFailed = outputSink.close() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
          outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
          " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
          StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
          ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
        LOG.info(msg);
        status.markComplete(msg);
        if (LOG.isDebugEnabled()) {
          LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
            status.prettyPrintJournal());
        }
      }
    }
    return !progressFailed;
  }

  private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
      throws IOException {
    Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
    return this.rootFS.exists(regionDirPath);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might be still open, length is 0", path);
    }
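    // Recover the HDFS lease on the file first so that a WAL left open by a dead regionserver
    // reports its true length and can be read through to the end.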
    try {
      RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on.
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      throw new CorruptedLogFileException("skipErrors=true Could not open wal "
        + path + " ignoring", e);
    }
    return in;
  }

  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null && (e.getCause() instanceof ParseException
          || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      throw new CorruptedLogFileException("skipErrors=true Ignoring exception"
        + " while parsing wal " + path + ". Marking as corrupted", e);
    }
  }

  /**
   * Create a new {@link WALProvider.Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected WALProvider.Writer createWriter(Path logfile) throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions.
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the writer thread,
    // consumed by the reader thread, or an exception occurred
    final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      this.thrown.set(null);
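      // Rethrow on the caller's thread, wrapping the original throwable so that the writer
      // thread's stack trace is preserved as the cause.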
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }

    /**
     * CorruptedLogFileException with cause
     *
     * @param message the message for this exception
     * @param cause the cause for this exception
     */
    CorruptedLogFileException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}