001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile; 021 022import java.io.EOFException; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.text.ParseException; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.List; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.commons.lang3.ArrayUtils; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseConfiguration; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 043import org.apache.hadoop.hbase.io.HeapSize; 044import org.apache.hadoop.hbase.master.SplitLogManager; 045import org.apache.hadoop.hbase.monitoring.MonitoredTask; 046import org.apache.hadoop.hbase.monitoring.TaskMonitor; 047import org.apache.hadoop.hbase.procedure2.util.StringUtils; 048import org.apache.hadoop.hbase.regionserver.LastSequenceId; 049import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.CancelableProgressable; 052import org.apache.hadoop.hbase.util.ClassSize; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.FSUtils; 055import org.apache.hadoop.hbase.wal.WAL.Entry; 056import org.apache.hadoop.hbase.wal.WAL.Reader; 057import org.apache.hadoop.hbase.wal.WALProvider.Writer; 058import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; 059import org.apache.hadoop.ipc.RemoteException; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 065import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 066import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 067 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; 070/** 071 * This class is responsible for splitting up a bunch of regionserver commit log 072 * files that are no longer being written to, into new files, one per region, for 073 * recovering data on startup. Delete the old log files when finished. 074 */ 075@InterfaceAudience.Private 076public class WALSplitter { 077 private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class); 078 079 /** By default we retry errors in splitting, rather than skipping. */ 080 public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; 081 082 // Parameters for split process 083 protected final Path walDir; 084 protected final FileSystem walFS; 085 protected final Configuration conf; 086 087 // Major subcomponents of the split process. 088 // These are separated into inner classes to make testing easier. 089 OutputSink outputSink; 090 private EntryBuffers entryBuffers; 091 092 private SplitLogWorkerCoordination splitLogWorkerCoordination; 093 private final WALFactory walFactory; 094 095 private MonitoredTask status; 096 097 // For checking the latest flushed sequence id 098 protected final LastSequenceId sequenceIdChecker; 099 100 // Map encodedRegionName -> lastFlushedSequenceId 101 protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>(); 102 103 // Map encodedRegionName -> maxSeqIdInStores 104 protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); 105 106 // the file being split currently 107 private FileStatus fileBeingSplit; 108 109 // if we limit the number of writers opened for sinking recovered edits 110 private final boolean splitWriterCreationBounded; 111 112 public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; 113 114 115 @VisibleForTesting 116 WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, 117 LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { 118 this.conf = HBaseConfiguration.create(conf); 119 String codecClassName = 120 conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); 121 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); 122 this.walDir = walDir; 123 this.walFS = walFS; 124 this.sequenceIdChecker = idChecker; 125 this.splitLogWorkerCoordination = splitLogWorkerCoordination; 126 127 this.walFactory = factory; 128 PipelineController controller = new PipelineController(); 129 130 this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); 131 132 entryBuffers = new EntryBuffers(controller, 133 this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024), 134 splitWriterCreationBounded); 135 136 int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); 137 if (splitWriterCreationBounded) { 138 outputSink = 139 new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads); 140 } else { 141 outputSink = 142 new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); 143 } 144 } 145 146 WALFactory getWalFactory(){ 147 return this.walFactory; 148 } 149 150 FileStatus getFileBeingSplit() { 151 return fileBeingSplit; 152 } 153 154 Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { 155 return regionMaxSeqIdInStores; 156 } 157 158 /** 159 * Splits a WAL file into region's recovered-edits directory. 160 * This is the main entry point for distributed log splitting from SplitLogWorker. 161 * <p> 162 * If the log file has N regions then N recovered.edits files will be produced. 163 * <p> 164 * @return false if it is interrupted by the progress-able. 165 */ 166 public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, 167 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, 168 SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) 169 throws IOException { 170 WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, 171 splitLogWorkerCoordination); 172 return s.splitLogFile(logfile, reporter); 173 } 174 175 // A wrapper to split one log folder using the method used by distributed 176 // log splitting. Used by tools and unit tests. It should be package private. 177 // It is public only because TestWALObserver is in a different package, 178 // which uses this method to do log splitting. 179 @VisibleForTesting 180 public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, 181 FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { 182 final FileStatus[] logfiles = SplitLogManager.getFileList(conf, 183 Collections.singletonList(logDir), null); 184 List<Path> splits = new ArrayList<>(); 185 if (ArrayUtils.isNotEmpty(logfiles)) { 186 for (FileStatus logfile: logfiles) { 187 WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); 188 if (s.splitLogFile(logfile, null)) { 189 finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); 190 if (s.outputSink.splits != null) { 191 splits.addAll(s.outputSink.splits); 192 } 193 } 194 } 195 } 196 if (!walFS.delete(logDir, true)) { 197 throw new IOException("Unable to delete src dir: " + logDir); 198 } 199 return splits; 200 } 201 202 /** 203 * log splitting implementation, splits one log file. 204 * @param logfile should be an actual log file. 205 */ 206 @VisibleForTesting 207 boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException { 208 Preconditions.checkState(status == null); 209 Preconditions.checkArgument(logfile.isFile(), 210 "passed in file status is for something other than a regular file."); 211 boolean isCorrupted = false; 212 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", 213 SPLIT_SKIP_ERRORS_DEFAULT); 214 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); 215 Path logPath = logfile.getPath(); 216 boolean outputSinkStarted = false; 217 boolean progress_failed = false; 218 int editsCount = 0; 219 int editsSkipped = 0; 220 221 status = TaskMonitor.get().createStatus( 222 "Splitting log file " + logfile.getPath() + "into a temporary staging area."); 223 Reader logFileReader = null; 224 this.fileBeingSplit = logfile; 225 long startTS = EnvironmentEdgeManager.currentTime(); 226 try { 227 long logLength = logfile.getLen(); 228 LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength), 229 logLength); 230 status.setStatus("Opening log file"); 231 if (reporter != null && !reporter.progress()) { 232 progress_failed = true; 233 return false; 234 } 235 logFileReader = getReader(logfile, skipErrors, reporter); 236 if (logFileReader == null) { 237 LOG.warn("Nothing to split in WAL={}", logPath); 238 return true; 239 } 240 long openCost = EnvironmentEdgeManager.currentTime() - startTS; 241 LOG.info("Open WAL={} cost {} ms", logPath, openCost); 242 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); 243 int numOpenedFilesLastCheck = 0; 244 outputSink.setReporter(reporter); 245 outputSink.startWriterThreads(); 246 outputSinkStarted = true; 247 Entry entry; 248 Long lastFlushedSequenceId = -1L; 249 startTS = EnvironmentEdgeManager.currentTime(); 250 while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) { 251 byte[] region = entry.getKey().getEncodedRegionName(); 252 String encodedRegionNameAsStr = Bytes.toString(region); 253 lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); 254 if (lastFlushedSequenceId == null) { 255 if (sequenceIdChecker != null) { 256 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); 257 Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); 258 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { 259 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), 260 storeSeqId.getSequenceId()); 261 } 262 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); 263 lastFlushedSequenceId = ids.getLastFlushedSequenceId(); 264 if (LOG.isDebugEnabled()) { 265 LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + 266 TextFormat.shortDebugString(ids)); 267 } 268 } 269 if (lastFlushedSequenceId == null) { 270 lastFlushedSequenceId = -1L; 271 } 272 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); 273 } 274 if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { 275 editsSkipped++; 276 continue; 277 } 278 // Don't send Compaction/Close/Open region events to recovered edit type sinks. 279 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) { 280 editsSkipped++; 281 continue; 282 } 283 entryBuffers.appendEntry(entry); 284 editsCount++; 285 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck; 286 // If sufficient edits have passed, check if we should report progress. 287 if (editsCount % interval == 0 288 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) { 289 numOpenedFilesLastCheck = this.getNumOpenWriters(); 290 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits())) 291 + " edits, skipped " + editsSkipped + " edits."; 292 status.setStatus("Split " + countsStr); 293 if (reporter != null && !reporter.progress()) { 294 progress_failed = true; 295 return false; 296 } 297 } 298 } 299 } catch (InterruptedException ie) { 300 IOException iie = new InterruptedIOException(); 301 iie.initCause(ie); 302 throw iie; 303 } catch (CorruptedLogFileException e) { 304 LOG.warn("Could not parse, corrupted WAL={}", logPath, e); 305 if (splitLogWorkerCoordination != null) { 306 // Some tests pass in a csm of null. 307 splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS); 308 } else { 309 // for tests only 310 ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS); 311 } 312 isCorrupted = true; 313 } catch (IOException e) { 314 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 315 throw e; 316 } finally { 317 LOG.debug("Finishing writing output logs and closing down"); 318 try { 319 if (null != logFileReader) { 320 logFileReader.close(); 321 } 322 } catch (IOException exception) { 323 LOG.warn("Could not close WAL reader", exception); 324 } 325 try { 326 if (outputSinkStarted) { 327 // Set progress_failed to true as the immediate following statement will reset its value 328 // when finishWritingAndClose() throws exception, progress_failed has the right value 329 progress_failed = true; 330 progress_failed = outputSink.finishWritingAndClose() == null; 331 } 332 } finally { 333 long processCost = EnvironmentEdgeManager.currentTime() - startTS; 334 // See if length got updated post lease recovery 335 String msg = "Processed " + editsCount + " edits across " + 336 outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost + 337 " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" + 338 StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() + 339 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed; 340 LOG.info(msg); 341 status.markComplete(msg); 342 } 343 } 344 return !progress_failed; 345 } 346 347 /** 348 * Create a new {@link Reader} for reading logs to split. 349 */ 350 protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) 351 throws IOException, CorruptedLogFileException { 352 Path path = file.getPath(); 353 long length = file.getLen(); 354 Reader in; 355 356 // Check for possibly empty file. With appends, currently Hadoop reports a 357 // zero length even if the file has been sync'd. Revisit if HDFS-376 or 358 // HDFS-878 is committed. 359 if (length <= 0) { 360 LOG.warn("File {} might be still open, length is 0", path); 361 } 362 363 try { 364 FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter); 365 try { 366 in = getReader(path, reporter); 367 } catch (EOFException e) { 368 if (length <= 0) { 369 // TODO should we ignore an empty, not-last log file if skip.errors 370 // is false? Either way, the caller should decide what to do. E.g. 371 // ignore if this is the last log in sequence. 372 // TODO is this scenario still possible if the log has been 373 // recovered (i.e. closed) 374 LOG.warn("Could not open {} for reading. File is empty", path, e); 375 } 376 // EOFException being ignored 377 return null; 378 } 379 } catch (IOException e) { 380 if (e instanceof FileNotFoundException) { 381 // A wal file may not exist anymore. Nothing can be recovered so move on 382 LOG.warn("File {} does not exist anymore", path, e); 383 return null; 384 } 385 if (!skipErrors || e instanceof InterruptedIOException) { 386 throw e; // Don't mark the file corrupted if interrupted, or not skipErrors 387 } 388 CorruptedLogFileException t = 389 new CorruptedLogFileException("skipErrors=true Could not open wal " + 390 path + " ignoring"); 391 t.initCause(e); 392 throw t; 393 } 394 return in; 395 } 396 397 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) 398 throws CorruptedLogFileException, IOException { 399 try { 400 return in.next(); 401 } catch (EOFException eof) { 402 // truncated files are expected if a RS crashes (see HBASE-2643) 403 LOG.info("EOF from wal {}. Continuing.", path); 404 return null; 405 } catch (IOException e) { 406 // If the IOE resulted from bad file format, 407 // then this problem is idempotent and retrying won't help 408 if (e.getCause() != null && (e.getCause() instanceof ParseException 409 || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { 410 LOG.warn("Parse exception from wal {}. Continuing", path, e); 411 return null; 412 } 413 if (!skipErrors) { 414 throw e; 415 } 416 CorruptedLogFileException t = 417 new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal " 418 + path + ". Marking as corrupted"); 419 t.initCause(e); 420 throw t; 421 } 422 } 423 424 /** 425 * Create a new {@link WALProvider.Writer} for writing log splits. 426 * @return a new Writer instance, caller should close 427 */ 428 protected WALProvider.Writer createWriter(Path logfile) throws IOException { 429 return walFactory.createRecoveredEditsWriter(walFS, logfile); 430 } 431 432 /** 433 * Create a new {@link Reader} for reading logs to split. 434 * @return new Reader instance, caller should close 435 */ 436 protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { 437 return walFactory.createReader(walFS, curLogFile, reporter); 438 } 439 440 /** 441 * Get current open writers 442 */ 443 private int getNumOpenWriters() { 444 int result = 0; 445 if (this.outputSink != null) { 446 result += this.outputSink.getNumOpenWriters(); 447 } 448 return result; 449 } 450 451 /** 452 * Contains some methods to control WAL-entries producer / consumer interactions 453 */ 454 public static class PipelineController { 455 // If an exception is thrown by one of the other threads, it will be 456 // stored here. 457 AtomicReference<Throwable> thrown = new AtomicReference<>(); 458 459 // Wait/notify for when data has been produced by the writer thread, 460 // consumed by the reader thread, or an exception occurred 461 final Object dataAvailable = new Object(); 462 463 void writerThreadError(Throwable t) { 464 thrown.compareAndSet(null, t); 465 } 466 467 /** 468 * Check for errors in the writer threads. If any is found, rethrow it. 469 */ 470 void checkForErrors() throws IOException { 471 Throwable thrown = this.thrown.get(); 472 if (thrown == null) { 473 return; 474 } 475 if (thrown instanceof IOException) { 476 throw new IOException(thrown); 477 } else { 478 throw new RuntimeException(thrown); 479 } 480 } 481 } 482 483 /** 484 * A buffer of some number of edits for a given region. 485 * This accumulates edits and also provides a memory optimization in order to 486 * share a single byte array instance for the table and region name. 487 * Also tracks memory usage of the accumulated edits. 488 */ 489 public static class RegionEntryBuffer implements HeapSize { 490 long heapInBuffer = 0; 491 List<Entry> entryBuffer; 492 TableName tableName; 493 byte[] encodedRegionName; 494 495 RegionEntryBuffer(TableName tableName, byte[] region) { 496 this.tableName = tableName; 497 this.encodedRegionName = region; 498 this.entryBuffer = new ArrayList<>(); 499 } 500 501 long appendEntry(Entry entry) { 502 internify(entry); 503 entryBuffer.add(entry); 504 long incrHeap = entry.getEdit().heapSize() + 505 ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers 506 0; // TODO linkedlist entry 507 heapInBuffer += incrHeap; 508 return incrHeap; 509 } 510 511 private void internify(Entry entry) { 512 WALKeyImpl k = entry.getKey(); 513 k.internTableName(this.tableName); 514 k.internEncodedRegionName(this.encodedRegionName); 515 } 516 517 @Override 518 public long heapSize() { 519 return heapInBuffer; 520 } 521 522 public byte[] getEncodedRegionName() { 523 return encodedRegionName; 524 } 525 526 public List<Entry> getEntryBuffer() { 527 return entryBuffer; 528 } 529 530 public TableName getTableName() { 531 return tableName; 532 } 533 } 534 535 /** 536 * Class wraps the actual writer which writes data out and related statistics 537 */ 538 public abstract static class SinkWriter { 539 /* Count of edits written to this path */ 540 long editsWritten = 0; 541 /* Count of edits skipped to this path */ 542 long editsSkipped = 0; 543 /* Number of nanos spent writing to this log */ 544 long nanosSpent = 0; 545 546 void incrementEdits(int edits) { 547 editsWritten += edits; 548 } 549 550 void incrementSkippedEdits(int skipped) { 551 editsSkipped += skipped; 552 } 553 554 void incrementNanoTime(long nanos) { 555 nanosSpent += nanos; 556 } 557 } 558 559 /** 560 * Private data structure that wraps a Writer and its Path, also collecting statistics about the 561 * data written to this output. 562 */ 563 final static class WriterAndPath extends SinkWriter { 564 final Path path; 565 final Writer writer; 566 final long minLogSeqNum; 567 568 WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) { 569 this.path = path; 570 this.writer = writer; 571 this.minLogSeqNum = minLogSeqNum; 572 } 573 } 574 575 static class CorruptedLogFileException extends Exception { 576 private static final long serialVersionUID = 1L; 577 578 CorruptedLogFileException(String s) { 579 super(s); 580 } 581 } 582}