001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.snapshot; 019 020import java.io.BufferedInputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.LinkedList; 030import java.util.List; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.function.BiConsumer; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FSDataOutputStream; 039import org.apache.hadoop.fs.FileChecksum; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.permission.FsPermission; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.io.FileLink; 049import org.apache.hadoop.hbase.io.HFileLink; 050import org.apache.hadoop.hbase.io.WALLink; 051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.mob.MobUtils; 054import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 055import org.apache.hadoop.hbase.util.AbstractHBaseTool; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.util.FSUtils; 059import org.apache.hadoop.hbase.util.HFileArchiveUtil; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.io.BytesWritable; 062import org.apache.hadoop.io.NullWritable; 063import org.apache.hadoop.io.Writable; 064import org.apache.hadoop.mapreduce.InputFormat; 065import org.apache.hadoop.mapreduce.InputSplit; 066import org.apache.hadoop.mapreduce.Job; 067import org.apache.hadoop.mapreduce.JobContext; 068import org.apache.hadoop.mapreduce.Mapper; 069import org.apache.hadoop.mapreduce.RecordReader; 070import org.apache.hadoop.mapreduce.TaskAttemptContext; 071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 072import org.apache.hadoop.mapreduce.security.TokenCache; 073import org.apache.hadoop.util.StringUtils; 074import org.apache.hadoop.util.Tool; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 085 086/** 087 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the 088 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the 089 * .archive/ location. When everything is done, the second cluster can restore the snapshot. 090 */ 091@InterfaceAudience.Public 092public class ExportSnapshot extends AbstractHBaseTool implements Tool { 093 public static final String NAME = "exportsnapshot"; 094 /** Configuration prefix for overrides for the source filesystem */ 095 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 096 /** Configuration prefix for overrides for the destination filesystem */ 097 public static final String CONF_DEST_PREFIX = NAME + ".to."; 098 099 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 100 101 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 102 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 103 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 104 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 105 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 106 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 107 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 108 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 109 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 110 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 111 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 112 private static final String CONF_REPORT_SIZE = "snapshot.export.report.size"; 113 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 114 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 115 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 116 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 117 private static final String CONF_COPY_MANIFEST_THREADS = 118 "snapshot.export.copy.references.threads"; 119 private static final int DEFAULT_COPY_MANIFEST_THREADS = 120 Runtime.getRuntime().availableProcessors(); 121 122 static class Testing { 123 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 124 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 125 int failuresCountToInject = 0; 126 int injectedFailureCount = 0; 127 } 128 129 // Command line options and defaults. 130 static final class Options { 131 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 132 static final Option TARGET_NAME = 133 new Option(null, "target", true, "Target name for the snapshot."); 134 static final Option COPY_TO = 135 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 136 static final Option COPY_FROM = 137 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 138 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 139 "Do not verify checksum, use name+length only."); 140 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 141 "Do not verify the integrity of the exported snapshot."); 142 static final Option NO_SOURCE_VERIFY = 143 new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot."); 144 static final Option OVERWRITE = 145 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 146 static final Option CHUSER = 147 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 148 static final Option CHGROUP = 149 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 150 static final Option CHMOD = 151 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 152 static final Option MAPPERS = new Option(null, "mappers", true, 153 "Number of mappers to use during the copy (mapreduce.job.maps)."); 154 static final Option BANDWIDTH = 155 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 156 static final Option RESET_TTL = 157 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 158 } 159 160 // Export Map-Reduce Counters, to keep track of the progress 161 public enum Counter { 162 MISSING_FILES, 163 FILES_COPIED, 164 FILES_SKIPPED, 165 COPY_FAILED, 166 BYTES_EXPECTED, 167 BYTES_SKIPPED, 168 BYTES_COPIED 169 } 170 171 private static class ExportMapper 172 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 173 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 174 final static int REPORT_SIZE = 1 * 1024 * 1024; 175 final static int BUFFER_SIZE = 64 * 1024; 176 177 private boolean verifyChecksum; 178 private String filesGroup; 179 private String filesUser; 180 private short filesMode; 181 private int bufferSize; 182 private int reportSize; 183 184 private FileSystem outputFs; 185 private Path outputArchive; 186 private Path outputRoot; 187 188 private FileSystem inputFs; 189 private Path inputArchive; 190 private Path inputRoot; 191 192 private static Testing testing = new Testing(); 193 194 @Override 195 public void setup(Context context) throws IOException { 196 Configuration conf = context.getConfiguration(); 197 198 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 199 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 200 201 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 202 203 filesGroup = conf.get(CONF_FILES_GROUP); 204 filesUser = conf.get(CONF_FILES_USER); 205 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 206 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 207 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 208 209 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 210 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 211 212 try { 213 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 214 } catch (IOException e) { 215 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 216 } 217 218 try { 219 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 220 } catch (IOException e) { 221 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 222 } 223 224 // Use the default block size of the outputFs if bigger 225 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 226 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 227 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 228 reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE); 229 230 for (Counter c : Counter.values()) { 231 context.getCounter(c).increment(0); 232 } 233 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 234 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 235 // Get number of times we have already injected failure based on attempt number of this 236 // task. 237 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 238 } 239 } 240 241 @Override 242 public void map(BytesWritable key, NullWritable value, Context context) 243 throws InterruptedException, IOException { 244 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 245 Path outputPath = getOutputPath(inputInfo); 246 247 copyFile(context, inputInfo, outputPath); 248 } 249 250 /** 251 * Returns the location where the inputPath will be copied. 252 */ 253 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 254 Path path = null; 255 switch (inputInfo.getType()) { 256 case HFILE: 257 Path inputPath = new Path(inputInfo.getHfile()); 258 String family = inputPath.getParent().getName(); 259 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 260 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 261 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 262 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 263 new Path(region, new Path(family, hfile))); 264 break; 265 case WAL: 266 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 267 break; 268 default: 269 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 270 } 271 return new Path(outputArchive, path); 272 } 273 274 @SuppressWarnings("checkstyle:linelength") 275 /** 276 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 277 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 278 */ 279 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 280 throws IOException { 281 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 282 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 283 testing.injectedFailureCount++; 284 context.getCounter(Counter.COPY_FAILED).increment(1); 285 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 286 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 287 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 288 } 289 290 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 291 final Path outputPath) throws IOException { 292 // Get the file information 293 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 294 295 // Verify if the output file exists and is the same that we want to copy 296 if (outputFs.exists(outputPath)) { 297 FileStatus outputStat = outputFs.getFileStatus(outputPath); 298 if (outputStat != null && sameFile(inputStat, outputStat)) { 299 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 300 context.getCounter(Counter.FILES_SKIPPED).increment(1); 301 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 302 return; 303 } 304 } 305 306 InputStream in = openSourceFile(context, inputInfo); 307 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 308 if (Integer.MAX_VALUE != bandwidthMB) { 309 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 310 } 311 312 try { 313 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 314 315 // Ensure that the output folder is there and copy the file 316 createOutputPath(outputPath.getParent()); 317 FSDataOutputStream out = outputFs.create(outputPath, true); 318 try { 319 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 320 } finally { 321 out.close(); 322 } 323 324 // Try to Preserve attributes 325 if (!preserveAttributes(outputPath, inputStat)) { 326 LOG.warn("You may have to run manually chown on: " + outputPath); 327 } 328 } finally { 329 in.close(); 330 injectTestFailure(context, inputInfo); 331 } 332 } 333 334 /** 335 * Create the output folder and optionally set ownership. 336 */ 337 private void createOutputPath(final Path path) throws IOException { 338 if (filesUser == null && filesGroup == null) { 339 outputFs.mkdirs(path); 340 } else { 341 Path parent = path.getParent(); 342 if (!outputFs.exists(parent) && !parent.isRoot()) { 343 createOutputPath(parent); 344 } 345 outputFs.mkdirs(path); 346 if (filesUser != null || filesGroup != null) { 347 // override the owner when non-null user/group is specified 348 outputFs.setOwner(path, filesUser, filesGroup); 349 } 350 if (filesMode > 0) { 351 outputFs.setPermission(path, new FsPermission(filesMode)); 352 } 353 } 354 } 355 356 /** 357 * Try to Preserve the files attribute selected by the user copying them from the source file 358 * This is only required when you are exporting as a different user than "hbase" or on a system 359 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 360 * can force a chmod with the user that knows is available on the system. 361 */ 362 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 363 FileStatus stat; 364 try { 365 stat = outputFs.getFileStatus(path); 366 } catch (IOException e) { 367 LOG.warn("Unable to get the status for file=" + path); 368 return false; 369 } 370 371 try { 372 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 373 outputFs.setPermission(path, new FsPermission(filesMode)); 374 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 375 outputFs.setPermission(path, refStat.getPermission()); 376 } 377 } catch (IOException e) { 378 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 379 return false; 380 } 381 382 boolean hasRefStat = (refStat != null); 383 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 384 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 385 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 386 try { 387 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 388 outputFs.setOwner(path, user, group); 389 } 390 } catch (IOException e) { 391 LOG.warn( 392 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 393 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 394 + " group=" + group); 395 return false; 396 } 397 } 398 399 return true; 400 } 401 402 private boolean stringIsNotEmpty(final String str) { 403 return str != null && str.length() > 0; 404 } 405 406 private void copyData(final Context context, final Path inputPath, final InputStream in, 407 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 408 throws IOException { 409 final String statusMessage = 410 "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)"; 411 412 try { 413 byte[] buffer = new byte[bufferSize]; 414 long totalBytesWritten = 0; 415 int reportBytes = 0; 416 int bytesRead; 417 418 long stime = EnvironmentEdgeManager.currentTime(); 419 while ((bytesRead = in.read(buffer)) > 0) { 420 out.write(buffer, 0, bytesRead); 421 totalBytesWritten += bytesRead; 422 reportBytes += bytesRead; 423 424 if (reportBytes >= reportSize) { 425 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 426 context.setStatus( 427 String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 428 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 429 + " to " + outputPath); 430 reportBytes = 0; 431 } 432 } 433 long etime = EnvironmentEdgeManager.currentTime(); 434 435 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 436 context 437 .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 438 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 439 + outputPath); 440 441 // Verify that the written size match 442 if (totalBytesWritten != inputFileSize) { 443 String msg = "number of bytes copied not matching copied=" + totalBytesWritten 444 + " expected=" + inputFileSize + " for file=" + inputPath; 445 throw new IOException(msg); 446 } 447 448 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 449 LOG 450 .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten) 451 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String 452 .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 453 context.getCounter(Counter.FILES_COPIED).increment(1); 454 } catch (IOException e) { 455 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 456 context.getCounter(Counter.COPY_FAILED).increment(1); 457 throw e; 458 } 459 } 460 461 /** 462 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 463 * fail or if the file is not found. 464 */ 465 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 466 throws IOException { 467 try { 468 Configuration conf = context.getConfiguration(); 469 FileLink link = null; 470 switch (fileInfo.getType()) { 471 case HFILE: 472 Path inputPath = new Path(fileInfo.getHfile()); 473 link = getFileLink(inputPath, conf); 474 break; 475 case WAL: 476 String serverName = fileInfo.getWalServer(); 477 String logName = fileInfo.getWalName(); 478 link = new WALLink(inputRoot, serverName, logName); 479 break; 480 default: 481 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 482 } 483 return link.open(inputFs); 484 } catch (IOException e) { 485 context.getCounter(Counter.MISSING_FILES).increment(1); 486 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 487 throw e; 488 } 489 } 490 491 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 492 throws IOException { 493 try { 494 Configuration conf = context.getConfiguration(); 495 FileLink link = null; 496 switch (fileInfo.getType()) { 497 case HFILE: 498 Path inputPath = new Path(fileInfo.getHfile()); 499 link = getFileLink(inputPath, conf); 500 break; 501 case WAL: 502 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 503 break; 504 default: 505 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 506 } 507 return link.getFileStatus(inputFs); 508 } catch (FileNotFoundException e) { 509 context.getCounter(Counter.MISSING_FILES).increment(1); 510 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 511 throw e; 512 } catch (IOException e) { 513 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 514 throw e; 515 } 516 } 517 518 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 519 String regionName = HFileLink.getReferencedRegionName(path.getName()); 520 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 521 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 522 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 523 HFileArchiveUtil.getArchivePath(conf), path); 524 } 525 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 526 } 527 528 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 529 try { 530 return fs.getFileChecksum(path); 531 } catch (IOException e) { 532 LOG.warn("Unable to get checksum for file=" + path, e); 533 return null; 534 } 535 } 536 537 /** 538 * Check if the two files are equal by looking at the file length, and at the checksum (if user 539 * has specified the verifyChecksum flag). 540 */ 541 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 542 // Not matching length 543 if (inputStat.getLen() != outputStat.getLen()) return false; 544 545 // Mark files as equals, since user asked for no checksum verification 546 if (!verifyChecksum) return true; 547 548 // If checksums are not available, files are not the same. 549 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 550 if (inChecksum == null) return false; 551 552 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 553 if (outChecksum == null) return false; 554 555 return inChecksum.equals(outChecksum); 556 } 557 } 558 559 // ========================================================================== 560 // Input Format 561 // ========================================================================== 562 563 /** 564 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 565 * @return list of files referenced by the snapshot (pair of path and size) 566 */ 567 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 568 final FileSystem fs, final Path snapshotDir) throws IOException { 569 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 570 571 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 572 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 573 574 // Get snapshot files 575 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 576 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 577 new SnapshotReferenceUtil.SnapshotVisitor() { 578 @Override 579 public void storeFile(final RegionInfo regionInfo, final String family, 580 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 581 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 582 if (!storeFile.hasReference()) { 583 String region = regionInfo.getEncodedName(); 584 String hfile = storeFile.getName(); 585 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 586 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 587 } else { 588 Pair<String, String> referredToRegionAndFile = 589 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 590 String referencedRegion = referredToRegionAndFile.getFirst(); 591 String referencedHFile = referredToRegionAndFile.getSecond(); 592 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 593 referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 594 } 595 files.add(snapshotFileAndSize); 596 } 597 }); 598 599 return files; 600 } 601 602 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 603 Configuration conf, TableName table, String region, String family, String hfile, long size) 604 throws IOException { 605 Path path = HFileLink.createPath(table, region, family, hfile); 606 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 607 .setHfile(path.toString()).build(); 608 if (size == -1) { 609 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 610 } 611 return new Pair<>(fileInfo, size); 612 } 613 614 /** 615 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 616 * The groups created will have similar amounts of bytes. 617 * <p> 618 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 619 * group fetch the bigger file available, iterating through groups alternating the direction. 620 */ 621 static List<List<Pair<SnapshotFileInfo, Long>>> 622 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 623 // Sort files by size, from small to big 624 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 625 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 626 long r = a.getSecond() - b.getSecond(); 627 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 628 } 629 }); 630 631 // create balanced groups 632 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 633 long[] sizeGroups = new long[ngroups]; 634 int hi = files.size() - 1; 635 int lo = 0; 636 637 List<Pair<SnapshotFileInfo, Long>> group; 638 int dir = 1; 639 int g = 0; 640 641 while (hi >= lo) { 642 if (g == fileGroups.size()) { 643 group = new LinkedList<>(); 644 fileGroups.add(group); 645 } else { 646 group = fileGroups.get(g); 647 } 648 649 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 650 651 // add the hi one 652 sizeGroups[g] += fileInfo.getSecond(); 653 group.add(fileInfo); 654 655 // change direction when at the end or the beginning 656 g += dir; 657 if (g == ngroups) { 658 dir = -1; 659 g = ngroups - 1; 660 } else if (g < 0) { 661 dir = 1; 662 g = 0; 663 } 664 } 665 666 if (LOG.isDebugEnabled()) { 667 for (int i = 0; i < sizeGroups.length; ++i) { 668 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 669 } 670 } 671 672 return fileGroups; 673 } 674 675 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 676 @Override 677 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 678 TaskAttemptContext tac) throws IOException, InterruptedException { 679 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 680 } 681 682 @Override 683 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 684 Configuration conf = context.getConfiguration(); 685 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 686 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 687 688 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 689 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 690 if (mappers == 0 && snapshotFiles.size() > 0) { 691 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 692 mappers = Math.min(mappers, snapshotFiles.size()); 693 conf.setInt(CONF_NUM_SPLITS, mappers); 694 conf.setInt(MR_NUM_MAPS, mappers); 695 } 696 697 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 698 List<InputSplit> splits = new ArrayList(groups.size()); 699 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 700 splits.add(new ExportSnapshotInputSplit(files)); 701 } 702 return splits; 703 } 704 705 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 706 private List<Pair<BytesWritable, Long>> files; 707 private long length; 708 709 public ExportSnapshotInputSplit() { 710 this.files = null; 711 } 712 713 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 714 this.files = new ArrayList(snapshotFiles.size()); 715 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 716 this.files.add( 717 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 718 this.length += fileInfo.getSecond(); 719 } 720 } 721 722 private List<Pair<BytesWritable, Long>> getSplitKeys() { 723 return files; 724 } 725 726 @Override 727 public long getLength() throws IOException, InterruptedException { 728 return length; 729 } 730 731 @Override 732 public String[] getLocations() throws IOException, InterruptedException { 733 return new String[] {}; 734 } 735 736 @Override 737 public void readFields(DataInput in) throws IOException { 738 int count = in.readInt(); 739 files = new ArrayList<>(count); 740 length = 0; 741 for (int i = 0; i < count; ++i) { 742 BytesWritable fileInfo = new BytesWritable(); 743 fileInfo.readFields(in); 744 long size = in.readLong(); 745 files.add(new Pair<>(fileInfo, size)); 746 length += size; 747 } 748 } 749 750 @Override 751 public void write(DataOutput out) throws IOException { 752 out.writeInt(files.size()); 753 for (final Pair<BytesWritable, Long> fileInfo : files) { 754 fileInfo.getFirst().write(out); 755 out.writeLong(fileInfo.getSecond()); 756 } 757 } 758 } 759 760 private static class ExportSnapshotRecordReader 761 extends RecordReader<BytesWritable, NullWritable> { 762 private final List<Pair<BytesWritable, Long>> files; 763 private long totalSize = 0; 764 private long procSize = 0; 765 private int index = -1; 766 767 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 768 this.files = files; 769 for (Pair<BytesWritable, Long> fileInfo : files) { 770 totalSize += fileInfo.getSecond(); 771 } 772 } 773 774 @Override 775 public void close() { 776 } 777 778 @Override 779 public BytesWritable getCurrentKey() { 780 return files.get(index).getFirst(); 781 } 782 783 @Override 784 public NullWritable getCurrentValue() { 785 return NullWritable.get(); 786 } 787 788 @Override 789 public float getProgress() { 790 return (float) procSize / totalSize; 791 } 792 793 @Override 794 public void initialize(InputSplit split, TaskAttemptContext tac) { 795 } 796 797 @Override 798 public boolean nextKeyValue() { 799 if (index >= 0) { 800 procSize += files.get(index).getSecond(); 801 } 802 return (++index < files.size()); 803 } 804 } 805 } 806 807 // ========================================================================== 808 // Tool 809 // ========================================================================== 810 811 /** 812 * Run Map-Reduce Job to perform the files copy. 813 */ 814 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 815 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 816 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 817 throws IOException, InterruptedException, ClassNotFoundException { 818 Configuration conf = getConf(); 819 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 820 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 821 if (mappers > 0) { 822 conf.setInt(CONF_NUM_SPLITS, mappers); 823 conf.setInt(MR_NUM_MAPS, mappers); 824 } 825 conf.setInt(CONF_FILES_MODE, filesMode); 826 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 827 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 828 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 829 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 830 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 831 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 832 833 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 834 Job job = new Job(conf); 835 job.setJobName(jobname); 836 job.setJarByClass(ExportSnapshot.class); 837 TableMapReduceUtil.addDependencyJars(job); 838 job.setMapperClass(ExportMapper.class); 839 job.setInputFormatClass(ExportSnapshotInputFormat.class); 840 job.setOutputFormatClass(NullOutputFormat.class); 841 job.setMapSpeculativeExecution(false); 842 job.setNumReduceTasks(0); 843 844 // Acquire the delegation Tokens 845 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 846 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 847 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 848 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 849 850 // Run the MR Job 851 if (!job.waitForCompletion(true)) { 852 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 853 } 854 } 855 856 private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir, 857 final Path snapshotDir) throws IOException { 858 // Update the conf with the current root dir, since may be a different cluster 859 Configuration conf = new Configuration(baseConf); 860 CommonFSUtils.setRootDir(conf, rootDir); 861 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 862 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 863 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 864 } 865 866 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 867 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 868 ExecutorService pool = Executors 869 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 870 List<Future<Void>> futures = new ArrayList<>(); 871 for (Path dstPath : traversedPath) { 872 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 873 futures.add(future); 874 } 875 try { 876 for (Future<Void> future : futures) { 877 future.get(); 878 } 879 } catch (InterruptedException | ExecutionException e) { 880 throw new IOException(e); 881 } finally { 882 pool.shutdownNow(); 883 } 884 } 885 886 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 887 Configuration conf, List<Path> traversedPath) throws IOException { 888 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 889 try { 890 fs.setOwner(path, filesUser, filesGroup); 891 } catch (IOException e) { 892 throw new RuntimeException( 893 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 894 } 895 }, conf); 896 } 897 898 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 899 final List<Path> traversedPath, final Configuration conf) throws IOException { 900 if (filesMode <= 0) { 901 return; 902 } 903 FsPermission perm = new FsPermission(filesMode); 904 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 905 try { 906 fs.setPermission(path, perm); 907 } catch (IOException e) { 908 throw new RuntimeException( 909 "set permission for file " + path + " to " + filesMode + " failed", e); 910 } 911 }, conf); 912 } 913 914 private boolean verifyTarget = true; 915 private boolean verifySource = true; 916 private boolean verifyChecksum = true; 917 private String snapshotName = null; 918 private String targetName = null; 919 private boolean overwrite = false; 920 private String filesGroup = null; 921 private String filesUser = null; 922 private Path outputRoot = null; 923 private Path inputRoot = null; 924 private int bandwidthMB = Integer.MAX_VALUE; 925 private int filesMode = 0; 926 private int mappers = 0; 927 private boolean resetTtl = false; 928 929 @Override 930 protected void processOptions(CommandLine cmd) { 931 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 932 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 933 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 934 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 935 } 936 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 937 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 938 } 939 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 940 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 941 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 942 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 943 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 944 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 945 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 946 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 947 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 948 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 949 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 950 } 951 952 /** 953 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 954 * @return 0 on success, and != 0 upon failure. 955 */ 956 @Override 957 public int doWork() throws IOException { 958 Configuration conf = getConf(); 959 960 // Check user options 961 if (snapshotName == null) { 962 System.err.println("Snapshot name not provided."); 963 LOG.error("Use -h or --help for usage instructions."); 964 return 0; 965 } 966 967 if (outputRoot == null) { 968 System.err 969 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 970 LOG.error("Use -h or --help for usage instructions."); 971 return 0; 972 } 973 974 if (targetName == null) { 975 targetName = snapshotName; 976 } 977 if (inputRoot == null) { 978 inputRoot = CommonFSUtils.getRootDir(conf); 979 } else { 980 CommonFSUtils.setRootDir(conf, inputRoot); 981 } 982 983 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 984 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 985 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 986 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 987 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 988 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 989 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 990 Path snapshotTmpDir = 991 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 992 Path outputSnapshotDir = 993 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 994 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 995 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 996 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 997 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 998 999 // Verify snapshot source before copying files 1000 if (verifySource) { 1001 LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.", 1002 inputFs.getUri(), inputRoot, snapshotDir); 1003 verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir); 1004 } 1005 1006 // Find the necessary directory which need to change owner and group 1007 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1008 if (outputFs.exists(needSetOwnerDir)) { 1009 if (skipTmp) { 1010 needSetOwnerDir = outputSnapshotDir; 1011 } else { 1012 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1013 if (outputFs.exists(needSetOwnerDir)) { 1014 needSetOwnerDir = snapshotTmpDir; 1015 } 1016 } 1017 } 1018 1019 // Check if the snapshot already exists 1020 if (outputFs.exists(outputSnapshotDir)) { 1021 if (overwrite) { 1022 if (!outputFs.delete(outputSnapshotDir, true)) { 1023 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1024 return 1; 1025 } 1026 } else { 1027 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 1028 + outputSnapshotDir); 1029 return 1; 1030 } 1031 } 1032 1033 if (!skipTmp) { 1034 // Check if the snapshot already in-progress 1035 if (outputFs.exists(snapshotTmpDir)) { 1036 if (overwrite) { 1037 if (!outputFs.delete(snapshotTmpDir, true)) { 1038 System.err 1039 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1040 return 1; 1041 } 1042 } else { 1043 System.err 1044 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1045 System.err 1046 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1047 System.err 1048 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1049 return 1; 1050 } 1051 } 1052 } 1053 1054 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1055 // The snapshot references must be copied before the hfiles otherwise the cleaner 1056 // will remove them because they are unreferenced. 1057 List<Path> travesedPaths = new ArrayList<>(); 1058 boolean copySucceeded = false; 1059 try { 1060 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1061 travesedPaths = 1062 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1063 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1064 copySucceeded = true; 1065 } catch (IOException e) { 1066 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1067 + " to=" + initialOutputSnapshotDir, e); 1068 } finally { 1069 if (copySucceeded) { 1070 if (filesUser != null || filesGroup != null) { 1071 LOG.warn( 1072 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1073 + (filesGroup == null 1074 ? "" 1075 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1076 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1077 } 1078 if (filesMode > 0) { 1079 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1080 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1081 } 1082 } 1083 } 1084 1085 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1086 // reset TTL for target snapshot. 1087 if (!targetName.equals(snapshotName) || resetTtl) { 1088 SnapshotDescription.Builder snapshotDescBuilder = 1089 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1090 if (!targetName.equals(snapshotName)) { 1091 snapshotDescBuilder.setName(targetName); 1092 } 1093 if (resetTtl) { 1094 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1095 } 1096 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1097 initialOutputSnapshotDir, outputFs); 1098 if (filesUser != null || filesGroup != null) { 1099 outputFs.setOwner( 1100 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1101 filesGroup); 1102 } 1103 if (filesMode > 0) { 1104 outputFs.setPermission( 1105 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1106 new FsPermission((short) filesMode)); 1107 } 1108 } 1109 1110 // Step 2 - Start MR Job to copy files 1111 // The snapshot references must be copied before the files otherwise the files gets removed 1112 // by the HFileArchiver, since they have no references. 1113 try { 1114 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1115 filesGroup, filesMode, mappers, bandwidthMB); 1116 1117 LOG.info("Finalize the Snapshot Export"); 1118 if (!skipTmp) { 1119 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1120 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1121 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1122 + snapshotTmpDir + " to=" + outputSnapshotDir); 1123 } 1124 } 1125 1126 // Step 4 - Verify snapshot integrity 1127 if (verifyTarget) { 1128 LOG.info("Verify snapshot integrity"); 1129 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1130 } 1131 1132 LOG.info("Export Completed: " + targetName); 1133 return 0; 1134 } catch (Exception e) { 1135 LOG.error("Snapshot export failed", e); 1136 if (!skipTmp) { 1137 outputFs.delete(snapshotTmpDir, true); 1138 } 1139 outputFs.delete(outputSnapshotDir, true); 1140 return 1; 1141 } 1142 } 1143 1144 @Override 1145 protected void printUsage() { 1146 super.printUsage(); 1147 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1148 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1149 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1150 + " hbase snapshot export \\\n" 1151 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1152 + " --copy-to hdfs://srv1:50070/hbase"); 1153 } 1154 1155 @Override 1156 protected void addOptions() { 1157 addRequiredOption(Options.SNAPSHOT); 1158 addOption(Options.COPY_TO); 1159 addOption(Options.COPY_FROM); 1160 addOption(Options.TARGET_NAME); 1161 addOption(Options.NO_CHECKSUM_VERIFY); 1162 addOption(Options.NO_TARGET_VERIFY); 1163 addOption(Options.NO_SOURCE_VERIFY); 1164 addOption(Options.OVERWRITE); 1165 addOption(Options.CHUSER); 1166 addOption(Options.CHGROUP); 1167 addOption(Options.CHMOD); 1168 addOption(Options.MAPPERS); 1169 addOption(Options.BANDWIDTH); 1170 addOption(Options.RESET_TTL); 1171 } 1172 1173 public static void main(String[] args) { 1174 new ExportSnapshot().doStaticMain(args); 1175 } 1176}