/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name; also the root of the source/destination configuration prefixes below. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Standard MapReduce key; the input format writes the computed mapper count into it.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Number of input splits; 0 (the default) lets the input format derive it from the file count.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  // NOTE(review): not read in the visible part of this file; presumably the snapshot name.
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  // Path of the snapshot directory whose referenced files are turned into input splits.
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Owner, group and permission bits the mapper applies to the files and folders it creates.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether the mapper compares file checksums in addition to lengths (default true).
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root directories of the destination and source file systems used by the mapper.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Copy buffer size in bytes; the mapper defaults it to max(destination block size, 64 KB).
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Number of copied bytes between two counter/status updates (mapper default 1 MB).
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  // Files per mapper used when the number of splits is not set explicitly (default 10).
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper read throttle in MB/s (default 100); Integer.MAX_VALUE disables throttling.
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  // NOTE(review): the three keys below are not read in the visible part of this file.
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();

  /**
   * Failure-injection state used by the mapper: when {@link #CONF_TEST_FAILURE} is enabled the
   * mapper throws artificial copy failures until {@code injectedFailureCount} reaches
   * {@code failuresCountToInject}.
   */
  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    // Upper bound of failures to inject, read from CONF_TEST_FAILURE_COUNT.
    int failuresCountToInject = 0;
    // Failures injected so far; seeded by the mapper from the task attempt id.
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
134 static final class Options { 135 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 136 static final Option TARGET_NAME = 137 new Option(null, "target", true, "Target name for the snapshot."); 138 static final Option COPY_TO = 139 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 140 static final Option COPY_FROM = 141 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 142 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 143 "Do not verify checksum, use name+length only."); 144 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 145 "Do not verify the exported snapshot's expiration status and integrity."); 146 static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false, 147 "Do not verify the source snapshot's expiration status and integrity."); 148 static final Option OVERWRITE = 149 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 150 static final Option CHUSER = 151 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 152 static final Option CHGROUP = 153 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 154 static final Option CHMOD = 155 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 156 static final Option MAPPERS = new Option(null, "mappers", true, 157 "Number of mappers to use during the copy (mapreduce.job.maps)."); 158 static final Option BANDWIDTH = 159 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 160 static final Option RESET_TTL = 161 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 162 } 163 164 // Export Map-Reduce Counters, to keep track of the progress 165 public enum Counter { 166 MISSING_FILES, 167 FILES_COPIED, 168 FILES_SKIPPED, 169 
COPY_FAILED, 170 BYTES_EXPECTED, 171 BYTES_SKIPPED, 172 BYTES_COPIED 173 } 174 175 /** 176 * Indicates the checksum comparison result. 177 */ 178 public enum ChecksumComparison { 179 TRUE, // checksum comparison is compatible and true. 180 FALSE, // checksum comparison is compatible and false. 181 INCOMPATIBLE, // checksum comparison is not compatible. 182 } 183 184 private static class ExportMapper 185 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 186 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 187 final static int REPORT_SIZE = 1 * 1024 * 1024; 188 final static int BUFFER_SIZE = 64 * 1024; 189 190 private boolean verifyChecksum; 191 private String filesGroup; 192 private String filesUser; 193 private short filesMode; 194 private int bufferSize; 195 private int reportSize; 196 197 private FileSystem outputFs; 198 private Path outputArchive; 199 private Path outputRoot; 200 201 private FileSystem inputFs; 202 private Path inputArchive; 203 private Path inputRoot; 204 205 private static Testing testing = new Testing(); 206 207 @Override 208 public void setup(Context context) throws IOException { 209 Configuration conf = context.getConfiguration(); 210 211 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 212 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 213 214 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 215 216 filesGroup = conf.get(CONF_FILES_GROUP); 217 filesUser = conf.get(CONF_FILES_USER); 218 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 219 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 220 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 221 222 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 223 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 224 225 try { 226 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 227 } catch 
(IOException e) { 228 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 229 } 230 231 try { 232 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 233 } catch (IOException e) { 234 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 235 } 236 237 // Use the default block size of the outputFs if bigger 238 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 239 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 240 LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize)); 241 reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE); 242 243 for (Counter c : Counter.values()) { 244 context.getCounter(c).increment(0); 245 } 246 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 247 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 248 // Get number of times we have already injected failure based on attempt number of this 249 // task. 250 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 251 } 252 } 253 254 @Override 255 public void map(BytesWritable key, NullWritable value, Context context) 256 throws InterruptedException, IOException { 257 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 258 Path outputPath = getOutputPath(inputInfo); 259 260 copyFile(context, inputInfo, outputPath); 261 } 262 263 /** 264 * Returns the location where the inputPath will be copied. 
265 */ 266 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 267 Path path = null; 268 switch (inputInfo.getType()) { 269 case HFILE: 270 Path inputPath = new Path(inputInfo.getHfile()); 271 String family = inputPath.getParent().getName(); 272 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 273 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 274 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 275 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 276 new Path(region, new Path(family, hfile))); 277 break; 278 case WAL: 279 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 280 break; 281 default: 282 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 283 } 284 return new Path(outputArchive, path); 285 } 286 287 @SuppressWarnings("checkstyle:linelength") 288 /** 289 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 290 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 291 */ 292 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 293 throws IOException { 294 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 295 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 296 testing.injectedFailureCount++; 297 context.getCounter(Counter.COPY_FAILED).increment(1); 298 LOG.debug("Injecting failure. 
Count: " + testing.injectedFailureCount); 299 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 300 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 301 } 302 303 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 304 final Path outputPath) throws IOException { 305 // Get the file information 306 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 307 308 // Verify if the output file exists and is the same that we want to copy 309 if (outputFs.exists(outputPath)) { 310 FileStatus outputStat = outputFs.getFileStatus(outputPath); 311 if (outputStat != null && sameFile(inputStat, outputStat)) { 312 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 313 context.getCounter(Counter.FILES_SKIPPED).increment(1); 314 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 315 return; 316 } 317 } 318 319 InputStream in = openSourceFile(context, inputInfo); 320 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 321 if (Integer.MAX_VALUE != bandwidthMB) { 322 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 323 } 324 325 Path inputPath = inputStat.getPath(); 326 try { 327 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 328 329 // Ensure that the output folder is there and copy the file 330 createOutputPath(outputPath.getParent()); 331 FSDataOutputStream out = outputFs.create(outputPath, true); 332 333 long stime = EnvironmentEdgeManager.currentTime(); 334 long totalBytesWritten = 335 copyData(context, inputPath, in, outputPath, out, inputStat.getLen()); 336 337 // Verify the file length and checksum 338 verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath)); 339 340 long etime = EnvironmentEdgeManager.currentTime(); 341 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 342 LOG.info("size=" + 
totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten) 343 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec", 344 (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 345 context.getCounter(Counter.FILES_COPIED).increment(1); 346 347 // Try to Preserve attributes 348 if (!preserveAttributes(outputPath, inputStat)) { 349 LOG.warn("You may have to run manually chown on: " + outputPath); 350 } 351 } catch (IOException e) { 352 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 353 context.getCounter(Counter.COPY_FAILED).increment(1); 354 throw e; 355 } finally { 356 injectTestFailure(context, inputInfo); 357 } 358 } 359 360 /** 361 * Create the output folder and optionally set ownership. 362 */ 363 private void createOutputPath(final Path path) throws IOException { 364 if (filesUser == null && filesGroup == null) { 365 outputFs.mkdirs(path); 366 } else { 367 Path parent = path.getParent(); 368 if (!outputFs.exists(parent) && !parent.isRoot()) { 369 createOutputPath(parent); 370 } 371 outputFs.mkdirs(path); 372 if (filesUser != null || filesGroup != null) { 373 // override the owner when non-null user/group is specified 374 outputFs.setOwner(path, filesUser, filesGroup); 375 } 376 if (filesMode > 0) { 377 outputFs.setPermission(path, new FsPermission(filesMode)); 378 } 379 } 380 } 381 382 /** 383 * Try to Preserve the files attribute selected by the user copying them from the source file 384 * This is only required when you are exporting as a different user than "hbase" or on a system 385 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 386 * can force a chmod with the user that knows is available on the system. 
387 */ 388 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 389 FileStatus stat; 390 try { 391 stat = outputFs.getFileStatus(path); 392 } catch (IOException e) { 393 LOG.warn("Unable to get the status for file=" + path); 394 return false; 395 } 396 397 try { 398 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 399 outputFs.setPermission(path, new FsPermission(filesMode)); 400 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 401 outputFs.setPermission(path, refStat.getPermission()); 402 } 403 } catch (IOException e) { 404 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 405 return false; 406 } 407 408 boolean hasRefStat = (refStat != null); 409 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 410 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 411 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 412 try { 413 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 414 outputFs.setOwner(path, user, group); 415 } 416 } catch (IOException e) { 417 LOG.warn( 418 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 419 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 420 + " group=" + group); 421 return false; 422 } 423 } 424 425 return true; 426 } 427 428 private boolean stringIsNotEmpty(final String str) { 429 return str != null && str.length() > 0; 430 } 431 432 private long copyData(final Context context, final Path inputPath, final InputStream in, 433 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 434 throws IOException { 435 final String statusMessage = 436 "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)"; 437 438 try { 439 byte[] buffer = new byte[bufferSize]; 440 long totalBytesWritten = 0; 441 int reportBytes 
= 0; 442 int bytesRead; 443 444 while ((bytesRead = in.read(buffer)) > 0) { 445 out.write(buffer, 0, bytesRead); 446 totalBytesWritten += bytesRead; 447 reportBytes += bytesRead; 448 449 if (reportBytes >= reportSize) { 450 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 451 context 452 .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten), 453 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 454 + " to " + outputPath); 455 reportBytes = 0; 456 } 457 } 458 459 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 460 context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten), 461 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 462 + outputPath); 463 464 return totalBytesWritten; 465 } finally { 466 out.close(); 467 in.close(); 468 } 469 } 470 471 /** 472 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 473 * fail or if the file is not found. 
474 */ 475 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 476 throws IOException { 477 try { 478 Configuration conf = context.getConfiguration(); 479 FileLink link = null; 480 switch (fileInfo.getType()) { 481 case HFILE: 482 Path inputPath = new Path(fileInfo.getHfile()); 483 link = getFileLink(inputPath, conf); 484 break; 485 case WAL: 486 String serverName = fileInfo.getWalServer(); 487 String logName = fileInfo.getWalName(); 488 link = new WALLink(inputRoot, serverName, logName); 489 break; 490 default: 491 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 492 } 493 return link.open(inputFs); 494 } catch (IOException e) { 495 context.getCounter(Counter.MISSING_FILES).increment(1); 496 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 497 throw e; 498 } 499 } 500 501 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 502 throws IOException { 503 try { 504 Configuration conf = context.getConfiguration(); 505 FileLink link = null; 506 switch (fileInfo.getType()) { 507 case HFILE: 508 Path inputPath = new Path(fileInfo.getHfile()); 509 link = getFileLink(inputPath, conf); 510 break; 511 case WAL: 512 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 513 break; 514 default: 515 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 516 } 517 return link.getFileStatus(inputFs); 518 } catch (FileNotFoundException e) { 519 context.getCounter(Counter.MISSING_FILES).increment(1); 520 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 521 throw e; 522 } catch (IOException e) { 523 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 524 throw e; 525 } 526 } 527 528 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 529 String regionName = HFileLink.getReferencedRegionName(path.getName()); 530 TableName 
tableName = HFileLink.getReferencedTableName(path.getName()); 531 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 532 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 533 HFileArchiveUtil.getArchivePath(conf), path); 534 } 535 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 536 } 537 538 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 539 try { 540 return fs.getFileChecksum(path); 541 } catch (IOException e) { 542 LOG.warn("Unable to get checksum for file=" + path, e); 543 return null; 544 } 545 } 546 547 /** 548 * Utility to compare the file length and checksums for the paths specified. 549 */ 550 private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat) 551 throws IOException { 552 long inputLen = inputStat.getLen(); 553 long outputLen = outputStat.getLen(); 554 Path inputPath = inputStat.getPath(); 555 Path outputPath = outputStat.getPath(); 556 557 if (inputLen != outputLen) { 558 throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen 559 + ") and output:" + outputPath + " (" + outputLen + ")"); 560 } 561 562 // If length==0, we will skip checksum 563 if (inputLen != 0 && verifyChecksum) { 564 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 565 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 566 567 ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum); 568 if (!checksumComparison.equals(ChecksumComparison.TRUE)) { 569 StringBuilder errMessage = new StringBuilder("Checksum mismatch between ") 570 .append(inputPath).append(" and ").append(outputPath).append("."); 571 572 boolean addSkipHint = false; 573 String inputScheme = inputFs.getScheme(); 574 String outputScheme = outputFs.getScheme(); 575 if (!inputScheme.equals(outputScheme)) { 576 errMessage.append(" Input and output filesystems are of different 
types.\n") 577 .append("Their checksum algorithms may be incompatible."); 578 addSkipHint = true; 579 } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) { 580 errMessage.append(" Input and output differ in block-size."); 581 addSkipHint = true; 582 } else if ( 583 inChecksum != null && outChecksum != null 584 && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName()) 585 ) { 586 errMessage.append(" Input and output checksum algorithms are of different types."); 587 addSkipHint = true; 588 } 589 if (addSkipHint) { 590 errMessage 591 .append(" You can choose file-level checksum validation via " 592 + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes" 593 + " or filesystems are different.\n") 594 .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,") 595 .append( 596 " for the table backup scenario, you should use -i option to skip checksum-checks.\n") 597 .append(" (NOTE: By skipping checksums, one runs the risk of " 598 + "masking data-corruption during file-transfer.)\n"); 599 } 600 throw new IOException(errMessage.toString()); 601 } 602 } 603 } 604 605 /** 606 * Utility to compare checksums 607 */ 608 private ChecksumComparison verifyChecksum(final FileChecksum inChecksum, 609 final FileChecksum outChecksum) { 610 // If the input or output checksum is null, or the algorithms of input and output are not 611 // equal, that means there is no comparison 612 // and return not compatible. else if matched, return compatible with the matched result. 
613 if ( 614 inChecksum == null || outChecksum == null 615 || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName()) 616 ) { 617 return ChecksumComparison.INCOMPATIBLE; 618 } else if (inChecksum.equals(outChecksum)) { 619 return ChecksumComparison.TRUE; 620 } 621 return ChecksumComparison.FALSE; 622 } 623 624 /** 625 * Check if the two files are equal by looking at the file length, and at the checksum (if user 626 * has specified the verifyChecksum flag). 627 */ 628 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 629 // Not matching length 630 if (inputStat.getLen() != outputStat.getLen()) return false; 631 632 // Mark files as equals, since user asked for no checksum verification 633 if (!verifyChecksum) return true; 634 635 // If checksums are not available, files are not the same. 636 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 637 if (inChecksum == null) return false; 638 639 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 640 if (outChecksum == null) return false; 641 642 return inChecksum.equals(outChecksum); 643 } 644 } 645 646 // ========================================================================== 647 // Input Format 648 // ========================================================================== 649 650 /** 651 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 
652 * @return list of files referenced by the snapshot (pair of path and size) 653 */ 654 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 655 final FileSystem fs, final Path snapshotDir) throws IOException { 656 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 657 658 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 659 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 660 661 // Get snapshot files 662 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 663 Set<String> addedFiles = new HashSet<>(); 664 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 665 new SnapshotReferenceUtil.SnapshotVisitor() { 666 @Override 667 public void storeFile(final RegionInfo regionInfo, final String family, 668 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 669 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 670 if (!storeFile.hasReference()) { 671 String region = regionInfo.getEncodedName(); 672 String hfile = storeFile.getName(); 673 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 674 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 675 } else { 676 Pair<String, String> referredToRegionAndFile = 677 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 678 String referencedRegion = referredToRegionAndFile.getFirst(); 679 String referencedHFile = referredToRegionAndFile.getSecond(); 680 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 681 referencedHFile, storeFile.hasFileSize() ? 
storeFile.getFileSize() : -1); 682 } 683 String fileToExport = snapshotFileAndSize.getFirst().getHfile(); 684 if (!addedFiles.contains(fileToExport)) { 685 files.add(snapshotFileAndSize); 686 addedFiles.add(fileToExport); 687 } else { 688 LOG.debug("Skip the existing file: {}.", fileToExport); 689 } 690 } 691 }); 692 693 return files; 694 } 695 696 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 697 Configuration conf, TableName table, String region, String family, String hfile, long size) 698 throws IOException { 699 Path path = HFileLink.createPath(table, region, family, hfile); 700 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 701 .setHfile(path.toString()).build(); 702 if (size == -1) { 703 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 704 } 705 return new Pair<>(fileInfo, size); 706 } 707 708 /** 709 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 710 * The groups created will have similar amounts of bytes. 711 * <p> 712 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 713 * group fetch the bigger file available, iterating through groups alternating the direction. 714 */ 715 static List<List<Pair<SnapshotFileInfo, Long>>> 716 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 717 // Sort files by size, from small to big 718 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 719 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 720 long r = a.getSecond() - b.getSecond(); 721 return (r < 0) ? -1 : ((r > 0) ? 
1 : 0); 722 } 723 }); 724 725 // create balanced groups 726 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 727 long[] sizeGroups = new long[ngroups]; 728 int hi = files.size() - 1; 729 int lo = 0; 730 731 List<Pair<SnapshotFileInfo, Long>> group; 732 int dir = 1; 733 int g = 0; 734 735 while (hi >= lo) { 736 if (g == fileGroups.size()) { 737 group = new LinkedList<>(); 738 fileGroups.add(group); 739 } else { 740 group = fileGroups.get(g); 741 } 742 743 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 744 745 // add the hi one 746 sizeGroups[g] += fileInfo.getSecond(); 747 group.add(fileInfo); 748 749 // change direction when at the end or the beginning 750 g += dir; 751 if (g == ngroups) { 752 dir = -1; 753 g = ngroups - 1; 754 } else if (g < 0) { 755 dir = 1; 756 g = 0; 757 } 758 } 759 760 if (LOG.isDebugEnabled()) { 761 for (int i = 0; i < sizeGroups.length; ++i) { 762 LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i])); 763 } 764 } 765 766 return fileGroups; 767 } 768 769 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 770 @Override 771 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 772 TaskAttemptContext tac) throws IOException, InterruptedException { 773 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 774 } 775 776 @Override 777 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 778 Configuration conf = context.getConfiguration(); 779 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 780 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 781 782 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 783 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 784 if (mappers == 0 && snapshotFiles.size() > 0) { 785 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 
10)); 786 mappers = Math.min(mappers, snapshotFiles.size()); 787 conf.setInt(CONF_NUM_SPLITS, mappers); 788 conf.setInt(MR_NUM_MAPS, mappers); 789 } 790 791 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 792 List<InputSplit> splits = new ArrayList(groups.size()); 793 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 794 splits.add(new ExportSnapshotInputSplit(files)); 795 } 796 return splits; 797 } 798 799 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 800 private List<Pair<BytesWritable, Long>> files; 801 private long length; 802 803 public ExportSnapshotInputSplit() { 804 this.files = null; 805 } 806 807 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 808 this.files = new ArrayList(snapshotFiles.size()); 809 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 810 this.files.add( 811 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 812 this.length += fileInfo.getSecond(); 813 } 814 } 815 816 private List<Pair<BytesWritable, Long>> getSplitKeys() { 817 return files; 818 } 819 820 @Override 821 public long getLength() throws IOException, InterruptedException { 822 return length; 823 } 824 825 @Override 826 public String[] getLocations() throws IOException, InterruptedException { 827 return new String[] {}; 828 } 829 830 @Override 831 public void readFields(DataInput in) throws IOException { 832 int count = in.readInt(); 833 files = new ArrayList<>(count); 834 length = 0; 835 for (int i = 0; i < count; ++i) { 836 BytesWritable fileInfo = new BytesWritable(); 837 fileInfo.readFields(in); 838 long size = in.readLong(); 839 files.add(new Pair<>(fileInfo, size)); 840 length += size; 841 } 842 } 843 844 @Override 845 public void write(DataOutput out) throws IOException { 846 out.writeInt(files.size()); 847 for (final Pair<BytesWritable, Long> fileInfo : files) { 848 
fileInfo.getFirst().write(out); 849 out.writeLong(fileInfo.getSecond()); 850 } 851 } 852 } 853 854 private static class ExportSnapshotRecordReader 855 extends RecordReader<BytesWritable, NullWritable> { 856 private final List<Pair<BytesWritable, Long>> files; 857 private long totalSize = 0; 858 private long procSize = 0; 859 private int index = -1; 860 861 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 862 this.files = files; 863 for (Pair<BytesWritable, Long> fileInfo : files) { 864 totalSize += fileInfo.getSecond(); 865 } 866 } 867 868 @Override 869 public void close() { 870 } 871 872 @Override 873 public BytesWritable getCurrentKey() { 874 return files.get(index).getFirst(); 875 } 876 877 @Override 878 public NullWritable getCurrentValue() { 879 return NullWritable.get(); 880 } 881 882 @Override 883 public float getProgress() { 884 return (float) procSize / totalSize; 885 } 886 887 @Override 888 public void initialize(InputSplit split, TaskAttemptContext tac) { 889 } 890 891 @Override 892 public boolean nextKeyValue() { 893 if (index >= 0) { 894 procSize += files.get(index).getSecond(); 895 } 896 return (++index < files.size()); 897 } 898 } 899 } 900 901 // ========================================================================== 902 // Tool 903 // ========================================================================== 904 905 /** 906 * Run Map-Reduce Job to perform the files copy. 
907 */ 908 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 909 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 910 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 911 throws IOException, InterruptedException, ClassNotFoundException { 912 Configuration conf = getConf(); 913 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 914 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 915 if (mappers > 0) { 916 conf.setInt(CONF_NUM_SPLITS, mappers); 917 conf.setInt(MR_NUM_MAPS, mappers); 918 } 919 conf.setInt(CONF_FILES_MODE, filesMode); 920 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 921 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 922 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 923 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 924 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 925 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 926 927 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 928 Job job = new Job(conf); 929 job.setJobName(jobname); 930 job.setJarByClass(ExportSnapshot.class); 931 TableMapReduceUtil.addDependencyJars(job); 932 job.setMapperClass(ExportMapper.class); 933 job.setInputFormatClass(ExportSnapshotInputFormat.class); 934 job.setOutputFormatClass(NullOutputFormat.class); 935 job.setMapSpeculativeExecution(false); 936 job.setNumReduceTasks(0); 937 938 // Acquire the delegation Tokens 939 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 940 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 941 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 942 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 943 944 // Run the MR Job 945 if (!job.waitForCompletion(true)) { 946 throw new 
ExportSnapshotException(job.getStatus().getFailureInfo()); 947 } 948 } 949 950 private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf, 951 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException { 952 // Update the conf with the current root dir, since may be a different cluster 953 Configuration conf = new Configuration(baseConf); 954 CommonFSUtils.setRootDir(conf, rootDir); 955 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 956 boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(), 957 snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime()); 958 if (isExpired) { 959 throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc)); 960 } 961 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 962 } 963 964 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 965 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 966 ExecutorService pool = Executors 967 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 968 List<Future<Void>> futures = new ArrayList<>(); 969 for (Path dstPath : traversedPath) { 970 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 971 futures.add(future); 972 } 973 try { 974 for (Future<Void> future : futures) { 975 future.get(); 976 } 977 } catch (InterruptedException | ExecutionException e) { 978 throw new IOException(e); 979 } finally { 980 pool.shutdownNow(); 981 } 982 } 983 984 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 985 Configuration conf, List<Path> traversedPath) throws IOException { 986 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 987 try { 988 fs.setOwner(path, filesUser, filesGroup); 989 } catch (IOException e) { 990 throw new RuntimeException( 991 "set owner for file " + 
path + " to " + filesUser + ":" + filesGroup + " failed", e); 992 } 993 }, conf); 994 } 995 996 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 997 final List<Path> traversedPath, final Configuration conf) throws IOException { 998 if (filesMode <= 0) { 999 return; 1000 } 1001 FsPermission perm = new FsPermission(filesMode); 1002 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 1003 try { 1004 fs.setPermission(path, perm); 1005 } catch (IOException e) { 1006 throw new RuntimeException( 1007 "set permission for file " + path + " to " + filesMode + " failed", e); 1008 } 1009 }, conf); 1010 } 1011 1012 private boolean verifyTarget = true; 1013 private boolean verifySource = true; 1014 private boolean verifyChecksum = true; 1015 private String snapshotName = null; 1016 private String targetName = null; 1017 private boolean overwrite = false; 1018 private String filesGroup = null; 1019 private String filesUser = null; 1020 private Path outputRoot = null; 1021 private Path inputRoot = null; 1022 private int bandwidthMB = Integer.MAX_VALUE; 1023 private int filesMode = 0; 1024 private int mappers = 0; 1025 private boolean resetTtl = false; 1026 1027 @Override 1028 protected void processOptions(CommandLine cmd) { 1029 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 1030 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 1031 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 1032 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 1033 } 1034 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 1035 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 1036 } 1037 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 1038 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 1039 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 1040 filesMode = getOptionAsInt(cmd, 
Options.CHMOD.getLongOpt(), filesMode, 8); 1041 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 1042 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 1043 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 1044 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 1045 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 1046 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 1047 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 1048 } 1049 1050 /** 1051 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 1052 * @return 0 on success, and != 0 upon failure. 1053 */ 1054 @Override 1055 public int doWork() throws IOException { 1056 Configuration conf = getConf(); 1057 1058 // Check user options 1059 if (snapshotName == null) { 1060 System.err.println("Snapshot name not provided."); 1061 LOG.error("Use -h or --help for usage instructions."); 1062 return EXIT_FAILURE; 1063 } 1064 1065 if (outputRoot == null) { 1066 System.err 1067 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 1068 LOG.error("Use -h or --help for usage instructions."); 1069 return EXIT_FAILURE; 1070 } 1071 1072 if (targetName == null) { 1073 targetName = snapshotName; 1074 } 1075 if (inputRoot == null) { 1076 inputRoot = CommonFSUtils.getRootDir(conf); 1077 } else { 1078 CommonFSUtils.setRootDir(conf, inputRoot); 1079 } 1080 1081 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 1082 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 1083 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 1084 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 1085 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 1086 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != 
null; 1087 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 1088 Path snapshotTmpDir = 1089 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 1090 Path outputSnapshotDir = 1091 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 1092 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 1093 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 1094 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 1095 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 1096 1097 // throw CorruptedSnapshotException if we can't read the snapshot info. 1098 SnapshotDescription sourceSnapshotDesc = 1099 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir); 1100 1101 // Verify snapshot source before copying files 1102 if (verifySource) { 1103 LOG.info("Verify the source snapshot's expiration status and integrity."); 1104 verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir); 1105 } 1106 1107 // Find the necessary directory which need to change owner and group 1108 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1109 if (outputFs.exists(needSetOwnerDir)) { 1110 if (skipTmp) { 1111 needSetOwnerDir = outputSnapshotDir; 1112 } else { 1113 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1114 if (outputFs.exists(needSetOwnerDir)) { 1115 needSetOwnerDir = snapshotTmpDir; 1116 } 1117 } 1118 } 1119 1120 // Check if the snapshot already exists 1121 if (outputFs.exists(outputSnapshotDir)) { 1122 if (overwrite) { 1123 if (!outputFs.delete(outputSnapshotDir, true)) { 1124 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1125 return EXIT_FAILURE; 1126 } 1127 } else { 1128 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 
1129 + outputSnapshotDir); 1130 return EXIT_FAILURE; 1131 } 1132 } 1133 1134 if (!skipTmp) { 1135 // Check if the snapshot already in-progress 1136 if (outputFs.exists(snapshotTmpDir)) { 1137 if (overwrite) { 1138 if (!outputFs.delete(snapshotTmpDir, true)) { 1139 System.err 1140 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1141 return EXIT_FAILURE; 1142 } 1143 } else { 1144 System.err 1145 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1146 System.err 1147 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1148 System.err 1149 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1150 return EXIT_FAILURE; 1151 } 1152 } 1153 } 1154 1155 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1156 // The snapshot references must be copied before the hfiles otherwise the cleaner 1157 // will remove them because they are unreferenced. 1158 List<Path> travesedPaths = new ArrayList<>(); 1159 boolean copySucceeded = false; 1160 try { 1161 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1162 travesedPaths = 1163 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1164 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1165 copySucceeded = true; 1166 } catch (IOException e) { 1167 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1168 + " to=" + initialOutputSnapshotDir, e); 1169 } finally { 1170 if (copySucceeded) { 1171 if (filesUser != null || filesGroup != null) { 1172 LOG.warn( 1173 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1174 + (filesGroup == null 1175 ? 
"" 1176 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1177 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1178 } 1179 if (filesMode > 0) { 1180 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1181 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1182 } 1183 } 1184 } 1185 1186 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1187 // reset TTL for target snapshot. 1188 if (!targetName.equals(snapshotName) || resetTtl) { 1189 SnapshotDescription.Builder snapshotDescBuilder = 1190 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1191 if (!targetName.equals(snapshotName)) { 1192 snapshotDescBuilder.setName(targetName); 1193 } 1194 if (resetTtl) { 1195 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1196 } 1197 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1198 initialOutputSnapshotDir, outputFs); 1199 if (filesUser != null || filesGroup != null) { 1200 outputFs.setOwner( 1201 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1202 filesGroup); 1203 } 1204 if (filesMode > 0) { 1205 outputFs.setPermission( 1206 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1207 new FsPermission((short) filesMode)); 1208 } 1209 } 1210 1211 // Step 2 - Start MR Job to copy files 1212 // The snapshot references must be copied before the files otherwise the files gets removed 1213 // by the HFileArchiver, since they have no references. 
1214 try { 1215 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1216 filesGroup, filesMode, mappers, bandwidthMB); 1217 1218 LOG.info("Finalize the Snapshot Export"); 1219 if (!skipTmp) { 1220 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1221 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1222 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1223 + snapshotTmpDir + " to=" + outputSnapshotDir); 1224 } 1225 } 1226 1227 // Step 4 - Verify snapshot integrity 1228 if (verifyTarget) { 1229 LOG.info("Verify the exported snapshot's expiration status and integrity."); 1230 SnapshotDescription targetSnapshotDesc = 1231 SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir); 1232 verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir); 1233 } 1234 1235 LOG.info("Export Completed: " + targetName); 1236 return EXIT_SUCCESS; 1237 } catch (Exception e) { 1238 LOG.error("Snapshot export failed", e); 1239 if (!skipTmp) { 1240 outputFs.delete(snapshotTmpDir, true); 1241 } 1242 outputFs.delete(outputSnapshotDir, true); 1243 return EXIT_FAILURE; 1244 } 1245 } 1246 1247 @Override 1248 protected void printUsage() { 1249 super.printUsage(); 1250 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1251 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1252 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1253 + " hbase snapshot export \\\n" 1254 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1255 + " --copy-to hdfs://srv1:50070/hbase"); 1256 } 1257 1258 @Override 1259 protected void addOptions() { 1260 addRequiredOption(Options.SNAPSHOT); 1261 addOption(Options.COPY_TO); 1262 addOption(Options.COPY_FROM); 1263 addOption(Options.TARGET_NAME); 1264 addOption(Options.NO_CHECKSUM_VERIFY); 1265 addOption(Options.NO_TARGET_VERIFY); 1266 
addOption(Options.NO_SOURCE_VERIFY); 1267 addOption(Options.OVERWRITE); 1268 addOption(Options.CHUSER); 1269 addOption(Options.CHGROUP); 1270 addOption(Options.CHMOD); 1271 addOption(Options.MAPPERS); 1272 addOption(Options.BANDWIDTH); 1273 addOption(Options.RESET_TTL); 1274 } 1275 1276 public static void main(String[] args) { 1277 new ExportSnapshot().doStaticMain(args); 1278 } 1279}