001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.snapshot; 019 020import java.io.BufferedInputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.LinkedList; 030import java.util.List; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.function.BiConsumer; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FSDataOutputStream; 039import org.apache.hadoop.fs.FileChecksum; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.permission.FsPermission; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.io.FileLink; 049import org.apache.hadoop.hbase.io.HFileLink; 050import org.apache.hadoop.hbase.io.WALLink; 051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.mob.MobUtils; 054import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 055import org.apache.hadoop.hbase.util.AbstractHBaseTool; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 058import org.apache.hadoop.hbase.util.FSUtils; 059import org.apache.hadoop.hbase.util.HFileArchiveUtil; 060import org.apache.hadoop.hbase.util.Pair; 061import org.apache.hadoop.io.BytesWritable; 062import org.apache.hadoop.io.IOUtils; 063import org.apache.hadoop.io.NullWritable; 064import org.apache.hadoop.io.Writable; 065import org.apache.hadoop.mapreduce.InputFormat; 066import org.apache.hadoop.mapreduce.InputSplit; 067import org.apache.hadoop.mapreduce.Job; 068import org.apache.hadoop.mapreduce.JobContext; 069import org.apache.hadoop.mapreduce.Mapper; 070import org.apache.hadoop.mapreduce.RecordReader; 071import org.apache.hadoop.mapreduce.TaskAttemptContext; 072import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 073import org.apache.hadoop.mapreduce.security.TokenCache; 074import org.apache.hadoop.util.StringUtils; 075import org.apache.hadoop.util.Tool; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 081import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 082 083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 086 087/** 088 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the 089 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the 090 * .archive/ location. When everything is done, the second cluster can restore the snapshot. 091 */ 092@InterfaceAudience.Public 093public class ExportSnapshot extends AbstractHBaseTool implements Tool { 094 public static final String NAME = "exportsnapshot"; 095 /** Configuration prefix for overrides for the source filesystem */ 096 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 097 /** Configuration prefix for overrides for the destination filesystem */ 098 public static final String CONF_DEST_PREFIX = NAME + ".to."; 099 100 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 101 102 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 103 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 104 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 105 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 106 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 107 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 108 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 109 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 110 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 111 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 112 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 113 private static final String CONF_REPORT_SIZE = "snapshot.export.report.size"; 114 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 115 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 116 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 117 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 118 private static final String CONF_COPY_MANIFEST_THREADS = 119 "snapshot.export.copy.references.threads"; 120 private static final int DEFAULT_COPY_MANIFEST_THREADS = 121 Runtime.getRuntime().availableProcessors(); 122 123 static class Testing { 124 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 125 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 126 int failuresCountToInject = 0; 127 int injectedFailureCount = 0; 128 } 129 130 // Command line options and defaults. 131 static final class Options { 132 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 133 static final Option TARGET_NAME = 134 new Option(null, "target", true, "Target name for the snapshot."); 135 static final Option COPY_TO = 136 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 137 static final Option COPY_FROM = 138 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 139 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 140 "Do not verify checksum, use name+length only."); 141 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 142 "Do not verify the integrity of the exported snapshot."); 143 static final Option NO_SOURCE_VERIFY = 144 new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot."); 145 static final Option OVERWRITE = 146 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 147 static final Option CHUSER = 148 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 149 static final Option CHGROUP = 150 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 151 static final Option CHMOD = 152 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 153 static final Option MAPPERS = new Option(null, "mappers", true, 154 "Number of mappers to use during the copy (mapreduce.job.maps)."); 155 static final Option BANDWIDTH = 156 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 157 static final Option RESET_TTL = 158 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 159 } 160 161 // Export Map-Reduce Counters, to keep track of the progress 162 public enum Counter { 163 MISSING_FILES, 164 FILES_COPIED, 165 FILES_SKIPPED, 166 COPY_FAILED, 167 BYTES_EXPECTED, 168 BYTES_SKIPPED, 169 BYTES_COPIED 170 } 171 172 private static class ExportMapper 173 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 174 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 175 final static int REPORT_SIZE = 1 * 1024 * 1024; 176 final static int BUFFER_SIZE = 64 * 1024; 177 178 private boolean verifyChecksum; 179 private String filesGroup; 180 private String filesUser; 181 private short filesMode; 182 private int bufferSize; 183 private int reportSize; 184 185 private FileSystem outputFs; 186 private Path outputArchive; 187 private Path outputRoot; 188 189 private FileSystem inputFs; 190 private Path inputArchive; 191 private Path inputRoot; 192 193 private static Testing testing = new Testing(); 194 195 @Override 196 public void setup(Context context) throws IOException { 197 Configuration conf = context.getConfiguration(); 198 199 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 200 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 201 202 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 203 204 filesGroup = conf.get(CONF_FILES_GROUP); 205 filesUser = conf.get(CONF_FILES_USER); 206 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 207 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 208 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 209 210 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 211 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 212 213 try { 214 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 215 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 216 } catch (IOException e) { 217 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 218 } 219 220 try { 221 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 222 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 223 } catch (IOException e) { 224 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 225 } 226 227 // Use the default block size of the outputFs if bigger 228 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 229 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 230 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 231 reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE); 232 233 for (Counter c : Counter.values()) { 234 context.getCounter(c).increment(0); 235 } 236 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 237 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 238 // Get number of times we have already injected failure based on attempt number of this 239 // task. 240 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 241 } 242 } 243 244 @Override 245 protected void cleanup(Context context) { 246 IOUtils.closeStream(inputFs); 247 IOUtils.closeStream(outputFs); 248 } 249 250 @Override 251 public void map(BytesWritable key, NullWritable value, Context context) 252 throws InterruptedException, IOException { 253 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 254 Path outputPath = getOutputPath(inputInfo); 255 256 copyFile(context, inputInfo, outputPath); 257 } 258 259 /** 260 * Returns the location where the inputPath will be copied. 261 */ 262 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 263 Path path = null; 264 switch (inputInfo.getType()) { 265 case HFILE: 266 Path inputPath = new Path(inputInfo.getHfile()); 267 String family = inputPath.getParent().getName(); 268 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 269 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 270 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 271 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 272 new Path(region, new Path(family, hfile))); 273 break; 274 case WAL: 275 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 276 break; 277 default: 278 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 279 } 280 return new Path(outputArchive, path); 281 } 282 283 @SuppressWarnings("checkstyle:linelength") 284 /** 285 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 286 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 287 */ 288 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 289 throws IOException { 290 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 291 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 292 testing.injectedFailureCount++; 293 context.getCounter(Counter.COPY_FAILED).increment(1); 294 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 295 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 296 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 297 } 298 299 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 300 final Path outputPath) throws IOException { 301 // Get the file information 302 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 303 304 // Verify if the output file exists and is the same that we want to copy 305 if (outputFs.exists(outputPath)) { 306 FileStatus outputStat = outputFs.getFileStatus(outputPath); 307 if (outputStat != null && sameFile(inputStat, outputStat)) { 308 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 309 context.getCounter(Counter.FILES_SKIPPED).increment(1); 310 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 311 return; 312 } 313 } 314 315 InputStream in = openSourceFile(context, inputInfo); 316 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 317 if (Integer.MAX_VALUE != bandwidthMB) { 318 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 319 } 320 321 try { 322 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 323 324 // Ensure that the output folder is there and copy the file 325 createOutputPath(outputPath.getParent()); 326 FSDataOutputStream out = outputFs.create(outputPath, true); 327 try { 328 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 329 } finally { 330 out.close(); 331 } 332 333 // Try to Preserve attributes 334 if (!preserveAttributes(outputPath, inputStat)) { 335 LOG.warn("You may have to run manually chown on: " + outputPath); 336 } 337 } finally { 338 in.close(); 339 injectTestFailure(context, inputInfo); 340 } 341 } 342 343 /** 344 * Create the output folder and optionally set ownership. 345 */ 346 private void createOutputPath(final Path path) throws IOException { 347 if (filesUser == null && filesGroup == null) { 348 outputFs.mkdirs(path); 349 } else { 350 Path parent = path.getParent(); 351 if (!outputFs.exists(parent) && !parent.isRoot()) { 352 createOutputPath(parent); 353 } 354 outputFs.mkdirs(path); 355 if (filesUser != null || filesGroup != null) { 356 // override the owner when non-null user/group is specified 357 outputFs.setOwner(path, filesUser, filesGroup); 358 } 359 if (filesMode > 0) { 360 outputFs.setPermission(path, new FsPermission(filesMode)); 361 } 362 } 363 } 364 365 /** 366 * Try to Preserve the files attribute selected by the user copying them from the source file 367 * This is only required when you are exporting as a different user than "hbase" or on a system 368 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 369 * can force a chmod with the user that knows is available on the system. 370 */ 371 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 372 FileStatus stat; 373 try { 374 stat = outputFs.getFileStatus(path); 375 } catch (IOException e) { 376 LOG.warn("Unable to get the status for file=" + path); 377 return false; 378 } 379 380 try { 381 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 382 outputFs.setPermission(path, new FsPermission(filesMode)); 383 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 384 outputFs.setPermission(path, refStat.getPermission()); 385 } 386 } catch (IOException e) { 387 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 388 return false; 389 } 390 391 boolean hasRefStat = (refStat != null); 392 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 393 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 394 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 395 try { 396 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 397 outputFs.setOwner(path, user, group); 398 } 399 } catch (IOException e) { 400 LOG.warn( 401 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 402 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 403 + " group=" + group); 404 return false; 405 } 406 } 407 408 return true; 409 } 410 411 private boolean stringIsNotEmpty(final String str) { 412 return str != null && str.length() > 0; 413 } 414 415 private void copyData(final Context context, final Path inputPath, final InputStream in, 416 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 417 throws IOException { 418 final String statusMessage = 419 "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)"; 420 421 try { 422 byte[] buffer = new byte[bufferSize]; 423 long totalBytesWritten = 0; 424 int reportBytes = 0; 425 int bytesRead; 426 427 long stime = EnvironmentEdgeManager.currentTime(); 428 while ((bytesRead = in.read(buffer)) > 0) { 429 out.write(buffer, 0, bytesRead); 430 totalBytesWritten += bytesRead; 431 reportBytes += bytesRead; 432 433 if (reportBytes >= reportSize) { 434 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 435 context.setStatus( 436 String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 437 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 438 + " to " + outputPath); 439 reportBytes = 0; 440 } 441 } 442 long etime = EnvironmentEdgeManager.currentTime(); 443 444 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 445 context 446 .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 447 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 448 + outputPath); 449 450 // Verify that the written size match 451 if (totalBytesWritten != inputFileSize) { 452 String msg = "number of bytes copied not matching copied=" + totalBytesWritten 453 + " expected=" + inputFileSize + " for file=" + inputPath; 454 throw new IOException(msg); 455 } 456 457 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 458 LOG 459 .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten) 460 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String 461 .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 462 context.getCounter(Counter.FILES_COPIED).increment(1); 463 } catch (IOException e) { 464 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 465 context.getCounter(Counter.COPY_FAILED).increment(1); 466 throw e; 467 } 468 } 469 470 /** 471 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 472 * fail or if the file is not found. 473 */ 474 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 475 throws IOException { 476 try { 477 Configuration conf = context.getConfiguration(); 478 FileLink link = null; 479 switch (fileInfo.getType()) { 480 case HFILE: 481 Path inputPath = new Path(fileInfo.getHfile()); 482 link = getFileLink(inputPath, conf); 483 break; 484 case WAL: 485 String serverName = fileInfo.getWalServer(); 486 String logName = fileInfo.getWalName(); 487 link = new WALLink(inputRoot, serverName, logName); 488 break; 489 default: 490 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 491 } 492 return link.open(inputFs); 493 } catch (IOException e) { 494 context.getCounter(Counter.MISSING_FILES).increment(1); 495 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 496 throw e; 497 } 498 } 499 500 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 501 throws IOException { 502 try { 503 Configuration conf = context.getConfiguration(); 504 FileLink link = null; 505 switch (fileInfo.getType()) { 506 case HFILE: 507 Path inputPath = new Path(fileInfo.getHfile()); 508 link = getFileLink(inputPath, conf); 509 break; 510 case WAL: 511 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 512 break; 513 default: 514 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 515 } 516 return link.getFileStatus(inputFs); 517 } catch (FileNotFoundException e) { 518 context.getCounter(Counter.MISSING_FILES).increment(1); 519 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 520 throw e; 521 } catch (IOException e) { 522 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 523 throw e; 524 } 525 } 526 527 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 528 String regionName = HFileLink.getReferencedRegionName(path.getName()); 529 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 530 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 531 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 532 HFileArchiveUtil.getArchivePath(conf), path); 533 } 534 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 535 } 536 537 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 538 try { 539 return fs.getFileChecksum(path); 540 } catch (IOException e) { 541 LOG.warn("Unable to get checksum for file=" + path, e); 542 return null; 543 } 544 } 545 546 /** 547 * Check if the two files are equal by looking at the file length, and at the checksum (if user 548 * has specified the verifyChecksum flag). 549 */ 550 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 551 // Not matching length 552 if (inputStat.getLen() != outputStat.getLen()) return false; 553 554 // Mark files as equals, since user asked for no checksum verification 555 if (!verifyChecksum) return true; 556 557 // If checksums are not available, files are not the same. 558 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 559 if (inChecksum == null) return false; 560 561 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 562 if (outChecksum == null) return false; 563 564 return inChecksum.equals(outChecksum); 565 } 566 } 567 568 // ========================================================================== 569 // Input Format 570 // ========================================================================== 571 572 /** 573 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 574 * @return list of files referenced by the snapshot (pair of path and size) 575 */ 576 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 577 final FileSystem fs, final Path snapshotDir) throws IOException { 578 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 579 580 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 581 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 582 583 // Get snapshot files 584 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 585 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 586 new SnapshotReferenceUtil.SnapshotVisitor() { 587 @Override 588 public void storeFile(final RegionInfo regionInfo, final String family, 589 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 590 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 591 if (!storeFile.hasReference()) { 592 String region = regionInfo.getEncodedName(); 593 String hfile = storeFile.getName(); 594 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 595 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 596 } else { 597 Pair<String, String> referredToRegionAndFile = 598 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 599 String referencedRegion = referredToRegionAndFile.getFirst(); 600 String referencedHFile = referredToRegionAndFile.getSecond(); 601 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 602 referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 603 } 604 files.add(snapshotFileAndSize); 605 } 606 }); 607 608 return files; 609 } 610 611 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 612 Configuration conf, TableName table, String region, String family, String hfile, long size) 613 throws IOException { 614 Path path = HFileLink.createPath(table, region, family, hfile); 615 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 616 .setHfile(path.toString()).build(); 617 if (size == -1) { 618 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 619 } 620 return new Pair<>(fileInfo, size); 621 } 622 623 /** 624 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 625 * The groups created will have similar amounts of bytes. 626 * <p> 627 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 628 * group fetch the bigger file available, iterating through groups alternating the direction. 629 */ 630 static List<List<Pair<SnapshotFileInfo, Long>>> 631 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 632 // Sort files by size, from small to big 633 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 634 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 635 long r = a.getSecond() - b.getSecond(); 636 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 637 } 638 }); 639 640 // create balanced groups 641 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 642 long[] sizeGroups = new long[ngroups]; 643 int hi = files.size() - 1; 644 int lo = 0; 645 646 List<Pair<SnapshotFileInfo, Long>> group; 647 int dir = 1; 648 int g = 0; 649 650 while (hi >= lo) { 651 if (g == fileGroups.size()) { 652 group = new LinkedList<>(); 653 fileGroups.add(group); 654 } else { 655 group = fileGroups.get(g); 656 } 657 658 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 659 660 // add the hi one 661 sizeGroups[g] += fileInfo.getSecond(); 662 group.add(fileInfo); 663 664 // change direction when at the end or the beginning 665 g += dir; 666 if (g == ngroups) { 667 dir = -1; 668 g = ngroups - 1; 669 } else if (g < 0) { 670 dir = 1; 671 g = 0; 672 } 673 } 674 675 if (LOG.isDebugEnabled()) { 676 for (int i = 0; i < sizeGroups.length; ++i) { 677 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 678 } 679 } 680 681 return fileGroups; 682 } 683 684 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 685 @Override 686 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 687 TaskAttemptContext tac) throws IOException, InterruptedException { 688 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 689 } 690 691 @Override 692 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 693 Configuration conf = context.getConfiguration(); 694 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 695 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 696 697 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 698 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 699 if (mappers == 0 && snapshotFiles.size() > 0) { 700 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 701 mappers = Math.min(mappers, snapshotFiles.size()); 702 conf.setInt(CONF_NUM_SPLITS, mappers); 703 conf.setInt(MR_NUM_MAPS, mappers); 704 } 705 706 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 707 List<InputSplit> splits = new ArrayList(groups.size()); 708 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 709 splits.add(new ExportSnapshotInputSplit(files)); 710 } 711 return splits; 712 } 713 714 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 715 private List<Pair<BytesWritable, Long>> files; 716 private long length; 717 718 public ExportSnapshotInputSplit() { 719 this.files = null; 720 } 721 722 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 723 this.files = new ArrayList(snapshotFiles.size()); 724 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 725 this.files.add( 726 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 727 this.length += fileInfo.getSecond(); 728 } 729 } 730 731 private List<Pair<BytesWritable, Long>> getSplitKeys() { 732 return files; 733 } 734 735 @Override 736 public long getLength() throws IOException, InterruptedException { 737 return length; 738 } 739 740 @Override 741 public String[] getLocations() throws IOException, InterruptedException { 742 return new String[] {}; 743 } 744 745 @Override 746 public void readFields(DataInput in) throws IOException { 747 int count = in.readInt(); 748 files = new ArrayList<>(count); 749 length = 0; 750 for (int i = 0; i < count; ++i) { 751 BytesWritable fileInfo = new BytesWritable(); 752 fileInfo.readFields(in); 753 long size = in.readLong(); 754 files.add(new Pair<>(fileInfo, size)); 755 length += size; 756 } 757 } 758 759 @Override 760 public void write(DataOutput out) throws IOException { 761 out.writeInt(files.size()); 762 for (final Pair<BytesWritable, Long> fileInfo : files) { 763 fileInfo.getFirst().write(out); 764 out.writeLong(fileInfo.getSecond()); 765 } 766 } 767 } 768 769 private static class ExportSnapshotRecordReader 770 extends RecordReader<BytesWritable, NullWritable> { 771 private final List<Pair<BytesWritable, Long>> files; 772 private long totalSize = 0; 773 private long procSize = 0; 774 private int index = -1; 775 776 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 777 this.files = files; 778 for (Pair<BytesWritable, Long> fileInfo : files) { 779 totalSize += fileInfo.getSecond(); 780 } 781 } 782 783 @Override 784 public void close() { 785 } 786 787 @Override 788 public BytesWritable getCurrentKey() { 789 return files.get(index).getFirst(); 790 } 791 792 @Override 793 public NullWritable getCurrentValue() { 794 return NullWritable.get(); 795 } 796 797 @Override 798 public float getProgress() { 799 return (float) procSize / totalSize; 800 } 801 802 @Override 803 public void initialize(InputSplit split, TaskAttemptContext tac) { 804 } 805 806 @Override 807 public boolean nextKeyValue() { 808 if (index >= 0) { 809 procSize += files.get(index).getSecond(); 810 } 811 return (++index < files.size()); 812 } 813 } 814 } 815 816 // ========================================================================== 817 // Tool 818 // ========================================================================== 819 820 /** 821 * Run Map-Reduce Job to perform the files copy. 822 */ 823 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 824 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 825 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 826 throws IOException, InterruptedException, ClassNotFoundException { 827 Configuration conf = getConf(); 828 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 829 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 830 if (mappers > 0) { 831 conf.setInt(CONF_NUM_SPLITS, mappers); 832 conf.setInt(MR_NUM_MAPS, mappers); 833 } 834 conf.setInt(CONF_FILES_MODE, filesMode); 835 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 836 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 837 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 838 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 839 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 840 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 841 842 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 843 Job job = new Job(conf); 844 job.setJobName(jobname); 845 job.setJarByClass(ExportSnapshot.class); 846 TableMapReduceUtil.addDependencyJars(job); 847 job.setMapperClass(ExportMapper.class); 848 job.setInputFormatClass(ExportSnapshotInputFormat.class); 849 job.setOutputFormatClass(NullOutputFormat.class); 850 job.setMapSpeculativeExecution(false); 851 job.setNumReduceTasks(0); 852 853 // Acquire the delegation Tokens 854 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 855 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 856 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 857 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 858 859 // Run the MR Job 860 if (!job.waitForCompletion(true)) { 861 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 862 } 863 } 864 865 private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir, 866 final Path snapshotDir) throws IOException { 867 // Update the conf with the current root dir, since may be a different cluster 868 Configuration conf = new Configuration(baseConf); 869 CommonFSUtils.setRootDir(conf, rootDir); 870 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 871 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 872 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 873 } 874 875 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 876 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 877 ExecutorService pool = Executors 878 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 879 List<Future<Void>> futures = new ArrayList<>(); 880 for (Path dstPath : traversedPath) { 881 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 882 futures.add(future); 883 } 884 try { 885 for (Future<Void> future : futures) { 886 future.get(); 887 } 888 } catch (InterruptedException | ExecutionException e) { 889 throw new IOException(e); 890 } finally { 891 pool.shutdownNow(); 892 } 893 } 894 895 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 896 Configuration conf, List<Path> traversedPath) throws IOException { 897 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 898 try { 899 fs.setOwner(path, filesUser, filesGroup); 900 } catch (IOException e) { 901 throw new RuntimeException( 902 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 903 } 904 }, conf); 905 } 906 907 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 908 final List<Path> traversedPath, final Configuration conf) throws IOException { 909 if (filesMode <= 0) { 910 return; 911 } 912 FsPermission perm = new FsPermission(filesMode); 913 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 914 try { 915 fs.setPermission(path, perm); 916 } catch (IOException e) { 917 throw new RuntimeException( 918 "set permission for file " + path + " to " + filesMode + " failed", e); 919 } 920 }, conf); 921 } 922 923 private boolean verifyTarget = true; 924 private boolean verifySource = true; 925 private boolean verifyChecksum = true; 926 private String snapshotName = null; 927 private String targetName = null; 928 private boolean overwrite = false; 929 private String filesGroup = null; 930 private String filesUser = null; 931 private Path outputRoot = null; 932 private Path inputRoot = null; 933 private int bandwidthMB = Integer.MAX_VALUE; 934 private int filesMode = 0; 935 private int mappers = 0; 936 private boolean resetTtl = false; 937 938 @Override 939 protected void processOptions(CommandLine cmd) { 940 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 941 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 942 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 943 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 944 } 945 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 946 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 947 } 948 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 949 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 950 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 951 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 952 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 953 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 954 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 955 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 956 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 957 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 958 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 959 } 960 961 /** 962 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 963 * @return 0 on success, and != 0 upon failure. 964 */ 965 @Override 966 public int doWork() throws IOException { 967 Configuration conf = getConf(); 968 969 // Check user options 970 if (snapshotName == null) { 971 System.err.println("Snapshot name not provided."); 972 LOG.error("Use -h or --help for usage instructions."); 973 return 0; 974 } 975 976 if (outputRoot == null) { 977 System.err 978 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 979 LOG.error("Use -h or --help for usage instructions."); 980 return 0; 981 } 982 983 if (targetName == null) { 984 targetName = snapshotName; 985 } 986 if (inputRoot == null) { 987 inputRoot = CommonFSUtils.getRootDir(conf); 988 } else { 989 CommonFSUtils.setRootDir(conf, inputRoot); 990 } 991 992 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 993 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 994 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 995 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 996 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 997 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 998 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 999 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 1000 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 1001 Path snapshotTmpDir = 1002 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 1003 Path outputSnapshotDir = 1004 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 1005 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 1006 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 1007 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 1008 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 1009 1010 // Verify snapshot source before copying files 1011 if (verifySource) { 1012 LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.", 1013 inputFs.getUri(), inputRoot, snapshotDir); 1014 verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir); 1015 } 1016 1017 // Find the necessary directory which need to change owner and group 1018 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1019 if (outputFs.exists(needSetOwnerDir)) { 1020 if (skipTmp) { 1021 needSetOwnerDir = outputSnapshotDir; 1022 } else { 1023 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1024 if (outputFs.exists(needSetOwnerDir)) { 1025 needSetOwnerDir = snapshotTmpDir; 1026 } 1027 } 1028 } 1029 1030 // Check if the snapshot already exists 1031 if (outputFs.exists(outputSnapshotDir)) { 1032 if (overwrite) { 1033 if (!outputFs.delete(outputSnapshotDir, true)) { 1034 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1035 return 1; 1036 } 1037 } else { 1038 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 1039 + outputSnapshotDir); 1040 return 1; 1041 } 1042 } 1043 1044 if (!skipTmp) { 1045 // Check if the snapshot already in-progress 1046 if (outputFs.exists(snapshotTmpDir)) { 1047 if (overwrite) { 1048 if (!outputFs.delete(snapshotTmpDir, true)) { 1049 System.err 1050 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1051 return 1; 1052 } 1053 } else { 1054 System.err 1055 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1056 System.err 1057 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1058 System.err 1059 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1060 return 1; 1061 } 1062 } 1063 } 1064 1065 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1066 // The snapshot references must be copied before the hfiles otherwise the cleaner 1067 // will remove them because they are unreferenced. 1068 List<Path> travesedPaths = new ArrayList<>(); 1069 boolean copySucceeded = false; 1070 try { 1071 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1072 travesedPaths = 1073 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1074 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1075 copySucceeded = true; 1076 } catch (IOException e) { 1077 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1078 + " to=" + initialOutputSnapshotDir, e); 1079 } finally { 1080 if (copySucceeded) { 1081 if (filesUser != null || filesGroup != null) { 1082 LOG.warn( 1083 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1084 + (filesGroup == null 1085 ? "" 1086 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1087 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1088 } 1089 if (filesMode > 0) { 1090 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1091 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1092 } 1093 } 1094 } 1095 1096 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1097 // reset TTL for target snapshot. 1098 if (!targetName.equals(snapshotName) || resetTtl) { 1099 SnapshotDescription.Builder snapshotDescBuilder = 1100 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1101 if (!targetName.equals(snapshotName)) { 1102 snapshotDescBuilder.setName(targetName); 1103 } 1104 if (resetTtl) { 1105 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1106 } 1107 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1108 initialOutputSnapshotDir, outputFs); 1109 if (filesUser != null || filesGroup != null) { 1110 outputFs.setOwner( 1111 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1112 filesGroup); 1113 } 1114 if (filesMode > 0) { 1115 outputFs.setPermission( 1116 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1117 new FsPermission((short) filesMode)); 1118 } 1119 } 1120 1121 // Step 2 - Start MR Job to copy files 1122 // The snapshot references must be copied before the files otherwise the files gets removed 1123 // by the HFileArchiver, since they have no references. 1124 try { 1125 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1126 filesGroup, filesMode, mappers, bandwidthMB); 1127 1128 LOG.info("Finalize the Snapshot Export"); 1129 if (!skipTmp) { 1130 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1131 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1132 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1133 + snapshotTmpDir + " to=" + outputSnapshotDir); 1134 } 1135 } 1136 1137 // Step 4 - Verify snapshot integrity 1138 if (verifyTarget) { 1139 LOG.info("Verify snapshot integrity"); 1140 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1141 } 1142 1143 LOG.info("Export Completed: " + targetName); 1144 return 0; 1145 } catch (Exception e) { 1146 LOG.error("Snapshot export failed", e); 1147 if (!skipTmp) { 1148 outputFs.delete(snapshotTmpDir, true); 1149 } 1150 outputFs.delete(outputSnapshotDir, true); 1151 return 1; 1152 } finally { 1153 IOUtils.closeStream(inputFs); 1154 IOUtils.closeStream(outputFs); 1155 } 1156 } 1157 1158 @Override 1159 protected void printUsage() { 1160 super.printUsage(); 1161 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1162 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1163 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1164 + " hbase snapshot export \\\n" 1165 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1166 + " --copy-to hdfs://srv1:50070/hbase"); 1167 } 1168 1169 @Override 1170 protected void addOptions() { 1171 addRequiredOption(Options.SNAPSHOT); 1172 addOption(Options.COPY_TO); 1173 addOption(Options.COPY_FROM); 1174 addOption(Options.TARGET_NAME); 1175 addOption(Options.NO_CHECKSUM_VERIFY); 1176 addOption(Options.NO_TARGET_VERIFY); 1177 addOption(Options.NO_SOURCE_VERIFY); 1178 addOption(Options.OVERWRITE); 1179 addOption(Options.CHUSER); 1180 addOption(Options.CHGROUP); 1181 addOption(Options.CHMOD); 1182 addOption(Options.MAPPERS); 1183 addOption(Options.BANDWIDTH); 1184 addOption(Options.RESET_TTL); 1185 } 1186 1187 public static void main(String[] args) { 1188 new ExportSnapshot().doStaticMain(args); 1189 } 1190}