001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.snapshot; 020 021import java.io.BufferedInputStream; 022import java.io.DataInput; 023import java.io.DataOutput; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.Future; 036import java.util.function.BiConsumer; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.fs.FSDataInputStream; 039import org.apache.hadoop.fs.FSDataOutputStream; 040import org.apache.hadoop.fs.FileChecksum; 041import org.apache.hadoop.fs.FileStatus; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.fs.permission.FsPermission; 045import org.apache.hadoop.hbase.HBaseConfiguration; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.io.FileLink; 050import org.apache.hadoop.hbase.io.HFileLink; 051import org.apache.hadoop.hbase.io.WALLink; 052import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 053import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 054import org.apache.hadoop.hbase.mob.MobUtils; 055import org.apache.hadoop.hbase.util.AbstractHBaseTool; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.FSUtils; 058import org.apache.hadoop.hbase.util.HFileArchiveUtil; 059import org.apache.hadoop.hbase.util.Pair; 060import org.apache.hadoop.io.BytesWritable; 061import org.apache.hadoop.io.IOUtils; 062import org.apache.hadoop.io.NullWritable; 063import org.apache.hadoop.io.Writable; 064import org.apache.hadoop.mapreduce.InputFormat; 065import org.apache.hadoop.mapreduce.InputSplit; 066import org.apache.hadoop.mapreduce.Job; 067import org.apache.hadoop.mapreduce.JobContext; 068import org.apache.hadoop.mapreduce.Mapper; 069import org.apache.hadoop.mapreduce.RecordReader; 070import org.apache.hadoop.mapreduce.TaskAttemptContext; 071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 072import org.apache.hadoop.mapreduce.security.TokenCache; 073import org.apache.hadoop.util.StringUtils; 074import org.apache.hadoop.util.Tool; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 085 086/** 087 * Export the specified snapshot to a given FileSystem. 088 * 089 * The .snapshot/name folder is copied to the destination cluster 090 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location. 091 * When everything is done, the second cluster can restore the snapshot. 092 */ 093@InterfaceAudience.Public 094public class ExportSnapshot extends AbstractHBaseTool implements Tool { 095 public static final String NAME = "exportsnapshot"; 096 /** Configuration prefix for overrides for the source filesystem */ 097 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 098 /** Configuration prefix for overrides for the destination filesystem */ 099 public static final String CONF_DEST_PREFIX = NAME + ".to."; 100 101 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 102 103 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 104 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 105 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 106 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 107 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 108 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 109 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 110 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 111 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 112 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 113 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 114 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 115 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 116 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 117 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 118 private static final String CONF_COPY_MANIFEST_THREADS = 119 "snapshot.export.copy.references.threads"; 120 private static final int DEFAULT_COPY_MANIFEST_THREADS = 121 Runtime.getRuntime().availableProcessors(); 122 123 static class Testing { 124 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 125 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 126 int failuresCountToInject = 0; 127 int injectedFailureCount = 0; 128 } 129 130 // Command line options and defaults. 131 static final class Options { 132 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 133 static final Option TARGET_NAME = new Option(null, "target", true, 134 "Target name for the snapshot."); 135 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote " 136 + "destination hdfs://"); 137 static final Option COPY_FROM = new Option(null, "copy-from", true, 138 "Input folder hdfs:// (default hbase.rootdir)"); 139 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 140 "Do not verify checksum, use name+length only."); 141 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 142 "Do not verify the integrity of the exported snapshot."); 143 static final Option OVERWRITE = new Option(null, "overwrite", false, 144 "Rewrite the snapshot manifest if already exists."); 145 static final Option CHUSER = new Option(null, "chuser", true, 146 "Change the owner of the files to the specified one."); 147 static final Option CHGROUP = new Option(null, "chgroup", true, 148 "Change the group of the files to the specified one."); 149 static final Option CHMOD = new Option(null, "chmod", true, 150 "Change the permission of the files to the specified one."); 151 static final Option MAPPERS = new Option(null, "mappers", true, 152 "Number of mappers to use during the copy (mapreduce.job.maps)."); 153 static final Option BANDWIDTH = new Option(null, "bandwidth", true, 154 "Limit bandwidth to this value in MB/second."); 155 } 156 157 // Export Map-Reduce Counters, to keep track of the progress 158 public enum Counter { 159 MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED, 160 BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED 161 } 162 163 private static class ExportMapper extends Mapper<BytesWritable, NullWritable, 164 NullWritable, NullWritable> { 165 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 166 final static int REPORT_SIZE = 1 * 1024 * 1024; 167 final static int BUFFER_SIZE = 64 * 1024; 168 169 private boolean verifyChecksum; 170 private String filesGroup; 171 private String filesUser; 172 private short filesMode; 173 private int bufferSize; 174 175 private FileSystem outputFs; 176 private Path outputArchive; 177 private Path outputRoot; 178 179 private FileSystem inputFs; 180 private Path inputArchive; 181 private Path inputRoot; 182 183 private static Testing testing = new Testing(); 184 185 @Override 186 public void setup(Context context) throws IOException { 187 Configuration conf = context.getConfiguration(); 188 189 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 190 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 191 192 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 193 194 filesGroup = conf.get(CONF_FILES_GROUP); 195 filesUser = conf.get(CONF_FILES_USER); 196 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); 197 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 198 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 199 200 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 201 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 202 203 try { 204 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 205 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 206 } catch (IOException e) { 207 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 208 } 209 210 try { 211 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 212 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 213 } catch (IOException e) { 214 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); 215 } 216 217 // Use the default block size of the outputFs if bigger 218 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 219 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 220 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 221 222 for (Counter c : Counter.values()) { 223 context.getCounter(c).increment(0); 224 } 225 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 226 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 227 // Get number of times we have already injected failure based on attempt number of this 228 // task. 229 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 230 } 231 } 232 233 @Override 234 protected void cleanup(Context context) { 235 IOUtils.closeStream(inputFs); 236 IOUtils.closeStream(outputFs); 237 } 238 239 @Override 240 public void map(BytesWritable key, NullWritable value, Context context) 241 throws InterruptedException, IOException { 242 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 243 Path outputPath = getOutputPath(inputInfo); 244 245 copyFile(context, inputInfo, outputPath); 246 } 247 248 /** 249 * Returns the location where the inputPath will be copied. 250 */ 251 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 252 Path path = null; 253 switch (inputInfo.getType()) { 254 case HFILE: 255 Path inputPath = new Path(inputInfo.getHfile()); 256 String family = inputPath.getParent().getName(); 257 TableName table =HFileLink.getReferencedTableName(inputPath.getName()); 258 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 259 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 260 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 261 new Path(region, new Path(family, hfile))); 262 break; 263 case WAL: 264 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 265 break; 266 default: 267 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 268 } 269 return new Path(outputArchive, path); 270 } 271 272 @SuppressWarnings("checkstyle:linelength") 273 /** 274 * Used by TestExportSnapshot to test for retries when failures happen. 275 * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 276 */ 277 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 278 throws IOException { 279 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 280 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 281 testing.injectedFailureCount++; 282 context.getCounter(Counter.COPY_FAILED).increment(1); 283 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 284 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 285 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 286 } 287 288 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 289 final Path outputPath) throws IOException { 290 // Get the file information 291 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 292 293 // Verify if the output file exists and is the same that we want to copy 294 if (outputFs.exists(outputPath)) { 295 FileStatus outputStat = outputFs.getFileStatus(outputPath); 296 if (outputStat != null && sameFile(inputStat, outputStat)) { 297 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 298 context.getCounter(Counter.FILES_SKIPPED).increment(1); 299 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 300 return; 301 } 302 } 303 304 InputStream in = openSourceFile(context, inputInfo); 305 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 306 if (Integer.MAX_VALUE != bandwidthMB) { 307 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 308 } 309 310 try { 311 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 312 313 // Ensure that the output folder is there and copy the file 314 createOutputPath(outputPath.getParent()); 315 FSDataOutputStream out = outputFs.create(outputPath, true); 316 try { 317 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 318 } finally { 319 out.close(); 320 } 321 322 // Try to Preserve attributes 323 if (!preserveAttributes(outputPath, inputStat)) { 324 LOG.warn("You may have to run manually chown on: " + outputPath); 325 } 326 } finally { 327 in.close(); 328 injectTestFailure(context, inputInfo); 329 } 330 } 331 332 /** 333 * Create the output folder and optionally set ownership. 334 */ 335 private void createOutputPath(final Path path) throws IOException { 336 if (filesUser == null && filesGroup == null) { 337 outputFs.mkdirs(path); 338 } else { 339 Path parent = path.getParent(); 340 if (!outputFs.exists(parent) && !parent.isRoot()) { 341 createOutputPath(parent); 342 } 343 outputFs.mkdirs(path); 344 if (filesUser != null || filesGroup != null) { 345 // override the owner when non-null user/group is specified 346 outputFs.setOwner(path, filesUser, filesGroup); 347 } 348 if (filesMode > 0) { 349 outputFs.setPermission(path, new FsPermission(filesMode)); 350 } 351 } 352 } 353 354 /** 355 * Try to Preserve the files attribute selected by the user copying them from the source file 356 * This is only required when you are exporting as a different user than "hbase" or on a system 357 * that doesn't have the "hbase" user. 358 * 359 * This is not considered a blocking failure since the user can force a chmod with the user 360 * that knows is available on the system. 361 */ 362 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 363 FileStatus stat; 364 try { 365 stat = outputFs.getFileStatus(path); 366 } catch (IOException e) { 367 LOG.warn("Unable to get the status for file=" + path); 368 return false; 369 } 370 371 try { 372 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 373 outputFs.setPermission(path, new FsPermission(filesMode)); 374 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 375 outputFs.setPermission(path, refStat.getPermission()); 376 } 377 } catch (IOException e) { 378 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); 379 return false; 380 } 381 382 boolean hasRefStat = (refStat != null); 383 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 384 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 385 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 386 try { 387 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 388 outputFs.setOwner(path, user, group); 389 } 390 } catch (IOException e) { 391 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); 392 LOG.warn("The user/group may not exist on the destination cluster: user=" + 393 user + " group=" + group); 394 return false; 395 } 396 } 397 398 return true; 399 } 400 401 private boolean stringIsNotEmpty(final String str) { 402 return str != null && str.length() > 0; 403 } 404 405 private void copyData(final Context context, 406 final Path inputPath, final InputStream in, 407 final Path outputPath, final FSDataOutputStream out, 408 final long inputFileSize) 409 throws IOException { 410 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + 411 " (%.1f%%)"; 412 413 try { 414 byte[] buffer = new byte[bufferSize]; 415 long totalBytesWritten = 0; 416 int reportBytes = 0; 417 int bytesRead; 418 419 long stime = System.currentTimeMillis(); 420 while ((bytesRead = in.read(buffer)) > 0) { 421 out.write(buffer, 0, bytesRead); 422 totalBytesWritten += bytesRead; 423 reportBytes += bytesRead; 424 425 if (reportBytes >= REPORT_SIZE) { 426 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 427 context.setStatus(String.format(statusMessage, 428 StringUtils.humanReadableInt(totalBytesWritten), 429 (totalBytesWritten/(float)inputFileSize) * 100.0f) + 430 " from " + inputPath + " to " + outputPath); 431 reportBytes = 0; 432 } 433 } 434 long etime = System.currentTimeMillis(); 435 436 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 437 context.setStatus(String.format(statusMessage, 438 StringUtils.humanReadableInt(totalBytesWritten), 439 (totalBytesWritten/(float)inputFileSize) * 100.0f) + 440 " from " + inputPath + " to " + outputPath); 441 442 // Verify that the written size match 443 if (totalBytesWritten != inputFileSize) { 444 String msg = "number of bytes copied not matching copied=" + totalBytesWritten + 445 " expected=" + inputFileSize + " for file=" + inputPath; 446 throw new IOException(msg); 447 } 448 449 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 450 LOG.info("size=" + totalBytesWritten + 451 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" + 452 " time=" + StringUtils.formatTimeDiff(etime, stime) + 453 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0)); 454 context.getCounter(Counter.FILES_COPIED).increment(1); 455 } catch (IOException e) { 456 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 457 context.getCounter(Counter.COPY_FAILED).increment(1); 458 throw e; 459 } 460 } 461 462 /** 463 * Try to open the "source" file. 464 * Throws an IOException if the communication with the inputFs fail or 465 * if the file is not found. 466 */ 467 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 468 throws IOException { 469 try { 470 Configuration conf = context.getConfiguration(); 471 FileLink link = null; 472 switch (fileInfo.getType()) { 473 case HFILE: 474 Path inputPath = new Path(fileInfo.getHfile()); 475 link = getFileLink(inputPath, conf); 476 break; 477 case WAL: 478 String serverName = fileInfo.getWalServer(); 479 String logName = fileInfo.getWalName(); 480 link = new WALLink(inputRoot, serverName, logName); 481 break; 482 default: 483 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 484 } 485 return link.open(inputFs); 486 } catch (IOException e) { 487 context.getCounter(Counter.MISSING_FILES).increment(1); 488 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 489 throw e; 490 } 491 } 492 493 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 494 throws IOException { 495 try { 496 Configuration conf = context.getConfiguration(); 497 FileLink link = null; 498 switch (fileInfo.getType()) { 499 case HFILE: 500 Path inputPath = new Path(fileInfo.getHfile()); 501 link = getFileLink(inputPath, conf); 502 break; 503 case WAL: 504 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 505 break; 506 default: 507 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 508 } 509 return link.getFileStatus(inputFs); 510 } catch (FileNotFoundException e) { 511 context.getCounter(Counter.MISSING_FILES).increment(1); 512 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 513 throw e; 514 } catch (IOException e) { 515 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 516 throw e; 517 } 518 } 519 520 private FileLink getFileLink(Path path, Configuration conf) throws IOException{ 521 String regionName = HFileLink.getReferencedRegionName(path.getName()); 522 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 523 if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 524 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 525 HFileArchiveUtil.getArchivePath(conf), path); 526 } 527 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 528 } 529 530 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 531 try { 532 return fs.getFileChecksum(path); 533 } catch (IOException e) { 534 LOG.warn("Unable to get checksum for file=" + path, e); 535 return null; 536 } 537 } 538 539 /** 540 * Check if the two files are equal by looking at the file length, 541 * and at the checksum (if user has specified the verifyChecksum flag). 542 */ 543 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 544 // Not matching length 545 if (inputStat.getLen() != outputStat.getLen()) return false; 546 547 // Mark files as equals, since user asked for no checksum verification 548 if (!verifyChecksum) return true; 549 550 // If checksums are not available, files are not the same. 551 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 552 if (inChecksum == null) return false; 553 554 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 555 if (outChecksum == null) return false; 556 557 return inChecksum.equals(outChecksum); 558 } 559 } 560 561 // ========================================================================== 562 // Input Format 563 // ========================================================================== 564 565 /** 566 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 567 * @return list of files referenced by the snapshot (pair of path and size) 568 */ 569 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 570 final FileSystem fs, final Path snapshotDir) throws IOException { 571 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 572 573 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 574 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 575 576 // Get snapshot files 577 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 578 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 579 new SnapshotReferenceUtil.SnapshotVisitor() { 580 @Override 581 public void storeFile(final RegionInfo regionInfo, final String family, 582 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 583 // for storeFile.hasReference() case, copied as part of the manifest 584 if (!storeFile.hasReference()) { 585 String region = regionInfo.getEncodedName(); 586 String hfile = storeFile.getName(); 587 Path path = HFileLink.createPath(table, region, family, hfile); 588 589 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder() 590 .setType(SnapshotFileInfo.Type.HFILE) 591 .setHfile(path.toString()) 592 .build(); 593 594 long size; 595 if (storeFile.hasFileSize()) { 596 size = storeFile.getFileSize(); 597 } else { 598 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 599 } 600 files.add(new Pair<>(fileInfo, size)); 601 } 602 } 603 }); 604 605 return files; 606 } 607 608 /** 609 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 610 * The groups created will have similar amounts of bytes. 611 * <p> 612 * The algorithm used is pretty straightforward; the file list is sorted by size, 613 * and then each group fetch the bigger file available, iterating through groups 614 * alternating the direction. 615 */ 616 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits( 617 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 618 // Sort files by size, from small to big 619 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 620 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 621 long r = a.getSecond() - b.getSecond(); 622 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 623 } 624 }); 625 626 // create balanced groups 627 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 628 long[] sizeGroups = new long[ngroups]; 629 int hi = files.size() - 1; 630 int lo = 0; 631 632 List<Pair<SnapshotFileInfo, Long>> group; 633 int dir = 1; 634 int g = 0; 635 636 while (hi >= lo) { 637 if (g == fileGroups.size()) { 638 group = new LinkedList<>(); 639 fileGroups.add(group); 640 } else { 641 group = fileGroups.get(g); 642 } 643 644 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 645 646 // add the hi one 647 sizeGroups[g] += fileInfo.getSecond(); 648 group.add(fileInfo); 649 650 // change direction when at the end or the beginning 651 g += dir; 652 if (g == ngroups) { 653 dir = -1; 654 g = ngroups - 1; 655 } else if (g < 0) { 656 dir = 1; 657 g = 0; 658 } 659 } 660 661 if (LOG.isDebugEnabled()) { 662 for (int i = 0; i < sizeGroups.length; ++i) { 663 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 664 } 665 } 666 667 return fileGroups; 668 } 669 670 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 671 @Override 672 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 673 TaskAttemptContext tac) throws IOException, InterruptedException { 674 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys()); 675 } 676 677 @Override 678 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 679 Configuration conf = context.getConfiguration(); 680 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 681 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 682 683 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 684 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 685 if (mappers == 0 && snapshotFiles.size() > 0) { 686 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 687 mappers = Math.min(mappers, snapshotFiles.size()); 688 conf.setInt(CONF_NUM_SPLITS, mappers); 689 conf.setInt(MR_NUM_MAPS, mappers); 690 } 691 692 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 693 List<InputSplit> splits = new ArrayList(groups.size()); 694 for (List<Pair<SnapshotFileInfo, Long>> files: groups) { 695 splits.add(new ExportSnapshotInputSplit(files)); 696 } 697 return splits; 698 } 699 700 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 701 private List<Pair<BytesWritable, Long>> files; 702 private long length; 703 704 public ExportSnapshotInputSplit() { 705 this.files = null; 706 } 707 708 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 709 this.files = new ArrayList(snapshotFiles.size()); 710 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) { 711 this.files.add(new Pair<>( 712 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 713 this.length += fileInfo.getSecond(); 714 } 715 } 716 717 private List<Pair<BytesWritable, Long>> getSplitKeys() { 718 return files; 719 } 720 721 @Override 722 public long getLength() throws IOException, InterruptedException { 723 return length; 724 } 725 726 @Override 727 public String[] getLocations() throws IOException, InterruptedException { 728 return new String[] {}; 729 } 730 731 @Override 732 public void readFields(DataInput in) throws IOException { 733 int count = in.readInt(); 734 files = new ArrayList<>(count); 735 length = 0; 736 for (int i = 0; i < count; ++i) { 737 BytesWritable fileInfo = new BytesWritable(); 738 fileInfo.readFields(in); 739 long size = in.readLong(); 740 files.add(new Pair<>(fileInfo, size)); 741 length += size; 742 } 743 } 744 745 @Override 746 public void write(DataOutput out) throws IOException { 747 out.writeInt(files.size()); 748 for (final Pair<BytesWritable, Long> fileInfo: files) { 749 fileInfo.getFirst().write(out); 750 out.writeLong(fileInfo.getSecond()); 751 } 752 } 753 } 754 755 private static class ExportSnapshotRecordReader 756 extends RecordReader<BytesWritable, NullWritable> { 757 private final List<Pair<BytesWritable, Long>> files; 758 private long totalSize = 0; 759 private long procSize = 0; 760 private int index = -1; 761 762 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 763 this.files = files; 764 for (Pair<BytesWritable, Long> fileInfo: files) { 765 totalSize += fileInfo.getSecond(); 766 } 767 } 768 769 @Override 770 public void close() { } 771 772 @Override 773 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); } 774 775 @Override 776 public NullWritable getCurrentValue() { return NullWritable.get(); } 777 778 @Override 779 public float getProgress() { return (float)procSize / totalSize; } 780 781 @Override 782 public void initialize(InputSplit split, TaskAttemptContext tac) { } 783 784 @Override 785 public boolean nextKeyValue() { 786 if (index >= 0) { 787 procSize += files.get(index).getSecond(); 788 } 789 return(++index < files.size()); 790 } 791 } 792 } 793 794 // ========================================================================== 795 // Tool 796 // ========================================================================== 797 798 /** 799 * Run Map-Reduce Job to perform the files copy. 800 */ 801 private void runCopyJob(final Path inputRoot, final Path outputRoot, 802 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum, 803 final String filesUser, final String filesGroup, final int filesMode, 804 final int mappers, final int bandwidthMB) 805 throws IOException, InterruptedException, ClassNotFoundException { 806 Configuration conf = getConf(); 807 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 808 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 809 if (mappers > 0) { 810 conf.setInt(CONF_NUM_SPLITS, mappers); 811 conf.setInt(MR_NUM_MAPS, mappers); 812 } 813 conf.setInt(CONF_FILES_MODE, filesMode); 814 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 815 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 816 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 817 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 818 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 819 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 820 821 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 822 Job job = new Job(conf); 823 job.setJobName(jobname); 824 job.setJarByClass(ExportSnapshot.class); 825 TableMapReduceUtil.addDependencyJars(job); 826 job.setMapperClass(ExportMapper.class); 827 job.setInputFormatClass(ExportSnapshotInputFormat.class); 828 job.setOutputFormatClass(NullOutputFormat.class); 829 job.setMapSpeculativeExecution(false); 830 job.setNumReduceTasks(0); 831 832 // Acquire the delegation Tokens 833 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 834 TokenCache.obtainTokensForNamenodes(job.getCredentials(), 835 new Path[] { inputRoot }, srcConf); 836 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 837 TokenCache.obtainTokensForNamenodes(job.getCredentials(), 838 new Path[] { outputRoot }, destConf); 839 840 // Run the MR Job 841 if (!job.waitForCompletion(true)) { 842 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 843 } 844 } 845 846 private void verifySnapshot(final Configuration baseConf, 847 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException { 848 // Update the conf with the current root dir, since may be a different cluster 849 Configuration conf = new Configuration(baseConf); 850 CommonFSUtils.setRootDir(conf, rootDir); 851 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 852 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 853 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 854 } 855 856 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 857 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 858 ExecutorService pool = Executors 859 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 860 List<Future<Void>> futures = new ArrayList<>(); 861 for (Path dstPath : traversedPath) { 862 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 863 futures.add(future); 864 } 865 try { 866 for (Future<Void> future : futures) { 867 future.get(); 868 } 869 } catch (InterruptedException | ExecutionException e) { 870 throw new IOException(e); 871 } finally { 872 pool.shutdownNow(); 873 } 874 } 875 876 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 877 Configuration conf, List<Path> traversedPath) throws IOException { 878 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 879 try { 880 fs.setOwner(path, filesUser, filesGroup); 881 } catch (IOException e) { 882 throw new RuntimeException( 883 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 884 } 885 }, conf); 886 } 887 888 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 889 final List<Path> traversedPath, final Configuration conf) throws IOException { 890 if (filesMode <= 0) { 891 return; 892 } 893 FsPermission perm = new FsPermission(filesMode); 894 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 895 try { 896 fs.setPermission(path, perm); 897 } catch (IOException e) { 898 throw new RuntimeException( 899 "set permission for file " + path + " to " + filesMode + " failed", e); 900 } 901 }, conf); 902 } 903 904 private boolean verifyTarget = true; 905 private boolean verifyChecksum = true; 906 private String snapshotName = null; 907 private String targetName = null; 908 private boolean overwrite = false; 909 private String filesGroup = null; 910 private String filesUser = null; 911 private Path outputRoot = null; 912 private Path inputRoot = null; 913 private int bandwidthMB = Integer.MAX_VALUE; 914 private int filesMode = 0; 915 private int mappers = 0; 916 917 @Override 918 protected void processOptions(CommandLine cmd) { 919 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 920 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 921 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 922 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 923 } 924 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 925 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 926 } 927 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 928 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 929 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 930 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode); 931 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 932 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 933 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 934 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 935 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 936 } 937 938 /** 939 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 940 * @return 0 on success, and != 0 upon failure. 941 */ 942 @Override 943 public int doWork() throws IOException { 944 Configuration conf = getConf(); 945 946 // Check user options 947 if (snapshotName == null) { 948 System.err.println("Snapshot name not provided."); 949 LOG.error("Use -h or --help for usage instructions."); 950 return 0; 951 } 952 953 if (outputRoot == null) { 954 System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt() 955 + ") not provided."); 956 LOG.error("Use -h or --help for usage instructions."); 957 return 0; 958 } 959 960 if (targetName == null) { 961 targetName = snapshotName; 962 } 963 if (inputRoot == null) { 964 inputRoot = CommonFSUtils.getRootDir(conf); 965 } else { 966 CommonFSUtils.setRootDir(conf, inputRoot); 967 } 968 969 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 970 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 971 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 972 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 973 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 974 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 975 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) || 976 conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 977 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 978 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, 979 destConf); 980 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 981 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 982 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 983 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", 984 outputFs, outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 985 986 // Find the necessary directory which need to change owner and group 987 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 988 if (outputFs.exists(needSetOwnerDir)) { 989 if (skipTmp) { 990 needSetOwnerDir = outputSnapshotDir; 991 } else { 992 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 993 if (outputFs.exists(needSetOwnerDir)) { 994 needSetOwnerDir = snapshotTmpDir; 995 } 996 } 997 } 998 999 // Check if the snapshot already exists 1000 if (outputFs.exists(outputSnapshotDir)) { 1001 if (overwrite) { 1002 if (!outputFs.delete(outputSnapshotDir, true)) { 1003 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1004 return 1; 1005 } 1006 } else { 1007 System.err.println("The snapshot '" + targetName + 1008 "' already exists in the destination: " + outputSnapshotDir); 1009 return 1; 1010 } 1011 } 1012 1013 if (!skipTmp) { 1014 // Check if the snapshot already in-progress 1015 if (outputFs.exists(snapshotTmpDir)) { 1016 if (overwrite) { 1017 if (!outputFs.delete(snapshotTmpDir, true)) { 1018 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir); 1019 return 1; 1020 } 1021 } else { 1022 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress"); 1023 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, "); 1024 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option"); 1025 return 1; 1026 } 1027 } 1028 } 1029 1030 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1031 // The snapshot references must be copied before the hfiles otherwise the cleaner 1032 // will remove them because they are unreferenced. 1033 List<Path> travesedPaths = new ArrayList<>(); 1034 boolean copySucceeded = false; 1035 try { 1036 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1037 travesedPaths = 1038 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1039 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1040 copySucceeded = true; 1041 } catch (IOException e) { 1042 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + 1043 snapshotDir + " to=" + initialOutputSnapshotDir, e); 1044 } finally { 1045 if (copySucceeded) { 1046 if (filesUser != null || filesGroup != null) { 1047 LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " 1048 + filesUser) 1049 + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to " 1050 + filesGroup)); 1051 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1052 } 1053 if (filesMode > 0) { 1054 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1055 setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf); 1056 } 1057 } 1058 } 1059 1060 // Write a new .snapshotinfo if the target name is different from the source name 1061 if (!targetName.equals(snapshotName)) { 1062 SnapshotDescription snapshotDesc = 1063 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir) 1064 .toBuilder() 1065 .setName(targetName) 1066 .build(); 1067 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs); 1068 if (filesUser != null || filesGroup != null) { 1069 outputFs.setOwner(new Path(initialOutputSnapshotDir, 1070 SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup); 1071 } 1072 if (filesMode > 0) { 1073 outputFs.setPermission(new Path(initialOutputSnapshotDir, 1074 SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode)); 1075 } 1076 } 1077 1078 // Step 2 - Start MR Job to copy files 1079 // The snapshot references must be copied before the files otherwise the files gets removed 1080 // by the HFileArchiver, since they have no references. 1081 try { 1082 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, 1083 filesUser, filesGroup, filesMode, mappers, bandwidthMB); 1084 1085 LOG.info("Finalize the Snapshot Export"); 1086 if (!skipTmp) { 1087 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1088 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1089 throw new ExportSnapshotException("Unable to rename snapshot directory from=" + 1090 snapshotTmpDir + " to=" + outputSnapshotDir); 1091 } 1092 } 1093 1094 // Step 4 - Verify snapshot integrity 1095 if (verifyTarget) { 1096 LOG.info("Verify snapshot integrity"); 1097 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1098 } 1099 1100 LOG.info("Export Completed: " + targetName); 1101 return 0; 1102 } catch (Exception e) { 1103 LOG.error("Snapshot export failed", e); 1104 if (!skipTmp) { 1105 outputFs.delete(snapshotTmpDir, true); 1106 } 1107 outputFs.delete(outputSnapshotDir, true); 1108 return 1; 1109 } finally { 1110 IOUtils.closeStream(inputFs); 1111 IOUtils.closeStream(outputFs); 1112 } 1113 } 1114 1115 @Override 1116 protected void printUsage() { 1117 super.printUsage(); 1118 System.out.println("\n" 1119 + "Examples:\n" 1120 + " hbase snapshot export \\\n" 1121 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1122 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" 1123 + "\n" 1124 + " hbase snapshot export \\\n" 1125 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1126 + " --copy-to hdfs://srv1:50070/hbase"); 1127 } 1128 1129 @Override protected void addOptions() { 1130 addRequiredOption(Options.SNAPSHOT); 1131 addOption(Options.COPY_TO); 1132 addOption(Options.COPY_FROM); 1133 addOption(Options.TARGET_NAME); 1134 addOption(Options.NO_CHECKSUM_VERIFY); 1135 addOption(Options.NO_TARGET_VERIFY); 1136 addOption(Options.OVERWRITE); 1137 addOption(Options.CHUSER); 1138 addOption(Options.CHGROUP); 1139 addOption(Options.CHMOD); 1140 addOption(Options.MAPPERS); 1141 addOption(Options.BANDWIDTH); 1142 } 1143 1144 public static void main(String[] args) { 1145 new ExportSnapshot().doStaticMain(args); 1146 } 1147}