/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region, for
 * recovering data on startup. Delete the old log files when finished.
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  private SplitLogWorkerCoordination splitLogWorkerCoordination;
  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // the file being split currently
  private FileStatus fileBeingSplit;

  // whether we limit the number of writers opened for sinking recovered edits
  private final boolean splitWriterCreationBounded;

  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
      FileSystem walFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName = conf
        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;

    this.walFactory = factory;
    PipelineController controller = new PipelineController();

    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

    entryBuffers = new EntryBuffers(controller,
        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
        splitWriterCreationBounded);

    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    if (splitWriterCreationBounded) {
      outputSink = new BoundedLogWriterCreationOutputSink(
          controller, entryBuffers, numWriterThreads);
    } else {
      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
    }
  }

  /**
   * Splits a WAL file into region's recovered-edits directory.
   * This is the main entry point for distributed log splitting from SplitLogWorker.
   * <p>
   * If the log file has N regions then N recovered.edits files will be produced.
   * <p>
   * @return false if it is interrupted by the progressable reporter.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
      throws IOException {
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
        splitLogWorkerCoordination);
    return s.splitLogFile(logfile, reporter);
  }

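  // A minimal usage sketch of the static entry point above (not part of the original
  // class). The paths are hypothetical, coordination/idChecker are skipped as in the
  // tool path, and a WALFactory(Configuration, String) constructor is assumed:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Path walDir = new Path("/hbase/WALs");
  //   FileSystem walFS = walDir.getFileSystem(conf);
  //   FileStatus logfile = walFS.getFileStatus(new Path(walDir, "rs1/example.wal"));
  //   WALFactory factory = new WALFactory(conf, "split-example");
  //   boolean finished = WALSplitter.splitLogFile(walDir, logfile, walFS, conf,
  //       null /* reporter */, null /* idChecker */, null /* coordination */, factory);
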
  // A wrapper to split one log folder using the method used by distributed
  // log splitting. Used by tools and unit tests. It should be package private.
  // It is public only because TestWALObserver is in a different package,
  // which uses this method to do log splitting.
  @VisibleForTesting
  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
        Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile : logfiles) {
        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * log splitting implementation, splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
        SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
        "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (sequenceIdChecker != null) {
            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                  storeSeqId.getSequenceId());
            }
            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            if (LOG.isDebugEnabled()) {
              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
                  TextFormat.shortDebugString(ids));
            }
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a csm of null.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // for tests only
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down");
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true here so that if finishWritingAndClose() throws,
          // progress_failed already has the right value; on a normal return the next
          // statement resets it.
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        String msg =
            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
                + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
                ", length=" + logfile.getLen() + // See if length got updated post lease recovery
                ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }

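  // Worked example of the sequence-id filter above (hypothetical numbers): with
  // lastFlushedSequenceId = 10 for a region, entries with seqId 9 or 10 are counted
  // in editsSkipped (those edits already reached the store files), while an entry
  // with seqId 11 is handed to entryBuffers for recovery.
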
  /**
   * Completes the work done by splitLogFile by archiving logs.
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the
   * SplitLogWorkers has completed the splitLogFile() part. If the master
   * crashes then this function might get called multiple times.
   * <p>
   * @param logfile the WAL file name or path to finish.
   * @param conf configuration used to resolve the WAL root directory.
   * @throws IOException
   */
  public static void finishSplitLogFile(String logfile,
      Configuration conf) throws IOException {
    Path walDir = FSUtils.getWALRootDir(conf);
    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path logPath;
    if (FSUtils.isStartingWithPath(walDir, logfile)) {
      logPath = new Path(logfile);
    } else {
      logPath = new Path(walDir, logfile);
    }
    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
  }

  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
      Path logPath, Configuration conf) throws IOException {
    List<Path> processedLogs = new ArrayList<>();
    List<Path> corruptedLogs = new ArrayList<>();
    FileSystem walFS = walDir.getFileSystem(conf);
    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
      corruptedLogs.add(logPath);
    } else {
      processedLogs.add(logPath);
    }
    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
    walFS.delete(stagingDir, true);
  }

  /**
   * Moves processed logs to the oldLogDir after successful processing; moves
   * corrupted logs (any log that couldn't be successfully parsed) to the
   * corruptDir (.corrupt) for later investigation.
   *
   * @param corruptedLogs logs that could not be parsed.
   * @param processedLogs logs that were split successfully.
   * @param oldLogDir archive directory for processed logs.
   * @param walFS WAL FileSystem to archive files on.
   * @param conf configuration used to resolve the corrupt directory.
   * @throws IOException
   */
  private static void archiveLogs(
      final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem walFS, final Configuration conf) throws IOException {
    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
          corruptDir);
    }
    if (!walFS.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir {}", corruptDir);
    }
    walFS.mkdirs(oldLogDir);

    // this method can get restarted or called multiple times for archiving
    // the same log files.
    for (Path corrupted : corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (walFS.exists(corrupted)) {
        if (!walFS.rename(corrupted, p)) {
          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
        } else {
          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
        }
      }
    }

    for (Path p : processedLogs) {
      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
      if (walFS.exists(p)) {
        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
          LOG.warn("Unable to move {} to {}", p, newPath);
        } else {
          LOG.info("Archived processed log {} to {}", p, newPath);
        }
      }
    }
  }

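  // Archival sketch (hypothetical file names): after a successful split of
  // /hbase/WALs/rs1/wal.1 the source file is moved under the oldWALs directory,
  // while a WAL that failed to parse would be moved under the corrupt directory
  // instead, both resolved relative to the WAL root directory above.
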
  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures existence of RECOVERED_EDITS_DIR under the region,
   * creating it if necessary.
   * @param logEntry the entry whose region and sequence id name the file.
   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
   * @param conf configuration used to resolve the WAL filesystem.
   * @return Path to file into which to dump split log edits.
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  @VisibleForTesting
  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
      Configuration conf) throws IOException {
    FileSystem walFS = FSUtils.getWALFileSystem(conf);
    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = getRegionDirRecoveredEditsDir(regionDir);

    if (walFS.exists(dir) && walFS.isFile(dir)) {
      Path tmp = new Path("/tmp");
      if (!walFS.exists(tmp)) {
        walFS.mkdirs(tmp);
      }
      tmp = new Path(tmp,
          HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: {}. It could be some "
          + "leftover of an old installation. It should be a folder instead. "
          + "So moving it to {}", dir, tmp);
      if (!walFS.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file {}", dir);
      }
    }

    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
      LOG.warn("mkdir failed on {}", dir);
    }
    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
    return new Path(dir, fileName);
  }

  private static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  /**
   * Get the completed recovered edits file path, renaming it so that it is named
   * by the last edit in the file rather than its first edit. Then we can use the
   * name to skip recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
   * @param srcPath the in-progress recovered edits file.
   * @param maximumEditLogSeqNum the maximum sequence id written to the file.
   * @return dstPath named by the file's last edit sequence number.
   */
  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
      long maximumEditLogSeqNum) {
    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
    return new Path(srcPath.getParent(), fileName);
  }

  @VisibleForTesting
  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }

  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";

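  // Naming sketch: sequence ids are zero-padded to 19 digits so lexicographic
  // order matches numeric order, e.g.
  //   formatRecoveredEditsFileName(2332L) -> "0000000000000002332"
  // and the in-progress file for a split of a (hypothetical) WAL "wal.1" would be
  //   "0000000000000002332-wal.1.temp"
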
  /**
   * @param regionDir
   *          This region's directory in the filesystem.
   * @return The directory that holds recovered edits files for the region
   *         <code>regionDir</code>
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Check whether there is recovered.edits in the region dir
   * @param conf configuration
   * @param regionInfo the region to check
   * @throws IOException IOException
   * @return true if recovered.edits exist in the region dir
   */
  public static boolean hasRecoveredEdits(final Configuration conf,
      final RegionInfo regionInfo) throws IOException {
    // No recovered.edits for non default replica regions
    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
      return false;
    }
    // Only default replica region can reach here, so we can use regioninfo
    // directly without converting it to default replica's regioninfo.
    Path regionWALDir =
        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), regionInfo);
    Path wrongRegionWALDir =
        FSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf);
    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(rootFs, regionDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
    return !files.isEmpty();
  }

  /**
   * Returns sorted set of edit files made by splitter, excluding files
   * with '.temp' suffix.
   *
   * @param walFS WAL FileSystem used to retrieve split edits files.
   * @param regionDir WAL region dir to look for recovered edits files under.
   * @return Files in passed <code>regionDir</code> as a sorted set.
   * @throws IOException
   */
  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
      final Path regionDir) throws IOException {
    NavigableSet<Path> filesSorted = new TreeSet<>();
    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
    if (!walFS.exists(editsdir)) {
      return filesSorted;
    }
    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        boolean result = false;
        try {
          // Return files and only files that match the editfile names pattern.
          // There can be other files in this directory other than edit files.
          // In particular, on error, we'll move aside the bad edit file giving
          // it a timestamp suffix. See moveAsideBadEditsFile.
          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
          result = walFS.isFile(p) && m.matches();
          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
          // because it means a splitwal thread is still writing this file.
          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
            result = false;
          }
          // Skip SeqId files.
          if (isSequenceIdFile(p)) {
            result = false;
          }
        } catch (IOException e) {
          LOG.warn("Failed isFile check on {}", p, e);
        }
        return result;
      }
    });
    if (ArrayUtils.isNotEmpty(files)) {
      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
    }
    return filesSorted;
  }

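  // Filter sketch for the method above: given a recovered.edits directory holding
  //   { "0000000000000000009", "0000000000000000012",
  //     "0000000000000000012-wal.1.temp", "42.seqid" }   (hypothetical names)
  // the returned set is { "...009", "...012" }: the .temp file is still being
  // written by a split thread and the .seqid marker is not an edits file.
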
  /**
   * Move aside a bad edits file.
   *
   * @param walFS WAL FileSystem used to rename bad edits file.
   * @param edits Edits file to move aside.
   * @return The name of the moved aside file.
   * @throws IOException
   */
  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
      throws IOException {
    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
        + System.currentTimeMillis());
    if (!walFS.rename(edits, moveAsideName)) {
      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
    }
    return moveAsideName;
  }

  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

  /**
   * Is the given file a region open sequence id file.
   */
  @VisibleForTesting
  public static boolean isSequenceIdFile(final Path file) {
    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
  }

  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
      throws IOException {
    // TODO: Why are we using a method in here as part of our normal region open where
    // there is no splitting involved? Fix. St.Ack 01/20/2017.
    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
    try {
      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
      return files != null ? files : new FileStatus[0];
    } catch (FileNotFoundException e) {
      return new FileStatus[0];
    }
  }

  private static long getMaxSequenceId(FileStatus[] files) {
    long maxSeqId = -1L;
    for (FileStatus file : files) {
      String fileName = file.getPath().getName();
      try {
        maxSeqId = Math.max(maxSeqId, Long
            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
      } catch (NumberFormatException ex) {
        LOG.warn("Invalid SeqId File Name={}", fileName);
      }
    }
    return maxSeqId;
  }

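  // Parsing sketch: getMaxSequenceId over { "7.seqid", "42.seqid" } returns 42.
  // Note that ".seqid" and the legacy "_seqid" suffix are both six characters
  // long, so the numeric prefix is sliced identically for either naming scheme.
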
  /**
   * Get the max sequence id which is stored in the region directory. -1 if none.
   */
  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
  }

  /**
   * Create a file named with the region's max sequence id.
   */
  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
      throws IOException {
    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
    long maxSeqId = getMaxSequenceId(files);
    if (maxSeqId > newMaxSeqId) {
      throw new IOException("The new max sequence id " + newMaxSeqId +
          " is less than the old max sequence id " + maxSeqId);
    }
    // write a new seqId file
    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
        newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newMaxSeqId != maxSeqId) {
      try {
        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
            maxSeqId);
      } catch (FileAlreadyExistsException ignored) {
        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
      }
    }
    // remove old ones
    for (FileStatus status : files) {
      if (!newSeqIdFile.equals(status.getPath())) {
        walFS.delete(status.getPath(), false);
      }
    }
  }

  /**
   * This method will check 3 places for finding the max sequence id file. One is the expected
   * place, another is the old place under the region directory, and the last one is the wrong one
   * we introduced in HBASE-20734. See HBASE-22617 for more details.
   * <p/>
   * Notice that, you should always call this method instead of
   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until the 4.0.0 release.
   * @deprecated Only for compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
      throws IOException {
    FileSystem rootFs = rootFsSupplier.get();
    FileSystem walFs = walFsSupplier.get();
    Path regionWALDir = FSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
    // This is the old place where we store the max sequence id file
    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), region);
    // This is for HBASE-20734, where we used a wrong directory, see HBASE-22617 for more details.
    Path wrongRegionWALDir =
        FSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
    return maxSeqId;
  }

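  // Lookup sketch for the compatibility method above (hypothetical numbers):
  // if the three probed locations hold
  //   walFs  regionWALDir      -> 42.seqid
  //   rootFs regionDir (old)   -> 24.seqid
  //   walFs  wrongRegionWALDir -> 7.seqid
  // the method returns 42, the maximum across all three.
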
  /**
   * Create a new {@link Reader} for reading logs to split.
   *
   * @param file the WAL file to open.
   * @return A new Reader instance, caller should close
   * @throws IOException
   * @throws CorruptedLogFileException
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might be still open, length is 0", path);
    }

    try {
      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Could not open wal " +
              path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }

  private static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // truncated files are expected if a RS crashes (see HBASE-2643)
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
              e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Ignoring exception" +
              " while parsing wal " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }

  /**
   * Create a new {@link Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected Writer createWriter(Path logfile)
      throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the writer thread,
    // consumed by the reader thread, or an exception occurred
    public final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting for RAM usage. Blocks if the RAM usage crosses
   * a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
   */
  public static class EntryBuffers {
    PipelineController controller;

    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);

    long totalBuffered = 0;
    long maxHeapUsage;
    boolean splitWriterCreationBounded;

    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
      this(controller, maxHeapUsage, false);
    }

    public EntryBuffers(PipelineController controller, long maxHeapUsage,
        boolean splitWriterCreationBounded) {
      this.controller = controller;
      this.maxHeapUsage = maxHeapUsage;
      this.splitWriterCreationBounded = splitWriterCreationBounded;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException
     * @throws IOException
     */
    public void appendEntry(Entry entry) throws InterruptedException, IOException {
      WALKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap = buffer.appendEntry(entry);
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (controller.dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
          controller.dataAvailable.wait(2000);
        }
        controller.dataAvailable.notifyAll();
      }
      controller.checkForErrors();
    }

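    // Backpressure sketch: with the splitter's default 128 MB maxHeapUsage, a
    // reader calling appendEntry(...) parks in dataAvailable.wait(2000) once
    // totalBuffered exceeds the limit, and resumes after writer threads drain
    // chunks and doneWriting(...) calls notifyAll().
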
    /**
     * @return RegionEntryBuffer a buffer of edits to be written.
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      // When writer creation is bounded, a chunk is handed out only once the heap
      // usage is over maxHeapUsage, so we don't need to create a writer for each
      // region during splitting. The remaining buffered edits are flushed after
      // the split through a threadpool, which keeps the number of writers created
      // under control.
      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
        return null;
      }
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (controller.dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        controller.dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }

    public void waitUntilDrained() {
      synchronized (controller.dataAvailable) {
        while (totalBuffered > 0) {
          try {
            controller.dataAvailable.wait(2000);
          } catch (InterruptedException e) {
            LOG.warn("Got interrupted while waiting for EntryBuffers to drain");
            Thread.interrupted();
            break;
          }
        }
      }
    }
  }

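  // Chunk-selection sketch: with buffered regions { A: 8 MB, B: 3 MB } and A
  // already in currentlyWriting, getChunkToWrite() hands out B's buffer (the
  // largest region not currently being written) and marks B as in flight until
  // doneWriting(B) is called.
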
  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization in order to
   * share a single byte array instance for the table and region name.
   * Also tracks memory usage of the accumulated edits.
   */
  public static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    TableName tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entryBuffer = new ArrayList<>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
          ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
          0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      WALKeyImpl k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    @Override
    public long heapSize() {
      return heapInBuffer;
    }

    public byte[] getEncodedRegionName() {
      return encodedRegionName;
    }

    public List<Entry> getEntryBuffer() {
      return entryBuffer;
    }

    public TableName getTableName() {
      return tableName;
    }
  }

  public static class WriterThread extends Thread {
    private volatile boolean shouldStop = false;
    private PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink,
        int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }

    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }

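  // Lifecycle sketch: an output sink starts numThreads of these writer threads
  // (named "<parent-thread>-Writer-0..n-1"); each loops pulling the largest
  // available region buffer and appending it to the sink, and exits once
  // finish() sets shouldStop and the sink reports nothing left to flush.
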
  /**
   * An abstract class that provides a common interface to support the
   * different ways of consuming recovered edits.
   */
  public static abstract class OutputSink {

    protected PipelineController controller;
    protected EntryBuffers entryBuffers;

    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
        new ConcurrentHashMap<>();

    protected final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not output edits */
    protected final Set<byte[]> blacklistedRegions = Collections
        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));

    protected boolean closeAndCleanCompleted = false;

    protected boolean writersClosed = false;

    protected final int numThreads;

    protected CancelableProgressable reporter = null;

    protected AtomicLong skippedEdits = new AtomicLong();

    protected List<Path> splits = null;

    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
      numThreads = numWriters;
      this.controller = controller;
      this.entryBuffers = entryBuffers;
    }

    void setReporter(CancelableProgressable reporter) {
      this.reporter = reporter;
    }

    /**
     * Start the threads that will pump data from the entryBuffers to the output files.
     */
    public synchronized void startWriterThreads() {
      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
        t.start();
        writerThreads.add(t);
      }
    }

    /**
     * Update region's maximum edit log SeqNum.
     */
    void updateRegionMaximumEditLogSeqNum(Entry entry) {
      synchronized (regionMaximumEditLogSeqNum) {
        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
        }
      }
    }

    /**
     * @return the number of currently opened writers
     */
    int getNumOpenWriters() {
      return this.writers.size();
    }

    long getSkippedEdits() {
      return this.skippedEdits.get();
    }

    /**
     * Wait for writer threads to dump all info to the sink
     * @return true when there is no error
     * @throws IOException
     */
    protected boolean finishWriting(boolean interrupt) throws IOException {
      LOG.debug("Waiting for split writer threads to finish");
      boolean progress_failed = false;
      for (WriterThread t : writerThreads) {
        t.finish();
      }
      if (interrupt) {
        for (WriterThread t : writerThreads) {
          t.interrupt(); // interrupt the writer threads. We are stopping now.
        }
      }

      for (WriterThread t : writerThreads) {
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
        try {
          t.join();
        } catch (InterruptedException ie) {
          IOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
      controller.checkForErrors();
      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
      return (!progress_failed);
    }

    public abstract List<Path> finishWritingAndClose() throws IOException;

    /**
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    public abstract Map<byte[], Long> getOutputCounts();

    /**
     * @return number of regions we've recovered
     */
    public abstract int getNumberOfRecoveredRegions();

    /**
     * @param buffer A WAL Edit Entry
     * @throws IOException
     */
    public abstract void append(RegionEntryBuffer buffer) throws IOException;

    /**
     * WriterThreads call this function to help flush internal remaining edits in the buffer
     * before close.
     * @return true when the underlying sink has something to flush
     */
    public boolean flush() throws IOException {
      return false;
    }

    /**
     * Some WALEdits contain only KVs that record what happened to a region.
     * Not all sinks will want to get all of those edits.
     *
     * @return Return true if this sink wants to accept this region-level WALEdit.
     */
    public abstract boolean keepRegionEvent(Entry entry);
  }

  /**
   * Class that manages the output streams from the log splitting process.
   */
  class LogRecoveredEditsOutputSink extends OutputSink {

    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
        int numWriters) {
      // More threads could potentially write faster at the expense
      // of causing more disk seeks as the logs are split.
      // Beyond a certain number (probably around 3) the process
      // will be bound on the reader in the current implementation anyway.
      super(controller, entryBuffers, numWriters);
    }

    /**
     * @return null if failed to report progress
     * @throws IOException
     */
    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful = false;
      List<Path> result = null;
      try {
        isSuccessful = finishWriting(false);
      } finally {
        result = close();
        List<IOException> thrown = closeLogWriters(null);
        if (CollectionUtils.isNotEmpty(thrown)) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

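    // Contract sketch: finishWritingAndClose() returns the list of recovered.edits
    // paths on success and null when the reporter stopped making progress; the
    // caller in splitLogFile(...) maps a null result back to progress_failed = true.
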
    // delete the one with fewer wal entries
    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
        throws IOException {
      long dstMinLogSeqNum = -1L;
      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
        WAL.Entry entry = reader.next();
        if (entry != null) {
          dstMinLogSeqNum = entry.getKey().getSequenceId();
        }
      } catch (EOFException e) {
        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
            dst, e);
      }
      if (wap.minLogSeqNum < dstMinLogSeqNum) {
        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
            + walFS.getFileStatus(dst).getLen());
        if (!walFS.delete(dst, false)) {
          LOG.warn("Failed deleting of old {}", dst);
          throw new IOException("Failed deleting of old " + dst);
        }
      } else {
        LOG.warn("Found existing old edits file and we have fewer entries. Deleting " + wap.p
            + ", length=" + walFS.getFileStatus(wap.p).getLen());
        if (!walFS.delete(wap.p, false)) {
          LOG.warn("Failed deleting of {}", wap.p);
          throw new IOException("Failed deleting of " + wap.p);
        }
      }
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written.
     */
    List<Path> close() throws IOException {
      Preconditions.checkState(!closeAndCleanCompleted);

      final List<Path> paths = new ArrayList<>();
      final List<IOException> thrown = Lists.newArrayList();
      ThreadPoolExecutor closeThreadPool = Threads
          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
            private int count = 1;

            @Override
            public Thread newThread(Runnable r) {
              Thread t = new Thread(r, "split-log-closeStream-" + count++);
              return t;
            }
          });
      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
      boolean progress_failed;
      try {
        progress_failed = executeCloseTask(completionService, thrown, paths);
      } catch (InterruptedException e) {
        IOException iie = new InterruptedIOException();
        iie.initCause(e);
        throw iie;
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        closeThreadPool.shutdownNow();
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      writersClosed = true;
      closeAndCleanCompleted = true;
      if (progress_failed) {
        return null;
      }
      return paths;
    }

    /**
     * @param completionService threadPool to execute the closing tasks
     * @param thrown store the exceptions
     * @param paths arrayList to store the paths written
     * @return true if the reporter failed to make progress while the close tasks executed
     */
    boolean executeCloseTask(CompletionService<Void> completionService,
        List<IOException> thrown, List<Path> paths)
        throws InterruptedException, ExecutionException {
      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
        }
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
            paths.add(dst);
            return null;
          }
        });
      }
      boolean progress_failed = false;
      for (int i = 0, n = this.writers.size(); i < n; i++) {
        Future<Void> future = completionService.take();
        future.get();
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
      }
      return progress_failed;
    }

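    // Rename sketch (hypothetical numbers): a writer for a region whose edits run
    // up to sequence id 2500 closes ".../recovered.edits/0000000000000002332-wal.1.temp"
    // and the closeWriter(...) step below renames it to
    // ".../recovered.edits/0000000000000002500", the region's maximum edit sequence
    // number, so replay can later skip whole files by name.
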
    Path closeWriter(String encodedRegionName, WriterAndPath wap,
        List<IOException> thrown) throws IOException {
      LOG.trace("Closing " + wap.p);
      try {
        wap.w.close();
      } catch (IOException ioe) {
        LOG.error("Couldn't close log at " + wap.p, ioe);
        thrown.add(ioe);
        return null;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
            + " edits, skipped " + wap.editsSkipped + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + "ms)");
      }
      if (wap.editsWritten == 0) {
        // just remove the empty recovered.edits file
        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
          LOG.warn("Failed deleting empty " + wap.p);
          throw new IOException("Failed deleting empty " + wap.p);
        }
        return null;
      }

      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
          regionMaximumEditLogSeqNum.get(encodedRegionName));
      try {
        if (!dst.equals(wap.p) && walFS.exists(dst)) {
          deleteOneWithFewerEntries(wap, dst);
        }
        // Skip the unit tests which create a splitter that reads and
        // writes the data without touching disk.
        // TestHLogSplit#testThreading is an example.
        if (walFS.exists(wap.p)) {
          if (!walFS.rename(wap.p, dst)) {
            throw new IOException("Failed renaming " + wap.p + " to " + dst);
          }
          LOG.info("Rename " + wap.p + " to " + dst);
        }
      } catch (IOException ioe) {
        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
        thrown.add(ioe);
        return null;
      }
      return dst;
    }

    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
      if (writersClosed) {
        return thrown;
      }
      if (thrown == null) {
        thrown = Lists.newArrayList();
      }
      try {
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        WriterAndPath wap = null;
        for (SinkWriter tmpWAP : writers.values()) {
          try {
            wap = (WriterAndPath) tmpWAP;
            wap.w.close();
          } catch (IOException ioe) {
            LOG.error("Couldn't close log at " + wap.p, ioe);
            thrown.add(ioe);
            continue;
          }
          LOG.info(
              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
                  / 1000 / 1000) + "ms)");
        }
        writersClosed = true;
      }

      return thrown;
    }

    /**
     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
     * long as multiple threads are always acting on different regions.
     * @return null if this region shouldn't output any logs
     */
    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
      String regionName = Bytes.toString(region);
      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
      if (ret != null) {
        return ret;
      }
      // If we already decided that this region doesn't get any output
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }
      ret = createWAP(region, entry);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
      }
      if (reusable) {
        writers.put(regionName, ret);
      }
      return ret;
    }

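    // Caching sketch: the first edit seen for a region creates its writer via
    // createWAP(...) and, when reusable, caches it in writers keyed by the encoded
    // region name; a region for which createWAP(...) returned null is blacklisted
    // so later edits for it are dropped without re-checking.
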
    /**
     * @return a path with a writer for that path. Caller should close.
     */
    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
      Path regionedits = getRegionSplitEditsPath(entry,
          fileBeingSplit.getPath().getName(), conf);
      if (regionedits == null) {
        return null;
      }
      FileSystem walFs = FSUtils.getWALFileSystem(conf);
      if (walFs.exists(regionedits)) {
        LOG.warn("Found old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
            + walFs.getFileStatus(regionedits).getLen());
        if (!walFs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old {}", regionedits);
        }
      }
      Writer w = createWriter(regionedits);
      LOG.debug("Creating writer path={}", regionedits);
      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
    }

    void filterCellByStore(Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores =
          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (MapUtils.isEmpty(maxSeqIdInStores)) {
        return;
      }
      // Create the array list for the cells that aren't filtered.
      // We make the assumption that most cells will be kept.
      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
          // or the master was crashed before and we can not get the information.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
            keptCells.add(cell);
          }
        }
      }

      // Anything in the keptCells array list is still live.
      // So rather than removing the cells from the array list,
      // which would be an O(n^2) operation, we just replace the list.
      logEntry.getEdit().setCells(keptCells);
    }

    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      appendBuffer(buffer, true);
    }

    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return null;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = getWriterAndPath(logEntry, reusable);
            if (wap == null) {
              if (LOG.isTraceEnabled()) {
                // This log spews the full edit. Can be massive in the log. Enable only when
                // debugging WAL lost edit issues.
                LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
              }
              return null;
            }
          }
          filterCellByStore(logEntry);
          if (!logEntry.getEdit().isEmpty()) {
            wap.w.append(logEntry);
            this.updateRegionMaximumEditLogSeqNum(logEntry);
            editsCount++;
          } else {
            wap.incrementSkippedEdits(1);
          }
        }
        // Pass along summary statistics
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error(HBaseMarkers.FATAL, "Got IOException while writing log entry to log", e);
        throw e;
      }
      return wap;
    }
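    // filterCellByStore above keeps a cell when the store has no recorded flushed
    // sequence id for the cell's family, or when the edit's sequence id is newer than
    // the flush point; metadata cells (METAFAMILY) are always kept. An illustrative
    // decision table (families and sequence ids are made-up values):
    //
    //   maxSeqIdInStores = { "cf1" -> 10 }
    //   edit seqid  8, family cf1 -> dropped (already flushed to the store)
    //   edit seqid 12, family cf1 -> kept    (newer than the flush point)
    //   edit seqid  8, family cf2 -> kept    (no flush record; keep to be safe)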
    @Override
    public boolean keepRegionEvent(Entry entry) {
      ArrayList<Cell> cells = entry.getEdit().getCells();
      for (Cell cell : cells) {
        if (WALEdit.isCompactionMarker(cell)) {
          return true;
        }
      }
      return false;
    }

    /**
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
      }
      return ret;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return writers.size();
    }
  }

  /**
   * An output sink that writes the recovered edits for each region buffer and closes the
   * writer right away, rather than keeping one writer per region open until the end of the
   * split, so the number of simultaneously open writer files stays bounded.
   */
  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {

    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();

    public BoundedLogWriterCreationOutputSink(PipelineController controller,
        EntryBuffers entryBuffers, int numWriters) {
      super(controller, entryBuffers, numWriters);
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful;
      List<Path> result;
      try {
        isSuccessful = finishWriting(false);
      } finally {
        result = close();
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

    @Override
    boolean executeCloseTask(CompletionService<Void> completionService,
        List<IOException> thrown, List<Path> paths)
        throws InterruptedException, ExecutionException {
      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
        LOG.info("Submitting writeThenClose of {}",
            Arrays.toString(buffer.getValue().encodedRegionName));
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            Path dst = writeThenClose(buffer.getValue());
            paths.add(dst);
            return null;
          }
        });
      }
      boolean progressFailed = false;
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = completionService.take();
        future.get();
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }

      return progressFailed;
    }
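    // Each region may go through writeThenClose() more than once (once per drained
    // RegionEntryBuffer), so per-region edit counts are accumulated in regionRecoverStatMap
    // rather than read off a single writer. The putIfAbsent/put sequence in writeThenClose
    // below is equivalent to the one-liner (a sketch, not a behavior change):
    //
    //   regionRecoverStatMap.merge(encodedRegionName, wap.editsWritten, Long::sum);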
    /**
     * Since the splitting process may create multiple output files, we need a map,
     * regionRecoverStatMap, to track the output count of each region.
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
      for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
      }
      return regionRecoverStatMapResult;
    }

    /**
     * @return the number of recovered regions
     */
    @Override
    public int getNumberOfRecoveredRegions() {
      return regionRecoverStatMap.size();
    }

    /**
     * Append the buffer to a new recovered edits file, then close it when done.
     * @param buffer contains all entries of a certain region
     * @throws IOException when closeWriter fails
     */
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      writeThenClose(buffer);
    }

    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
      WriterAndPath wap = appendBuffer(buffer, false);
      if (wap != null) {
        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
        if (value != null) {
          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
          regionRecoverStatMap.put(encodedRegionName, newValue);
        }
      }

      Path dst = null;
      List<IOException> thrown = new ArrayList<>();
      if (wap != null) {
        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      return dst;
    }
  }

  /**
   * Class wraps the actual writer which writes data out and related statistics
   */
  public abstract static class SinkWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped to this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }
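  // Typical accounting flow against a SinkWriter, as done by appendBuffer above (a sketch;
  // wap, editsCount, and skippedCount are illustrative local names):
  //
  //   long start = System.nanoTime();
  //   // ... append entries, counting written vs. skipped edits ...
  //   wap.incrementEdits(editsCount);
  //   wap.incrementSkippedEdits(skippedCount);
  //   wap.incrementNanoTime(System.nanoTime() - start);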
  /**
   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
   * data written to this output.
   */
  private static final class WriterAndPath extends SinkWriter {
    final Path p;
    final Writer w;
    final long minLogSeqNum;

    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
      this.p = p;
      this.w = w;
      this.minLogSeqNum = minLogSeqNum;
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }

  /** A struct used by getMutationsFromWALEntry */
  public static class MutationReplay implements Comparable<MutationReplay> {
    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
        this.mutation.setDurability(Durability.ASYNC_WAL);
      }
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    public final MutationType type;
    public final Mutation mutation;
    public final long nonceGroup;
    public final long nonce;

    @Override
    public int compareTo(final MutationReplay d) {
      return this.mutation.compareTo(d.mutation);
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof MutationReplay)) {
        return false;
      } else {
        return this.compareTo((MutationReplay) obj) == 0;
      }
    }

    @Override
    public int hashCode() {
      return this.mutation.hashCode();
    }
  }
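  // A caller-side sketch of consuming the MutationReplay list produced below (entry, cells,
  // durability, and region are assumed to be in scope; batchReplay is HRegion's replay
  // entry point):
  //
  //   Pair<WALKey, WALEdit> walEntry = new Pair<>();
  //   List<MutationReplay> mutations =
  //       WALSplitter.getMutationsFromWALEntry(entry, cells, walEntry, durability);
  //   region.batchReplay(mutations.toArray(new MutationReplay[mutations.size()]),
  //       walEntry.getFirst().getSequenceId());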
  /**
   * This function is used to construct mutations from a WALEntry. It also
   * reconstructs WALKey & WALEdit from the passed in WALEntry.
   * @param entry the WALEntry to extract mutations from
   * @param cells scanner over the cells associated with the entry
   * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
   *          extracted from the passed in WALEntry.
   * @param durability the durability to set on each reconstructed mutation
   * @return list of MutationReplay to be replayed
   * @throws IOException if reading from the cell scanner fails
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
    if (entry == null) {
      // return an empty list
      return Collections.emptyList();
    }

    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
        entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<>();
    Cell previousCell = null;
    Mutation m = null;
    WALKeyImpl key = null;
    WALEdit val = null;
    if (logEntry != null) {
      val = new WALEdit();
    }

    for (int i = 0; i < count; i++) {
      // Throw index out of bounds if our cell count is off
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) {
        val.add(cell);
      }

      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRows(previousCell, cell);
      if (isNewRowOrType) {
        // Create new mutation
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append, thus we need nonces.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).add(cell);
      } else {
        ((Put) m).add(cell);
      }
      m.setDurability(durability);
      previousCell = cell;
    }

    // reconstruct WALKey
    if (logEntry != null) {
      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
          entry.getKey();
      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
          walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
          clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
}