/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CollectionUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region, for
 * recovering data on startup. Deletes the old log files when finished.
 */
@InterfaceAudience.Private
public class WALSplitter {
  private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  protected final Path walDir;
  protected final FileSystem walFS;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  private EntryBuffers entryBuffers;

  private SplitLogWorkerCoordination splitLogWorkerCoordination;
  private final WALFactory walFactory;

  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();

  // The file being split currently
  private FileStatus fileBeingSplit;

  // If we limit the number of writers opened for sinking recovered edits
  private final boolean splitWriterCreationBounded;

  public static final String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";

  @VisibleForTesting
  WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
      FileSystem walFS, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination) {
    this.conf = HBaseConfiguration.create(conf);
    String codecClassName = conf
        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.walDir = walDir;
    this.walFS = walFS;
    this.sequenceIdChecker = idChecker;
    this.splitLogWorkerCoordination = splitLogWorkerCoordination;

    this.walFactory = factory;
    PipelineController controller = new PipelineController();

    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);

    entryBuffers = new EntryBuffers(controller,
        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
        splitWriterCreationBounded);

    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    if (splitWriterCreationBounded) {
      outputSink = new BoundedLogWriterCreationOutputSink(
          controller, entryBuffers, numWriterThreads);
    } else {
      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
    }
  }

  /**
   * Splits a WAL file into the regions' recovered-edits directories.
   * This is the main entry point for distributed log splitting from SplitLogWorker.
   * <p>
   * If the log file has N regions then N recovered.edits files will be produced.
   * <p>
   * @return false if it is interrupted by the progressable.
   */
  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
      Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
      SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
      throws IOException {
    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
        splitLogWorkerCoordination);
    return s.splitLogFile(logfile, reporter);
  }
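
  // Usage sketch (illustrative only, not part of the API surface): "walFactory" and
  // "reporter" are hypothetical stand-ins for caller-supplied values, and
  // "example.wal" is a made-up file name.
  //
  //   FileStatus logfile = walFS.getFileStatus(new Path(logDir, "example.wal"));
  //   boolean finished = WALSplitter.splitLogFile(walDir, logfile, walFS, conf,
  //       reporter, null, null, walFactory);
  //   if (finished) {
  //     // Archive the source WAL once the split has completed, as split() does below.
  //     WALSplitter.finishSplitLogFile(logfile.getPath().toString(), conf);
  //   }
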
  // A wrapper to split one log folder using the method used by distributed
  // log splitting. Used by tools and unit tests. It should be package private.
  // It is public only because TestWALObserver is in a different package,
  // which uses this method to do log splitting.
  @VisibleForTesting
  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
      FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
    final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
        Collections.singletonList(logDir), null);
    List<Path> splits = new ArrayList<>();
    if (ArrayUtils.isNotEmpty(logfiles)) {
      for (FileStatus logfile : logfiles) {
        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
        if (s.splitLogFile(logfile, null)) {
          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
          if (s.outputSink.splits != null) {
            splits.addAll(s.outputSink.splits);
          }
        }
      }
    }
    if (!walFS.delete(logDir, true)) {
      throw new IOException("Unable to delete src dir: " + logDir);
    }
    return splits;
  }

  /**
   * Log splitting implementation, splits one log file.
   * @param logfile should be an actual log file.
   */
  @VisibleForTesting
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
        SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status = TaskMonitor.get().createStatus(
        "Splitting log file " + logfile.getPath() + " into a temporary staging area.");
    Reader logFileReader = null;
    this.fileBeingSplit = logfile;
    long startTS = EnvironmentEdgeManager.currentTime();
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
          logLength);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      logFileReader = getReader(logfile, skipErrors, reporter);
      if (logFileReader == null) {
        LOG.warn("Nothing to split in WAL={}", logPath);
        return true;
      }
      long openCost = EnvironmentEdgeManager.currentTime() - startTS;
      LOG.info("Open WAL={} cost {} ms", logPath, openCost);
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      startTS = EnvironmentEdgeManager.currentTime();
      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String encodedRegionNameAsStr = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          if (sequenceIdChecker != null) {
            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
            Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                  storeSeqId.getSequenceId());
            }
            regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            if (LOG.isDebugEnabled()) {
              LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
                  TextFormat.shortDebugString(ids));
            }
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
          editsSkipped++;
          continue;
        }
        // Don't send Compaction/Close/Open region events to recovered edit type sinks.
        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvent(entry)) {
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // If sufficient edits have passed, check if we should report progress.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
      if (splitLogWorkerCoordination != null) {
        // Some tests pass in a csm of null.
        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      } else {
        // For tests only.
        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
      }
      isCorrupted = true;
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down");
      try {
        if (null != logFileReader) {
          logFileReader.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close WAL reader", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true first: the immediately following statement will
          // reset it, but if finishWritingAndClose() throws, progress_failed keeps the
          // right value.
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        long processCost = EnvironmentEdgeManager.currentTime() - startTS;
        // See if length got updated post lease recovery
        String msg = "Processed " + editsCount + " edits across " +
            outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
            " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
            StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
            ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }
  /**
   * Completes the work done by splitLogFile by archiving logs.
   * <p>
   * It is invoked by SplitLogManager once it knows that one of the
   * SplitLogWorkers have completed the splitLogFile() part. If the master
   * crashes then this function might get called multiple times.
   * <p>
   * @param logfile path of the log file to finish
   * @param conf configuration
   * @throws IOException
   */
  public static void finishSplitLogFile(String logfile,
      Configuration conf) throws IOException {
    Path walDir = FSUtils.getWALRootDir(conf);
    Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path logPath;
    if (FSUtils.isStartingWithPath(walDir, logfile)) {
      logPath = new Path(logfile);
    } else {
      logPath = new Path(walDir, logfile);
    }
    finishSplitLogFile(walDir, oldLogDir, logPath, conf);
  }

  private static void finishSplitLogFile(Path walDir, Path oldLogDir,
      Path logPath, Configuration conf) throws IOException {
    List<Path> processedLogs = new ArrayList<>();
    List<Path> corruptedLogs = new ArrayList<>();
    FileSystem walFS = walDir.getFileSystem(conf);
    if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
      corruptedLogs.add(logPath);
    } else {
      processedLogs.add(logPath);
    }
    archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
    Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
    walFS.delete(stagingDir, true);
  }

  /**
   * Moves processed logs to the oldLogDir after successful processing. Moves
   * corrupted logs (any log that couldn't be successfully parsed) to the corrupt
   * dir (.corrupt) for later investigation.
   *
   * @param corruptedLogs logs that could not be parsed
   * @param processedLogs logs that were split successfully
   * @param oldLogDir directory to archive processed logs into
   * @param walFS WAL FileSystem to archive files on.
   * @param conf configuration
   * @throws IOException
   */
  private static void archiveLogs(
      final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem walFS, final Configuration conf) throws IOException {
    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Defaulting to {}",
          corruptDir);
    }
    if (!walFS.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir {}", corruptDir);
    }
    walFS.mkdirs(oldLogDir);

    // This method can get restarted or called multiple times for archiving
    // the same log files.
    for (Path corrupted : corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (walFS.exists(corrupted)) {
        if (!walFS.rename(corrupted, p)) {
          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
        } else {
          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
        }
      }
    }

    for (Path p : processedLogs) {
      Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
      if (walFS.exists(p)) {
        if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
          LOG.warn("Unable to move {} to {}", p, newPath);
        } else {
          LOG.info("Archived processed log {} to {}", p, newPath);
        }
      }
    }
  }

  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code>, named for the sequence id in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures existence of RECOVERED_EDITS_DIR under the region,
   * creating it if necessary.
   * @param logEntry the entry whose region's directory we want
   * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
   * @param tmpDirName of the directory used to sideline old recovered edits file
   * @param conf configuration
   * @return Path to file into which to dump split log edits.
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  @VisibleForTesting
  static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
      String tmpDirName, Configuration conf) throws IOException {
    FileSystem walFS = FSUtils.getWALFileSystem(conf);
    Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = getRegionDirRecoveredEditsDir(regionDir);

    if (walFS.exists(dir) && walFS.isFile(dir)) {
      Path tmp = new Path(tmpDirName);
      if (!walFS.exists(tmp)) {
        walFS.mkdirs(tmp);
      }
      tmp = new Path(tmp,
          HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: {}. It could be some "
          + "leftover of an old installation. It should be a folder instead. "
          + "So moving it to {}", dir, tmp);
      if (!walFS.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file {}", dir);
      }
    }

    if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
      LOG.warn("mkdir failed on {}", dir);
    }
    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
    // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
    // region's replayRecoveredEdits will not delete it
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
    return new Path(dir, fileName);
  }

  private static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  /**
   * Get the completed recovered edits file path, renaming the file so it is named
   * by the last edit in the file rather than its first edit. Then we could use the
   * name to skip recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
   * @param srcPath the temporary recovered edits file
   * @param maximumEditLogSeqNum the maximum sequence id among the file's edits
   * @return dstPath which takes the file's last edit log seq num as its name
   */
  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
      long maximumEditLogSeqNum) {
    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
    return new Path(srcPath.getParent(), fileName);
  }

  @VisibleForTesting
  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }

  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
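
  // Naming example for the scheme above (values are made up): an edit with
  // sequence id 2332, split out of a WAL file named "wal.1234", is first written to
  //   <region dir>/recovered.edits/0000000000000002332-wal.1234.temp
  // (19-digit zero-padded first sequence id, per formatRecoveredEditsFileName),
  // and on close the file is renamed by getCompletedRecoveredEditsFilePath to the
  // sequence id of its last edit, e.g.
  //   <region dir>/recovered.edits/0000000000000002400
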
  /**
   * @param regionDir
   *          This region's directory in the filesystem.
   * @return The directory that holds recovered edits files for the region
   *         <code>regionDir</code>
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
    return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
  }

  /**
   * Check whether there is recovered.edits in the region dir.
   * @param conf conf
   * @param regionInfo the region to check
   * @throws IOException IOException
   * @return true if recovered.edits exist in the region dir
   */
  public static boolean hasRecoveredEdits(final Configuration conf,
      final RegionInfo regionInfo) throws IOException {
    // No recovered.edits for non default replica regions
    if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
      return false;
    }
    // Only default replica region can reach here, so we can use regioninfo
    // directly without converting it to default replica's regioninfo.
    Path regionWALDir =
        FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), regionInfo);
    Path wrongRegionWALDir =
        FSUtils.getWrongWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    FileSystem rootFs = FSUtils.getRootDirFileSystem(conf);
    NavigableSet<Path> files = getSplitEditFilesSorted(walFs, regionWALDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(rootFs, regionDir);
    if (!files.isEmpty()) {
      return true;
    }
    files = getSplitEditFilesSorted(walFs, wrongRegionWALDir);
    return !files.isEmpty();
  }

  /**
   * Returns sorted set of edit files made by splitter, excluding files
   * with '.temp' suffix.
   *
   * @param walFS WAL FileSystem used to retrieve split edits files.
   * @param regionDir WAL region dir to look for recovered edits files under.
   * @return Files in passed <code>regionDir</code> as a sorted set.
   * @throws IOException
   */
  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
      final Path regionDir) throws IOException {
    NavigableSet<Path> filesSorted = new TreeSet<>();
    Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
    if (!walFS.exists(editsdir)) {
      return filesSorted;
    }
    FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        boolean result = false;
        try {
          // Return files and only files that match the editfile names pattern.
          // There can be other files in this directory other than edit files.
          // In particular, on error, we'll move aside the bad edit file giving
          // it a timestamp suffix. See moveAsideBadEditsFile.
          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
          result = walFS.isFile(p) && m.matches();
          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
          // because it means a splitwal thread is still writing this file.
          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
            result = false;
          }
          // Skip SeqId files.
          if (isSequenceIdFile(p)) {
            result = false;
          }
        } catch (IOException e) {
          LOG.warn("Failed isFile check on {}", p, e);
        }
        return result;
      }
    });
    if (ArrayUtils.isNotEmpty(files)) {
      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
    }
    return filesSorted;
  }

  /**
   * Move aside a bad edits file.
   *
   * @param walFS WAL FileSystem used to rename bad edits file.
   * @param edits Edits file to move aside.
   * @return The name of the moved aside file.
   * @throws IOException
   */
  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
      throws IOException {
    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
        + System.currentTimeMillis());
    if (!walFS.rename(edits, moveAsideName)) {
      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
    }
    return moveAsideName;
  }

  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();

  /**
   * Is the given file a region open sequence id file?
   */
  @VisibleForTesting
  public static boolean isSequenceIdFile(final Path file) {
    return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
        || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
  }

  private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
      throws IOException {
    // TODO: Why are we using a method in here as part of our normal region open where
    // there is no splitting involved? Fix. St.Ack 01/20/2017.
    Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
    try {
      FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
      return files != null ? files : new FileStatus[0];
    } catch (FileNotFoundException e) {
      return new FileStatus[0];
    }
  }

  private static long getMaxSequenceId(FileStatus[] files) {
    long maxSeqId = -1L;
    for (FileStatus file : files) {
      String fileName = file.getPath().getName();
      try {
        maxSeqId = Math.max(maxSeqId, Long
            .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
      } catch (NumberFormatException ex) {
        LOG.warn("Invalid SeqId File Name={}", fileName);
      }
    }
    return maxSeqId;
  }
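
  // Illustrative example (made-up values): writeRegionSequenceIdFile(walFS, regionDir, 1234)
  // below creates the marker file "<region dir>/recovered.edits/1234.seqid";
  // getMaxSequenceId() recovers 1234 by stripping the ".seqid" suffix and parsing
  // the remainder.
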
  /**
   * Get the max sequence id which is stored in the region directory. -1 if none.
   */
  public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
    return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
  }

  /**
   * Create a file whose name is the region's max sequence id.
   */
  public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
      throws IOException {
    FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
    long maxSeqId = getMaxSequenceId(files);
    if (maxSeqId > newMaxSeqId) {
      throw new IOException("The new max sequence id " + newMaxSeqId +
          " is less than the old max sequence id " + maxSeqId);
    }
    // Write a new seqId file.
    Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
        newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newMaxSeqId != maxSeqId) {
      try {
        if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
            maxSeqId);
      } catch (FileAlreadyExistsException ignored) {
        // Latest hdfs throws this exception. It's all right if newSeqIdFile already exists.
      }
    }
    // Remove old ones.
    for (FileStatus status : files) {
      if (!newSeqIdFile.equals(status.getPath())) {
        walFS.delete(status.getPath(), false);
      }
    }
  }

  /**
   * This method will check 3 places for finding the max sequence id file. One is the expected
   * place, another is the old place under the region directory, and the last one is the wrong one
   * we introduced in HBASE-20734. See HBASE-22617 for more details.
   * <p/>
   * Notice that, you should always call this method instead of
   * {@link #getMaxRegionSequenceId(FileSystem, Path)} until 4.0.0 release.
   * @deprecated Only for compatibility, will be removed in 4.0.0.
   */
  @Deprecated
  public static long getMaxRegionSequenceId(Configuration conf, RegionInfo region,
      IOExceptionSupplier<FileSystem> rootFsSupplier, IOExceptionSupplier<FileSystem> walFsSupplier)
      throws IOException {
    FileSystem rootFs = rootFsSupplier.get();
    FileSystem walFs = walFsSupplier.get();
    Path regionWALDir = FSUtils.getWALRegionDir(conf, region.getTable(), region.getEncodedName());
    // This is the old place where we store the max sequence id file.
    Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), region);
    // This is for HBASE-20734, where we used a wrong directory, see HBASE-22617 for more details.
    Path wrongRegionWALDir =
        FSUtils.getWrongWALRegionDir(conf, region.getTable(), region.getEncodedName());
    long maxSeqId = getMaxRegionSequenceId(walFs, regionWALDir);
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(rootFs, regionDir));
    maxSeqId = Math.max(maxSeqId, getMaxRegionSequenceId(walFs, wrongRegionWALDir));
    return maxSeqId;
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   *
   * @param file the WAL file to open
   * @return A new Reader instance, caller should close
   * @throws IOException
   * @throws CorruptedLogFileException
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    if (length <= 0) {
      LOG.warn("File {} might be still open, length is 0", path);
    }

    try {
      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // TODO should we ignore an empty, not-last log file if skip.errors
          // is false? Either way, the caller should decide what to do. E.g.
          // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
          // recovered (i.e. closed)
          LOG.warn("Could not open {} for reading. File is empty", path, e);
        }
        // EOFException being ignored
        return null;
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("File {} does not exist anymore", path, e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Could not open wal " +
              path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }

  private static Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
      throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // Truncated files are expected if a RS crashes (see HBASE-2643).
      LOG.info("EOF from wal {}. Continuing.", path);
      return null;
    } catch (IOException e) {
      // If the IOE resulted from bad file format,
      // then this problem is idempotent and retrying won't help
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
              e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception from wal {}. Continuing", path, e);
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Ignoring exception" +
              " while parsing wal " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }

  /**
   * Create a new {@link Writer} for writing log splits.
   * @return a new Writer instance, caller should close
   */
  protected Writer createWriter(Path logfile)
      throws IOException {
    return walFactory.createRecoveredEditsWriter(walFS, logfile);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(walFS, curLogFile, reporter);
  }

  /**
   * Get the number of currently open writers.
   */
  private int getNumOpenWriters() {
    int result = 0;
    if (this.outputSink != null) {
      result += this.outputSink.getNumOpenWriters();
    }
    return result;
  }

  /**
   * Contains some methods to control WAL-entries producer / consumer interactions.
   */
  public static class PipelineController {
    // If an exception is thrown by one of the other threads, it will be
    // stored here.
    AtomicReference<Throwable> thrown = new AtomicReference<>();

    // Wait/notify for when data has been produced by the writer thread,
    // consumed by the reader thread, or an exception occurred
    public final Object dataAvailable = new Object();

    void writerThreadError(Throwable t) {
      thrown.compareAndSet(null, t);
    }

    /**
     * Check for errors in the writer threads. If any is found, rethrow it.
     */
    void checkForErrors() throws IOException {
      Throwable thrown = this.thrown.get();
      if (thrown == null) {
        return;
      }
      if (thrown instanceof IOException) {
        throw new IOException(thrown);
      } else {
        throw new RuntimeException(thrown);
      }
    }
  }

  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
   * a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
   */
  public static class EntryBuffers {
    PipelineController controller;

    Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);

    long totalBuffered = 0;
    long maxHeapUsage;
    boolean splitWriterCreationBounded;

    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
      this(controller, maxHeapUsage, false);
    }

    public EntryBuffers(PipelineController controller, long maxHeapUsage,
        boolean splitWriterCreationBounded) {
      this.controller = controller;
      this.maxHeapUsage = maxHeapUsage;
      this.splitWriterCreationBounded = splitWriterCreationBounded;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException
     * @throws IOException
     */
    public void appendEntry(Entry entry) throws InterruptedException, IOException {
      WALKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap = buffer.appendEntry(entry);
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (controller.dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
          controller.dataAvailable.wait(2000);
        }
        controller.dataAvailable.notifyAll();
      }
      controller.checkForErrors();
    }
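
    // Backpressure sketch (descriptive, using the default 128 MB maxHeapUsage as an
    // example): once buffered edits exceed maxHeapUsage, the reader thread calling
    // appendEntry() parks in the wait loop above; it resumes when doneWriting()
    // below subtracts a flushed chunk from totalBuffered and notifies dataAvailable.
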
    /**
     * @return RegionEntryBuffer a buffer of edits to be written.
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      // When writer creation is bounded, only hand out a chunk once the heap usage
      // is over maxHeapUsage; thus we don't need to create a writer for every
      // region during splitting. The remaining buffered logs are flushed through a
      // threadpool after splitting, which keeps the number of writers created
      // under control.
      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
        return null;
      }
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (controller.dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        controller.dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }

    public void waitUntilDrained() {
      synchronized (controller.dataAvailable) {
        while (totalBuffered > 0) {
          try {
            controller.dataAvailable.wait(2000);
          } catch (InterruptedException e) {
            LOG.warn("Got interrupted while waiting for EntryBuffers to drain");
            Thread.interrupted();
            break;
          }
        }
      }
    }
  }

  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization in order to
   * share a single byte array instance for the table and region name.
   * Also tracks memory usage of the accumulated edits.
   */
  public static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    TableName tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(TableName tableName, byte[] region) {
      this.tableName = tableName;
      this.encodedRegionName = region;
      this.entryBuffer = new ArrayList<>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
          ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
          0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      WALKeyImpl k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    @Override
    public long heapSize() {
      return heapInBuffer;
    }

    public byte[] getEncodedRegionName() {
      return encodedRegionName;
    }

    public List<Entry> getEntryBuffer() {
      return entryBuffer;
    }

    public TableName getTableName() {
      return tableName;
    }
  }

  public static class WriterThread extends Thread {
    private volatile boolean shouldStop = false;
    private PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink,
        int i) {
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }

    @Override
    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.trace("Writer thread starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (controller.dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
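
  // Pipeline overview (descriptive): the single reader thread in splitLogFile() pushes
  // entries into EntryBuffers; each WriterThread repeatedly takes the largest idle
  // region buffer via getChunkToWrite() and hands it to the OutputSink below, which
  // appends the edits to per-region recovered-edits writers.
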
  /**
   * The following class is an abstraction class to provide a common interface to support
   * different ways of consuming recovered edits.
   */
  public static abstract class OutputSink {

    protected PipelineController controller;
    protected EntryBuffers entryBuffers;

    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
        new ConcurrentHashMap<>();

    protected final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not output edits */
    protected final Set<byte[]> blacklistedRegions = Collections
        .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));

    protected boolean closeAndCleanCompleted = false;

    protected boolean writersClosed = false;

    protected final int numThreads;

    protected CancelableProgressable reporter = null;

    protected AtomicLong skippedEdits = new AtomicLong();

    protected List<Path> splits = null;

    public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
      numThreads = numWriters;
      this.controller = controller;
      this.entryBuffers = entryBuffers;
    }

    void setReporter(CancelableProgressable reporter) {
      this.reporter = reporter;
    }

    /**
     * Start the threads that will pump data from the entryBuffers to the output files.
     */
    public synchronized void startWriterThreads() {
      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(controller, entryBuffers, this, i);
        t.start();
        writerThreads.add(t);
      }
    }

    /**
     * Update region's maximum edit log SeqNum.
     */
    void updateRegionMaximumEditLogSeqNum(Entry entry) {
      synchronized (regionMaximumEditLogSeqNum) {
        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
        }
      }
    }

    /**
     * @return the number of currently opened writers
     */
    int getNumOpenWriters() {
      return this.writers.size();
    }

    long getSkippedEdits() {
      return this.skippedEdits.get();
    }

    /**
     * Wait for writer threads to dump all info to the sink
     * @return true when there is no error
     * @throws IOException
     */
    protected boolean finishWriting(boolean interrupt) throws IOException {
      LOG.debug("Waiting for split writer threads to finish");
      boolean progress_failed = false;
      for (WriterThread t : writerThreads) {
        t.finish();
      }
      if (interrupt) {
        for (WriterThread t : writerThreads) {
          t.interrupt(); // interrupt the writer threads. We are stopping now.
        }
      }

      for (WriterThread t : writerThreads) {
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
        try {
          t.join();
        } catch (InterruptedException ie) {
          IOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
      controller.checkForErrors();
      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
      return (!progress_failed);
    }

    public abstract List<Path> finishWritingAndClose() throws IOException;

    /**
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    public abstract Map<byte[], Long> getOutputCounts();

    /**
     * @return number of regions we've recovered
     */
    public abstract int getNumberOfRecoveredRegions();

    /**
     * @param buffer A WAL Edit Entry
     * @throws IOException
     */
    public abstract void append(RegionEntryBuffer buffer) throws IOException;

    /**
     * WriterThread calls this function to help flush internal remaining edits in the buffer
     * before close.
     * @return true when the underlying sink has something to flush
     */
    public boolean flush() throws IOException {
      return false;
    }

    /**
     * Some WALEdits contain only KVs that record what happened to a region.
     * Not all sinks will want to get all of those edits.
     *
     * @return Return true if this sink wants to accept this region-level WALEdit.
     */
    public abstract boolean keepRegionEvent(Entry entry);
  }

  /**
   * Class that manages the output streams from the log splitting process.
   */
  class LogRecoveredEditsOutputSink extends OutputSink {

    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
        int numWriters) {
      // More threads could potentially write faster at the expense
      // of causing more disk seeks as the logs are split.
      // Beyond a certain setting (probably around 3) the
      // process will be bound on the reader in the current
      // implementation anyway.
      super(controller, entryBuffers, numWriters);
    }

    /**
     * @return null if failed to report progress
     * @throws IOException
     */
    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful = false;
      List<Path> result = null;
      try {
        isSuccessful = finishWriting(false);
      } finally {
        result = close();
        List<IOException> thrown = closeLogWriters(null);
        if (CollectionUtils.isNotEmpty(thrown)) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

    // Delete the one with fewer wal entries.
    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
        throws IOException {
      long dstMinLogSeqNum = -1L;
      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
        WAL.Entry entry = reader.next();
        if (entry != null) {
          dstMinLogSeqNum = entry.getKey().getSequenceId();
        }
      } catch (EOFException e) {
        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
            dst, e);
      }
      if (wap.minLogSeqNum < dstMinLogSeqNum) {
        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
            + walFS.getFileStatus(dst).getLen());
        if (!walFS.delete(dst, false)) {
          LOG.warn("Failed deleting of old {}", dst);
          throw new IOException("Failed deleting of old " + dst);
        }
      } else {
        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
            + ", length=" + walFS.getFileStatus(wap.p).getLen());
        if (!walFS.delete(wap.p, false)) {
          LOG.warn("Failed deleting of {}", wap.p);
          throw new IOException("Failed deleting of " + wap.p);
        }
      }
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written.
     */
    List<Path> close() throws IOException {
      Preconditions.checkState(!closeAndCleanCompleted);

      final List<Path> paths = new ArrayList<>();
      final List<IOException> thrown = Lists.newArrayList();
      ThreadPoolExecutor closeThreadPool = Threads
          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
            private int count = 1;

            @Override
            public Thread newThread(Runnable r) {
              Thread t = new Thread(r, "split-log-closeStream-" + count++);
              return t;
            }
          });
      CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
      boolean progress_failed;
      try {
        progress_failed = executeCloseTask(completionService, thrown, paths);
      } catch (InterruptedException e) {
        IOException iie = new InterruptedIOException();
        iie.initCause(e);
        throw iie;
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        closeThreadPool.shutdownNow();
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      writersClosed = true;
      closeAndCleanCompleted = true;
      if (progress_failed) {
        return null;
      }
      return paths;
    }

    /**
     * @param completionService threadPool to execute the closing tasks
     * @param thrown store the exceptions
     * @param paths arrayList to store the paths written
     * @return true if progress reporting failed while closing
     */
    boolean executeCloseTask(CompletionService<Void> completionService,
        List<IOException> thrown, List<Path> paths)
        throws InterruptedException, ExecutionException {
      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
        }
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
            paths.add(dst);
            return null;
          }
        });
      }
      boolean progress_failed = false;
      for (int i = 0, n = this.writers.size(); i < n; i++) {
        Future<Void> future = completionService.take();
        future.get();
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
      }
      return progress_failed;
    }

    Path closeWriter(String encodedRegionName, WriterAndPath wap,
        List<IOException> thrown) throws IOException {
      LOG.trace("Closing " + wap.p);
      try {
        wap.w.close();
      } catch (IOException ioe) {
        LOG.error("Couldn't close log at " + wap.p, ioe);
        thrown.add(ioe);
        return null;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
            + " edits, skipped " + wap.editsSkipped + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + " ms)");
      }
      if (wap.editsWritten == 0) {
        // Just remove the empty recovered.edits file.
        if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
          LOG.warn("Failed deleting empty " + wap.p);
          throw new IOException("Failed deleting empty " + wap.p);
        }
        return null;
      }

      Path dst = getCompletedRecoveredEditsFilePath(wap.p,
          regionMaximumEditLogSeqNum.get(encodedRegionName));
      try {
        if (!dst.equals(wap.p) && walFS.exists(dst)) {
          deleteOneWithFewerEntries(wap, dst);
        }
        // Skip the unit tests which create a splitter that reads and
        // writes the data without touching disk.
        // TestHLogSplit#testThreading is an example.
        if (walFS.exists(wap.p)) {
          if (!walFS.rename(wap.p, dst)) {
            throw new IOException("Failed renaming " + wap.p + " to " + dst);
          }
          LOG.info("Rename " + wap.p + " to " + dst);
        }
      } catch (IOException ioe) {
        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
        thrown.add(ioe);
        return null;
      }
      return dst;
    }

    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
      if (writersClosed) {
        return thrown;
      }
      if (thrown == null) {
        thrown = Lists.newArrayList();
      }
      try {
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        WriterAndPath wap = null;
        for (SinkWriter tmpWAP : writers.values()) {
          try {
            wap = (WriterAndPath) tmpWAP;
            wap.w.close();
          } catch (IOException ioe) {
            LOG.error("Couldn't close log at " + wap.p, ioe);
            thrown.add(ioe);
            continue;
          }
          LOG.info(
              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
                  + (wap.nanosSpent / 1000 / 1000) + " ms)");
        }
        writersClosed = true;
      }

      return thrown;
    }

    /**
     * Get a writer and path for a log starting at the given entry. This function is threadsafe so
     * long as multiple threads are always acting on different regions.
     * @return null if this region shouldn't output any logs
     */
    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
      String regionName = Bytes.toString(region);
      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
      if (ret != null) {
        return ret;
      }
      // If we already decided that this region doesn't get any output
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }
      ret = createWAP(region, entry);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
      }
      if (reusable) {
        writers.put(regionName, ret);
      }
      return ret;
    }

    /**
     * @return a writer and its path; the caller should close the writer when done.
    /**
     * @return a writer and the path it writes to; the caller is responsible for closing the
     *         writer.
     */
    WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
      String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
      Path regionedits = getRegionSplitEditsPath(entry,
          fileBeingSplit.getPath().getName(), tmpDirName, conf);
      if (regionedits == null) {
        return null;
      }
      FileSystem walFs = FSUtils.getWALFileSystem(conf);
      if (walFs.exists(regionedits)) {
        LOG.warn("Found old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
            + walFs.getFileStatus(regionedits).getLen());
        if (!walFs.delete(regionedits, false)) {
          LOG.warn("Failed to delete old {}", regionedits);
        }
      }
      Writer w = createWriter(regionedits);
      LOG.debug("Creating writer path={}", regionedits);
      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
    }

    void filterCellByStore(Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores =
          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (MapUtils.isEmpty(maxSeqIdInStores)) {
        return;
      }
      // Create the array list for the cells that aren't filtered.
      // We make the assumption that most cells will be kept.
      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip the cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
          // or the master crashed earlier and we cannot get the information.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
            keptCells.add(cell);
          }
        }
      }

      // Anything in the keptCells array list is still live.
      // So rather than removing the cells from the array list,
      // which would be an O(n^2) operation, we just replace the list.
      logEntry.getEdit().setCells(keptCells);
    }
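    // Illustrative sketch (not part of the original class): the filter above keeps a cell only
    // when its edit is newer than what the store has already flushed. In boolean form, with a
    // hypothetical helper:
    //
    //   boolean keep(Cell cell, long entrySeqId, Map<byte[], Long> maxSeqIdInStores) {
    //     if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
    //       return true;                                  // meta edits are always kept
    //     }
    //     Long flushedUpTo = maxSeqIdInStores.get(CellUtil.cloneFamily(cell));
    //     // Unknown store => keep, to be safe (rolling upgrade, master crash, ...).
    //     return flushedUpTo == null || flushedUpTo < entrySeqId;
    //   }
    //
    // Rebuilding a "kept" list and swapping it in is O(n), versus O(n^2) for repeated removal
    // from the middle of an ArrayList.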
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      appendBuffer(buffer, true);
    }

    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("Got an empty buffer, skipping");
        return null;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = getWriterAndPath(logEntry, reusable);
            if (wap == null) {
              if (LOG.isTraceEnabled()) {
                // This log spews the full edit, which can be massive. Enable only when
                // debugging WAL lost-edit issues.
                LOG.trace("getWriterAndPath decided we don't need to write edits for {}",
                    logEntry);
              }
              return null;
            }
          }
          filterCellByStore(logEntry);
          if (!logEntry.getEdit().isEmpty()) {
            wap.w.append(logEntry);
            this.updateRegionMaximumEditLogSeqNum(logEntry);
            editsCount++;
          } else {
            wap.incrementSkippedEdits(1);
          }
        }
        // Pass along summary statistics
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error(HBaseMarkers.FATAL, "Caught exception while writing log entry to log", e);
        throw e;
      }
      return wap;
    }

    @Override
    public boolean keepRegionEvent(Entry entry) {
      ArrayList<Cell> cells = entry.getEdit().getCells();
      for (Cell cell : cells) {
        if (WALEdit.isCompactionMarker(cell)) {
          return true;
        }
      }
      return false;
    }

    /**
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
      }
      return ret;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return writers.size();
    }
  }

  /**
   * An output sink that writes the edits for a region buffer and closes the writer right away,
   * so the number of simultaneously open writers stays bounded instead of keeping one writer
   * open per recovered region until the end of the split.
   */
  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {

    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();

    public BoundedLogWriterCreationOutputSink(PipelineController controller,
        EntryBuffers entryBuffers, int numWriters) {
      super(controller, entryBuffers, numWriters);
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful;
      List<Path> result;
      try {
        isSuccessful = finishWriting(false);
      } finally {
        result = close();
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

    @Override
    boolean executeCloseTask(CompletionService<Void> completionService,
        List<IOException> thrown, List<Path> paths)
        throws InterruptedException, ExecutionException {
      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
        LOG.info("Submitting writeThenClose of {}",
            Bytes.toString(buffer.getValue().encodedRegionName));
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            Path dst = writeThenClose(buffer.getValue());
            // "paths" is a plain ArrayList shared across tasks; guard concurrent adds.
            synchronized (paths) {
              paths.add(dst);
            }
            return null;
          }
        });
      }
      boolean progress_failed = false;
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = completionService.take();
        future.get();
        if (!progress_failed && reporter != null && !reporter.progress()) {
          progress_failed = true;
        }
      }

      return progress_failed;
    }
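    // Illustrative sketch (not part of the original class): unlike the parent sink, which keeps
    // one writer open per region until the final close, the bounded sink folds writing and
    // closing into a single step per region buffer:
    //
    //   parent:   append -> writer stays open ... close() closes all writers at the end
    //   bounded:  append -> writeThenClose: create writer, write buffer, close immediately
    //
    // The trade-off: fewer simultaneously open files (helpful where open-file limits are low),
    // at the cost of re-creating a writer when more edits arrive for the same region.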
    /**
     * Since the splitting process may create multiple output files per region, we keep a map
     * (regionRecoverStatMap) to track the output count of each region.
     * @return a map from encoded region ID to the number of edits written out for that region.
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
      for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
      }
      return regionRecoverStatMapResult;
    }

    /**
     * @return the number of recovered regions
     */
    @Override
    public int getNumberOfRecoveredRegions() {
      return regionRecoverStatMap.size();
    }

    /**
     * Append the buffer to a new recovered edits file, then close the writer when done.
     * @param buffer contains all entries of a certain region
     * @throws IOException when closing the writer fails
     */
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      writeThenClose(buffer);
    }

    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
      WriterAndPath wap = appendBuffer(buffer, false);
      if (wap != null) {
        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
        // Atomic equivalent of the original putIfAbsent-then-put sequence: accumulate the
        // number of edits written for this region across multiple output files.
        regionRecoverStatMap.merge(encodedRegionName, wap.editsWritten, Long::sum);
      }

      Path dst = null;
      List<IOException> thrown = new ArrayList<>();
      if (wap != null) {
        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      return dst;
    }
  }

  /**
   * Class wraps the actual writer which writes data out and related statistics
   */
  public abstract static class SinkWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped for this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }
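  // Illustrative sketch (not part of the original class): SinkWriter accumulates per-writer
  // statistics that the close path later logs and aggregates. A typical write loop updates
  // them like this (hypothetical "sink", "entries" and "shouldWrite"):
  //
  //   long start = System.nanoTime();
  //   int written = 0;
  //   for (Entry e : entries) {
  //     if (shouldWrite(e)) {
  //       write(e);
  //       written++;
  //     } else {
  //       sink.incrementSkippedEdits(1);
  //     }
  //   }
  //   sink.incrementEdits(written);
  //   sink.incrementNanoTime(System.nanoTime() - start);
  //
  // The counters are plain longs, so this accounting assumes a single thread per sink instance.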
  /**
   * Private data structure that wraps a Writer and its Path, also collecting statistics about
   * the data written to this output.
   */
  private final static class WriterAndPath extends SinkWriter {
    final Path p;
    final Writer w;
    final long minLogSeqNum;

    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
      this.p = p;
      this.w = w;
      this.minLogSeqNum = minLogSeqNum;
    }
  }

  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }

  /** A struct used by getMutationsFromWALEntry */
  public static class MutationReplay implements Comparable<MutationReplay> {
    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      if (this.mutation.getDurability() != Durability.SKIP_WAL) {
        // using ASYNC_WAL for replay
        this.mutation.setDurability(Durability.ASYNC_WAL);
      }
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    public final MutationType type;
    public final Mutation mutation;
    public final long nonceGroup;
    public final long nonce;

    @Override
    public int compareTo(final MutationReplay d) {
      return this.mutation.compareTo(d.mutation);
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof MutationReplay)) {
        return false;
      } else {
        return this.compareTo((MutationReplay) obj) == 0;
      }
    }

    @Override
    public int hashCode() {
      return this.mutation.hashCode();
    }
  }
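  // Illustrative sketch (not part of the original class): MutationReplay delegates ordering,
  // equality and hashing to the wrapped Mutation, so replays sort by row key like plain
  // mutations. Assuming hypothetical puts p1 ("row-a") and p2 ("row-b"):
  //
  //   List<MutationReplay> replays = new ArrayList<>();
  //   replays.add(new MutationReplay(MutationType.PUT, p2, HConstants.NO_NONCE,
  //       HConstants.NO_NONCE));
  //   replays.add(new MutationReplay(MutationType.PUT, p1, HConstants.NO_NONCE,
  //       HConstants.NO_NONCE));
  //   Collections.sort(replays);   // now ordered p1, p2 -- i.e. by row key
  //
  // Note that equals()/compareTo() ignore type, nonceGroup and nonce; two replays wrapping
  // equal mutations compare as equal even if their nonces differ.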
  /**
   * This function is used to construct mutations from a WALEntry. It also reconstructs the
   * WALKey and WALEdit from the passed in WALEntry.
   * @param entry the WALEntry to extract the mutations from
   * @param cells the CellScanner over the cells associated with the entry
   * @param logEntry pair that receives the WALKey and WALEdit instances extracted from the
   *          passed in WALEntry; may be null if the caller does not need them
   * @param durability the durability to set on the reconstructed mutations
   * @return list of MutationReplay to be replayed
   * @throws IOException if the cell scanner cannot be advanced
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
    if (entry == null) {
      // return an empty list
      return Collections.emptyList();
    }

    long replaySeqId = (entry.getKey().hasOrigSequenceNumber())
        ? entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<>();
    Cell previousCell = null;
    Mutation m = null;
    WALKeyImpl key = null;
    WALEdit val = null;
    if (logEntry != null) {
      val = new WALEdit();
    }

    for (int i = 0; i < count; i++) {
      // Throw index out of bounds if our cell count is off
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) {
        val.add(cell);
      }

      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRows(previousCell, cell);
      if (isNewRowOrType) {
        // Create new mutation
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes don't have nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts might come from increment or append, thus we need nonces.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).add(cell);
      } else {
        ((Put) m).add(cell);
      }
      m.setDurability(durability);
      previousCell = cell;
    }

    // reconstruct WALKey
    if (logEntry != null) {
      org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
          entry.getKey();
      List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
          walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
          clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
}
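// Illustrative usage sketch (not part of the original file): a caller replaying a WALEntry
// received over the admin protocol might drive getMutationsFromWALEntry like this, with
// hypothetical "entry" and "cellScanner" values taken from the RPC:
//
//   Pair<WALKey, WALEdit> walEntry = new Pair<>();
//   List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(
//       entry, cellScanner, walEntry, Durability.USE_DEFAULT);
//   for (WALSplitter.MutationReplay replay : edits) {
//     // apply replay.mutation to the region, honoring replay.nonceGroup / replay.nonce
//   }
//
// walEntry.getFirst() then holds the reconstructed WALKeyImpl and walEntry.getSecond() the
// rebuilt WALEdit for the entry.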