/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster, and then all the hfiles/wals are copied, via a Map-Reduce job, into the
 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();

  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }
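
  // Usage sketch (illustrative, hypothetical values): keys carrying the CONF_SOURCE_PREFIX /
  // CONF_DEST_PREFIX prefixes are overlaid onto the source and destination cluster
  // Configurations by HBaseConfiguration.createClusterConf(...), so per-side filesystem
  // settings can be supplied on the command line, e.g.
  //
  //   hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
  //     -Dexportsnapshot.from.dfs.checksum.combine.mode=COMPOSITE_CRC \
  //     -Dexportsnapshot.to.dfs.checksum.combine.mode=COMPOSITE_CRC \
  //     --snapshot MySnapshot --copy-to hdfs://dest:8020/hbase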

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to export.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }
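
  // Each mapper processes one balanced group of snapshot files: ExportSnapshotInputFormat
  // below builds the groups with getBalancedSplits(...) and emits one split per group.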

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }
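
    // Each map input key is a serialized SnapshotFileInfo protobuf produced by
    // ExportSnapshotInputFormat below; a map() invocation copies exactly one file.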

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          // Not expected here: getSnapshotFiles() only emits HFILE entries, so path stays null.
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      // bandwidthMB == Integer.MAX_VALUE (the tool's default) disables throttling.
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG
          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to manually run chown on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      } finally {
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source
     * file. This is only required when exporting as a user other than "hbase", or on a system
     * that doesn't have the "hbase" user. This is not considered a blocking failure, since the
     * user can later force a chown/chmod with an account known to exist on the system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          // Objects.equals is null-safe: user or group may be null when there is no
          // override and no reference status to fall back on.
          if (!(Objects.equals(user, stat.getOwner()) && Objects.equals(group, stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && str.length() > 0;
    }

    private long copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context.setStatus(
              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context
          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
            + outputPath);

        return totalBytesWritten;
      } finally {
        out.close();
        in.close();
      }
    }
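
    // copyData above closes both streams when it returns, and publishes progress through
    // the BYTES_COPIED counter roughly every reportSize bytes.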

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }
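
    // Background note (assumption about HDFS semantics, not asserted by this code):
    // FileChecksum equality is only meaningful when both sides use the same algorithm and
    // block size; on HDFS, dfs.checksum.combine.mode=COMPOSITE_CRC yields CRCs comparable
    // across differing block sizes, which is why the hint below suggests it.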

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify;")
              .append(
                " for the table backup scenario, you should use the -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }

    /**
     * Utility to compare checksums.
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If either checksum is null, or the two algorithms differ, no comparison is possible:
      // return INCOMPATIBLE. Otherwise return TRUE or FALSE for match or mismatch.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }
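
    // Note the asymmetry: verifyCopyResult above treats anything other than TRUE as a failed
    // copy, while sameFile below treats an unavailable checksum as "not the same file" and
    // lets the mapper re-copy it.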

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if
     * the user has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equal, since the user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  // Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          files.add(snapshotFileAndSize);
        }
      });

    return files;
  }

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }
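
  // Worked example (illustrative, not from the original sources): with file sizes
  // [1, 2, 3, 4, 5] and ngroups=2, the zig-zag assignment below yields
  // group0={5, 2, 1} (8 bytes) and group1={4, 3} (7 bytes).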

  /**
   * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as
   * possible. The groups created will have similar amounts of bytes.
   * <p>
   * The algorithm used is pretty straightforward: the file list is sorted by size, and then each
   * group takes the biggest file available, iterating through the groups while alternating the
   * direction.
   */
  static List<List<Pair<SnapshotFileInfo, Long>>>
    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        long r = a.getSecond() - b.getSecond();
        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    long[] sizeGroups = new long[ngroups];
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // add the hi one
      sizeGroups[g] += fileInfo.getSecond();
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    if (LOG.isDebugEnabled()) {
      for (int i = 0; i < sizeGroups.length; ++i) {
        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
      }
    }

    return fileGroups;
  }

  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && snapshotFiles.size() > 0) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
      List<InputSplit> splits = new ArrayList<>(groups.size());
      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
        splits.add(new ExportSnapshotInputSplit(files));
      }
      return splits;
    }

    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
      private List<Pair<BytesWritable, Long>> files;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
      }

      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
      }
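
      // Serialized split format, as written by write() and read back by readFields():
      // an int record count, followed by count records of
      // [BytesWritable SnapshotFileInfo bytes, long file size].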

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return new String[] {};
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  // Tool
  // ==========================================================================
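
  // Knobs consumed by the copy job (see the constants at the top of the class):
  // snapshot.export.map.bandwidth.mb caps per-mapper read bandwidth, while
  // snapshot.export.format.splits / mapreduce.job.maps control the number of mappers.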

  /**
   * Run Map-Reduce Job to perform the files copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = Job.getInstance(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since it may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }
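
  // setOwnerParallel / setPermissionParallel below fan the per-path chown/chmod calls out
  // over the pool created in setConfigParallel; any failure surfaces as an IOException via
  // Future.get().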

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

  private boolean verifyTarget = true;
  private boolean verifySource = true;
  private boolean verifyChecksum = true;
  private String snapshotName = null;
  private String targetName = null;
  private boolean overwrite = false;
  private String filesGroup = null;
  private String filesUser = null;
  private Path outputRoot = null;
  private Path inputRoot = null;
  private int bandwidthMB = Integer.MAX_VALUE;
  private int filesMode = 0;
  private int mappers = 0;
  private boolean resetTtl = false;

  @Override
  protected void processOptions(CommandLine cmd) {
    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
    }
    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
    }
    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
    // Derive verifyChecksum, verifyTarget and verifySource from the corresponding no-* flags.
    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
  }
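
  // Note (assumption about AbstractHBaseTool's getOptionAsInt overload): the trailing 8
  // passed for --chmod in processOptions above is a radix, so the mode is parsed as octal
  // in the usual chmod convention (e.g. 700 -> rwx------).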

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
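
    // With skipTmp the manifest is copied straight to its final name; otherwise it is staged
    // under the working directory and renamed into place once the copy job succeeds (Step 3).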

    // throw CorruptedSnapshotException if we can't read the snapshot info.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory that needs its owner and group changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot is already in progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, traversedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name is different from the source name or we want to
    // reset TTL for target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files, otherwise the files get removed
    // by the HFileArchiver, since they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify the exported snapshot's expiration status and integrity.");
        SnapshotDescription targetSnapshotDesc =
          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return EXIT_SUCCESS;
    } catch (Exception e) {
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return EXIT_FAILURE;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
  }

  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}