001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.snapshot; 019 020import java.io.BufferedInputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.FileNotFoundException; 024import java.io.IOException; 025import java.io.InputStream; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.Comparator; 029import java.util.LinkedList; 030import java.util.List; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.function.BiConsumer; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FSDataInputStream; 038import org.apache.hadoop.fs.FSDataOutputStream; 039import org.apache.hadoop.fs.FileChecksum; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.permission.FsPermission; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.io.FileLink; 049import org.apache.hadoop.hbase.io.HFileLink; 050import org.apache.hadoop.hbase.io.WALLink; 051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.mob.MobUtils; 054import org.apache.hadoop.hbase.util.AbstractHBaseTool; 055import org.apache.hadoop.hbase.util.CommonFSUtils; 056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 057import org.apache.hadoop.hbase.util.FSUtils; 058import org.apache.hadoop.hbase.util.HFileArchiveUtil; 059import org.apache.hadoop.hbase.util.Pair; 060import org.apache.hadoop.io.BytesWritable; 061import org.apache.hadoop.io.IOUtils; 062import org.apache.hadoop.io.NullWritable; 063import org.apache.hadoop.io.Writable; 064import org.apache.hadoop.mapreduce.InputFormat; 065import org.apache.hadoop.mapreduce.InputSplit; 066import org.apache.hadoop.mapreduce.Job; 067import org.apache.hadoop.mapreduce.JobContext; 068import org.apache.hadoop.mapreduce.Mapper; 069import org.apache.hadoop.mapreduce.RecordReader; 070import org.apache.hadoop.mapreduce.TaskAttemptContext; 071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 072import org.apache.hadoop.mapreduce.security.TokenCache; 073import org.apache.hadoop.util.StringUtils; 074import org.apache.hadoop.util.Tool; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 081 082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 085 086/** 087 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the 088 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the 089 * .archive/ location. When everything is done, the second cluster can restore the snapshot. 090 */ 091@InterfaceAudience.Public 092public class ExportSnapshot extends AbstractHBaseTool implements Tool { 093 public static final String NAME = "exportsnapshot"; 094 /** Configuration prefix for overrides for the source filesystem */ 095 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 096 /** Configuration prefix for overrides for the destination filesystem */ 097 public static final String CONF_DEST_PREFIX = NAME + ".to."; 098 099 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 100 101 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 102 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 103 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 104 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 105 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 106 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 107 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 108 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 109 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 110 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 111 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 112 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 113 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 114 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 115 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 116 private static final String CONF_COPY_MANIFEST_THREADS = 117 "snapshot.export.copy.references.threads"; 118 private static final int DEFAULT_COPY_MANIFEST_THREADS = 119 Runtime.getRuntime().availableProcessors(); 120 121 static class Testing { 122 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 123 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 124 int failuresCountToInject = 0; 125 int injectedFailureCount = 0; 126 } 127 128 // Command line options and defaults. 129 static final class Options { 130 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 131 static final Option TARGET_NAME = 132 new Option(null, "target", true, "Target name for the snapshot."); 133 static final Option COPY_TO = 134 new Option(null, "copy-to", true, "Remote " + "destination hdfs://"); 135 static final Option COPY_FROM = 136 new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)"); 137 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 138 "Do not verify checksum, use name+length only."); 139 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 140 "Do not verify the integrity of the exported snapshot."); 141 static final Option NO_SOURCE_VERIFY = 142 new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot."); 143 static final Option OVERWRITE = 144 new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists."); 145 static final Option CHUSER = 146 new Option(null, "chuser", true, "Change the owner of the files to the specified one."); 147 static final Option CHGROUP = 148 new Option(null, "chgroup", true, "Change the group of the files to the specified one."); 149 static final Option CHMOD = 150 new Option(null, "chmod", true, "Change the permission of the files to the specified one."); 151 static final Option MAPPERS = new Option(null, "mappers", true, 152 "Number of mappers to use during the copy (mapreduce.job.maps)."); 153 static final Option BANDWIDTH = 154 new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); 155 static final Option RESET_TTL = 156 new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); 157 } 158 159 // Export Map-Reduce Counters, to keep track of the progress 160 public enum Counter { 161 MISSING_FILES, 162 FILES_COPIED, 163 FILES_SKIPPED, 164 COPY_FAILED, 165 BYTES_EXPECTED, 166 BYTES_SKIPPED, 167 BYTES_COPIED 168 } 169 170 private static class ExportMapper 171 extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> { 172 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 173 final static int REPORT_SIZE = 1 * 1024 * 1024; 174 final static int BUFFER_SIZE = 64 * 1024; 175 176 private boolean verifyChecksum; 177 private String filesGroup; 178 private String filesUser; 179 private short filesMode; 180 private int bufferSize; 181 182 private FileSystem outputFs; 183 private Path outputArchive; 184 private Path outputRoot; 185 186 private FileSystem inputFs; 187 private Path inputArchive; 188 private Path inputRoot; 189 190 private static Testing testing = new Testing(); 191 192 @Override 193 public void setup(Context context) throws IOException { 194 Configuration conf = context.getConfiguration(); 195 196 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 197 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 198 199 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 200 201 filesGroup = conf.get(CONF_FILES_GROUP); 202 filesUser = conf.get(CONF_FILES_USER); 203 filesMode = (short) conf.getInt(CONF_FILES_MODE, 0); 204 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 205 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 206 207 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 208 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 209 210 try { 211 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 212 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 213 } catch (IOException e) { 214 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 215 } 216 217 try { 218 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 219 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 220 } catch (IOException e) { 221 throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e); 222 } 223 224 // Use the default block size of the outputFs if bigger 225 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 226 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 227 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 228 229 for (Counter c : Counter.values()) { 230 context.getCounter(c).increment(0); 231 } 232 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 233 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 234 // Get number of times we have already injected failure based on attempt number of this 235 // task. 236 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 237 } 238 } 239 240 @Override 241 protected void cleanup(Context context) { 242 IOUtils.closeStream(inputFs); 243 IOUtils.closeStream(outputFs); 244 } 245 246 @Override 247 public void map(BytesWritable key, NullWritable value, Context context) 248 throws InterruptedException, IOException { 249 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 250 Path outputPath = getOutputPath(inputInfo); 251 252 copyFile(context, inputInfo, outputPath); 253 } 254 255 /** 256 * Returns the location where the inputPath will be copied. 257 */ 258 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 259 Path path = null; 260 switch (inputInfo.getType()) { 261 case HFILE: 262 Path inputPath = new Path(inputInfo.getHfile()); 263 String family = inputPath.getParent().getName(); 264 TableName table = HFileLink.getReferencedTableName(inputPath.getName()); 265 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 266 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 267 path = new Path(CommonFSUtils.getTableDir(new Path("./"), table), 268 new Path(region, new Path(family, hfile))); 269 break; 270 case WAL: 271 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 272 break; 273 default: 274 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 275 } 276 return new Path(outputArchive, path); 277 } 278 279 @SuppressWarnings("checkstyle:linelength") 280 /** 281 * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in 282 * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 283 */ 284 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 285 throws IOException { 286 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 287 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 288 testing.injectedFailureCount++; 289 context.getCounter(Counter.COPY_FAILED).increment(1); 290 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 291 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 292 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 293 } 294 295 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 296 final Path outputPath) throws IOException { 297 // Get the file information 298 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 299 300 // Verify if the output file exists and is the same that we want to copy 301 if (outputFs.exists(outputPath)) { 302 FileStatus outputStat = outputFs.getFileStatus(outputPath); 303 if (outputStat != null && sameFile(inputStat, outputStat)) { 304 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 305 context.getCounter(Counter.FILES_SKIPPED).increment(1); 306 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 307 return; 308 } 309 } 310 311 InputStream in = openSourceFile(context, inputInfo); 312 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 313 if (Integer.MAX_VALUE != bandwidthMB) { 314 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 315 } 316 317 try { 318 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 319 320 // Ensure that the output folder is there and copy the file 321 createOutputPath(outputPath.getParent()); 322 FSDataOutputStream out = outputFs.create(outputPath, true); 323 try { 324 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 325 } finally { 326 out.close(); 327 } 328 329 // Try to Preserve attributes 330 if (!preserveAttributes(outputPath, inputStat)) { 331 LOG.warn("You may have to run manually chown on: " + outputPath); 332 } 333 } finally { 334 in.close(); 335 injectTestFailure(context, inputInfo); 336 } 337 } 338 339 /** 340 * Create the output folder and optionally set ownership. 341 */ 342 private void createOutputPath(final Path path) throws IOException { 343 if (filesUser == null && filesGroup == null) { 344 outputFs.mkdirs(path); 345 } else { 346 Path parent = path.getParent(); 347 if (!outputFs.exists(parent) && !parent.isRoot()) { 348 createOutputPath(parent); 349 } 350 outputFs.mkdirs(path); 351 if (filesUser != null || filesGroup != null) { 352 // override the owner when non-null user/group is specified 353 outputFs.setOwner(path, filesUser, filesGroup); 354 } 355 if (filesMode > 0) { 356 outputFs.setPermission(path, new FsPermission(filesMode)); 357 } 358 } 359 } 360 361 /** 362 * Try to Preserve the files attribute selected by the user copying them from the source file 363 * This is only required when you are exporting as a different user than "hbase" or on a system 364 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 365 * can force a chmod with the user that knows is available on the system. 366 */ 367 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 368 FileStatus stat; 369 try { 370 stat = outputFs.getFileStatus(path); 371 } catch (IOException e) { 372 LOG.warn("Unable to get the status for file=" + path); 373 return false; 374 } 375 376 try { 377 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 378 outputFs.setPermission(path, new FsPermission(filesMode)); 379 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 380 outputFs.setPermission(path, refStat.getPermission()); 381 } 382 } catch (IOException e) { 383 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 384 return false; 385 } 386 387 boolean hasRefStat = (refStat != null); 388 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 389 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 390 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 391 try { 392 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 393 outputFs.setOwner(path, user, group); 394 } 395 } catch (IOException e) { 396 LOG.warn( 397 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 398 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 399 + " group=" + group); 400 return false; 401 } 402 } 403 404 return true; 405 } 406 407 private boolean stringIsNotEmpty(final String str) { 408 return str != null && str.length() > 0; 409 } 410 411 private void copyData(final Context context, final Path inputPath, final InputStream in, 412 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 413 throws IOException { 414 final String statusMessage = 415 "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)"; 416 417 try { 418 byte[] buffer = new byte[bufferSize]; 419 long totalBytesWritten = 0; 420 int reportBytes = 0; 421 int bytesRead; 422 423 long stime = EnvironmentEdgeManager.currentTime(); 424 while ((bytesRead = in.read(buffer)) > 0) { 425 out.write(buffer, 0, bytesRead); 426 totalBytesWritten += bytesRead; 427 reportBytes += bytesRead; 428 429 if (reportBytes >= REPORT_SIZE) { 430 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 431 context.setStatus( 432 String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 433 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 434 + " to " + outputPath); 435 reportBytes = 0; 436 } 437 } 438 long etime = EnvironmentEdgeManager.currentTime(); 439 440 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 441 context 442 .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten), 443 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 444 + outputPath); 445 446 // Verify that the written size match 447 if (totalBytesWritten != inputFileSize) { 448 String msg = "number of bytes copied not matching copied=" + totalBytesWritten 449 + " expected=" + inputFileSize + " for file=" + inputPath; 450 throw new IOException(msg); 451 } 452 453 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 454 LOG 455 .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten) 456 + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String 457 .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0)); 458 context.getCounter(Counter.FILES_COPIED).increment(1); 459 } catch (IOException e) { 460 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 461 context.getCounter(Counter.COPY_FAILED).increment(1); 462 throw e; 463 } 464 } 465 466 /** 467 * Try to open the "source" file. Throws an IOException if the communication with the inputFs 468 * fail or if the file is not found. 469 */ 470 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 471 throws IOException { 472 try { 473 Configuration conf = context.getConfiguration(); 474 FileLink link = null; 475 switch (fileInfo.getType()) { 476 case HFILE: 477 Path inputPath = new Path(fileInfo.getHfile()); 478 link = getFileLink(inputPath, conf); 479 break; 480 case WAL: 481 String serverName = fileInfo.getWalServer(); 482 String logName = fileInfo.getWalName(); 483 link = new WALLink(inputRoot, serverName, logName); 484 break; 485 default: 486 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 487 } 488 return link.open(inputFs); 489 } catch (IOException e) { 490 context.getCounter(Counter.MISSING_FILES).increment(1); 491 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 492 throw e; 493 } 494 } 495 496 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 497 throws IOException { 498 try { 499 Configuration conf = context.getConfiguration(); 500 FileLink link = null; 501 switch (fileInfo.getType()) { 502 case HFILE: 503 Path inputPath = new Path(fileInfo.getHfile()); 504 link = getFileLink(inputPath, conf); 505 break; 506 case WAL: 507 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 508 break; 509 default: 510 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 511 } 512 return link.getFileStatus(inputFs); 513 } catch (FileNotFoundException e) { 514 context.getCounter(Counter.MISSING_FILES).increment(1); 515 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 516 throw e; 517 } catch (IOException e) { 518 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 519 throw e; 520 } 521 } 522 523 private FileLink getFileLink(Path path, Configuration conf) throws IOException { 524 String regionName = HFileLink.getReferencedRegionName(path.getName()); 525 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 526 if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 527 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 528 HFileArchiveUtil.getArchivePath(conf), path); 529 } 530 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 531 } 532 533 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 534 try { 535 return fs.getFileChecksum(path); 536 } catch (IOException e) { 537 LOG.warn("Unable to get checksum for file=" + path, e); 538 return null; 539 } 540 } 541 542 /** 543 * Check if the two files are equal by looking at the file length, and at the checksum (if user 544 * has specified the verifyChecksum flag). 545 */ 546 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 547 // Not matching length 548 if (inputStat.getLen() != outputStat.getLen()) return false; 549 550 // Mark files as equals, since user asked for no checksum verification 551 if (!verifyChecksum) return true; 552 553 // If checksums are not available, files are not the same. 554 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 555 if (inChecksum == null) return false; 556 557 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 558 if (outChecksum == null) return false; 559 560 return inChecksum.equals(outChecksum); 561 } 562 } 563 564 // ========================================================================== 565 // Input Format 566 // ========================================================================== 567 568 /** 569 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 570 * @return list of files referenced by the snapshot (pair of path and size) 571 */ 572 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 573 final FileSystem fs, final Path snapshotDir) throws IOException { 574 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 575 576 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 577 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 578 579 // Get snapshot files 580 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 581 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 582 new SnapshotReferenceUtil.SnapshotVisitor() { 583 @Override 584 public void storeFile(final RegionInfo regionInfo, final String family, 585 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 586 // for storeFile.hasReference() case, copied as part of the manifest 587 if (!storeFile.hasReference()) { 588 String region = regionInfo.getEncodedName(); 589 String hfile = storeFile.getName(); 590 Path path = HFileLink.createPath(table, region, family, hfile); 591 592 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder() 593 .setType(SnapshotFileInfo.Type.HFILE).setHfile(path.toString()).build(); 594 595 long size; 596 if (storeFile.hasFileSize()) { 597 size = storeFile.getFileSize(); 598 } else { 599 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 600 } 601 files.add(new Pair<>(fileInfo, size)); 602 } 603 } 604 }); 605 606 return files; 607 } 608 609 /** 610 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 611 * The groups created will have similar amounts of bytes. 612 * <p> 613 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 614 * group fetch the bigger file available, iterating through groups alternating the direction. 615 */ 616 static List<List<Pair<SnapshotFileInfo, Long>>> 617 getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 618 // Sort files by size, from small to big 619 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 620 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 621 long r = a.getSecond() - b.getSecond(); 622 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 623 } 624 }); 625 626 // create balanced groups 627 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 628 long[] sizeGroups = new long[ngroups]; 629 int hi = files.size() - 1; 630 int lo = 0; 631 632 List<Pair<SnapshotFileInfo, Long>> group; 633 int dir = 1; 634 int g = 0; 635 636 while (hi >= lo) { 637 if (g == fileGroups.size()) { 638 group = new LinkedList<>(); 639 fileGroups.add(group); 640 } else { 641 group = fileGroups.get(g); 642 } 643 644 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 645 646 // add the hi one 647 sizeGroups[g] += fileInfo.getSecond(); 648 group.add(fileInfo); 649 650 // change direction when at the end or the beginning 651 g += dir; 652 if (g == ngroups) { 653 dir = -1; 654 g = ngroups - 1; 655 } else if (g < 0) { 656 dir = 1; 657 g = 0; 658 } 659 } 660 661 if (LOG.isDebugEnabled()) { 662 for (int i = 0; i < sizeGroups.length; ++i) { 663 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 664 } 665 } 666 667 return fileGroups; 668 } 669 670 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 671 @Override 672 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 673 TaskAttemptContext tac) throws IOException, InterruptedException { 674 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 675 } 676 677 @Override 678 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 679 Configuration conf = context.getConfiguration(); 680 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 681 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 682 683 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 684 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 685 if (mappers == 0 && snapshotFiles.size() > 0) { 686 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 687 mappers = Math.min(mappers, snapshotFiles.size()); 688 conf.setInt(CONF_NUM_SPLITS, mappers); 689 conf.setInt(MR_NUM_MAPS, mappers); 690 } 691 692 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 693 List<InputSplit> splits = new ArrayList(groups.size()); 694 for (List<Pair<SnapshotFileInfo, Long>> files : groups) { 695 splits.add(new ExportSnapshotInputSplit(files)); 696 } 697 return splits; 698 } 699 700 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 701 private List<Pair<BytesWritable, Long>> files; 702 private long length; 703 704 public ExportSnapshotInputSplit() { 705 this.files = null; 706 } 707 708 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 709 this.files = new ArrayList(snapshotFiles.size()); 710 for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) { 711 this.files.add( 712 new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 713 this.length += fileInfo.getSecond(); 714 } 715 } 716 717 private List<Pair<BytesWritable, Long>> getSplitKeys() { 718 return files; 719 } 720 721 @Override 722 public long getLength() throws IOException, InterruptedException { 723 return length; 724 } 725 726 @Override 727 public String[] getLocations() throws IOException, InterruptedException { 728 return new String[] {}; 729 } 730 731 @Override 732 public void readFields(DataInput in) throws IOException { 733 int count = in.readInt(); 734 files = new ArrayList<>(count); 735 length = 0; 736 for (int i = 0; i < count; ++i) { 737 BytesWritable fileInfo = new BytesWritable(); 738 fileInfo.readFields(in); 739 long size = in.readLong(); 740 files.add(new Pair<>(fileInfo, size)); 741 length += size; 742 } 743 } 744 745 @Override 746 public void write(DataOutput out) throws IOException { 747 out.writeInt(files.size()); 748 for (final Pair<BytesWritable, Long> fileInfo : files) { 749 fileInfo.getFirst().write(out); 750 out.writeLong(fileInfo.getSecond()); 751 } 752 } 753 } 754 755 private static class ExportSnapshotRecordReader 756 extends RecordReader<BytesWritable, NullWritable> { 757 private final List<Pair<BytesWritable, Long>> files; 758 private long totalSize = 0; 759 private long procSize = 0; 760 private int index = -1; 761 762 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 763 this.files = files; 764 for (Pair<BytesWritable, Long> fileInfo : files) { 765 totalSize += fileInfo.getSecond(); 766 } 767 } 768 769 @Override 770 public void close() { 771 } 772 773 @Override 774 public BytesWritable getCurrentKey() { 775 return files.get(index).getFirst(); 776 } 777 778 @Override 779 public NullWritable getCurrentValue() { 780 return NullWritable.get(); 781 } 782 783 @Override 784 public float getProgress() { 785 return (float) procSize / totalSize; 786 } 787 788 @Override 789 public void initialize(InputSplit split, TaskAttemptContext tac) { 790 } 791 792 @Override 793 public boolean nextKeyValue() { 794 if (index >= 0) { 795 procSize += files.get(index).getSecond(); 796 } 797 return (++index < files.size()); 798 } 799 } 800 } 801 802 // ========================================================================== 803 // Tool 804 // ========================================================================== 805 806 /** 807 * Run Map-Reduce Job to perform the files copy. 808 */ 809 private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, 810 final Path snapshotDir, final boolean verifyChecksum, final String filesUser, 811 final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB) 812 throws IOException, InterruptedException, ClassNotFoundException { 813 Configuration conf = getConf(); 814 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 815 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 816 if (mappers > 0) { 817 conf.setInt(CONF_NUM_SPLITS, mappers); 818 conf.setInt(MR_NUM_MAPS, mappers); 819 } 820 conf.setInt(CONF_FILES_MODE, filesMode); 821 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 822 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 823 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 824 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 825 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 826 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 827 828 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 829 Job job = new Job(conf); 830 job.setJobName(jobname); 831 job.setJarByClass(ExportSnapshot.class); 832 TableMapReduceUtil.addDependencyJars(job); 833 job.setMapperClass(ExportMapper.class); 834 job.setInputFormatClass(ExportSnapshotInputFormat.class); 835 job.setOutputFormatClass(NullOutputFormat.class); 836 job.setMapSpeculativeExecution(false); 837 job.setNumReduceTasks(0); 838 839 // Acquire the delegation Tokens 840 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 841 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf); 842 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 843 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf); 844 845 // Run the MR Job 846 if (!job.waitForCompletion(true)) { 847 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 848 } 849 } 850 851 private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir, 852 final Path snapshotDir) throws IOException { 853 // Update the conf with the current root dir, since may be a different cluster 854 Configuration conf = new Configuration(baseConf); 855 CommonFSUtils.setRootDir(conf, rootDir); 856 CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf)); 857 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 858 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 859 } 860 861 private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath, 862 BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException { 863 ExecutorService pool = Executors 864 .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 865 List<Future<Void>> futures = new ArrayList<>(); 866 for (Path dstPath : traversedPath) { 867 Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath)); 868 futures.add(future); 869 } 870 try { 871 for (Future<Void> future : futures) { 872 future.get(); 873 } 874 } catch (InterruptedException | ExecutionException e) { 875 throw new IOException(e); 876 } finally { 877 pool.shutdownNow(); 878 } 879 } 880 881 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 882 Configuration conf, List<Path> traversedPath) throws IOException { 883 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 884 try { 885 fs.setOwner(path, filesUser, filesGroup); 886 } catch (IOException e) { 887 throw new RuntimeException( 888 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 889 } 890 }, conf); 891 } 892 893 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 894 final List<Path> traversedPath, final Configuration conf) throws IOException { 895 if (filesMode <= 0) { 896 return; 897 } 898 FsPermission perm = new FsPermission(filesMode); 899 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 900 try { 901 fs.setPermission(path, perm); 902 } catch (IOException e) { 903 throw new RuntimeException( 904 "set permission for file " + path + " to " + filesMode + " failed", e); 905 } 906 }, conf); 907 } 908 909 private boolean verifyTarget = true; 910 private boolean verifySource = true; 911 private boolean verifyChecksum = true; 912 private String snapshotName = null; 913 private String targetName = null; 914 private boolean overwrite = false; 915 private String filesGroup = null; 916 private String filesUser = null; 917 private Path outputRoot = null; 918 private Path inputRoot = null; 919 private int bandwidthMB = Integer.MAX_VALUE; 920 private int filesMode = 0; 921 private int mappers = 0; 922 private boolean resetTtl = false; 923 924 @Override 925 protected void processOptions(CommandLine cmd) { 926 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 927 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 928 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 929 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 930 } 931 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 932 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 933 } 934 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 935 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 936 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 937 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 938 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 939 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 940 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 941 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 942 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 943 verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt()); 944 resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt()); 945 } 946 947 /** 948 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 949 * @return 0 on success, and != 0 upon failure. 950 */ 951 @Override 952 public int doWork() throws IOException { 953 Configuration conf = getConf(); 954 955 // Check user options 956 if (snapshotName == null) { 957 System.err.println("Snapshot name not provided."); 958 LOG.error("Use -h or --help for usage instructions."); 959 return 0; 960 } 961 962 if (outputRoot == null) { 963 System.err 964 .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided."); 965 LOG.error("Use -h or --help for usage instructions."); 966 return 0; 967 } 968 969 if (targetName == null) { 970 targetName = snapshotName; 971 } 972 if (inputRoot == null) { 973 inputRoot = CommonFSUtils.getRootDir(conf); 974 } else { 975 CommonFSUtils.setRootDir(conf, inputRoot); 976 } 977 978 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 979 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 980 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 981 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 982 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 983 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 984 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) 985 || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null; 986 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 987 Path snapshotTmpDir = 988 SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf); 989 Path outputSnapshotDir = 990 SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 991 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 992 LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot); 993 LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs, 994 outputRoot.toString(), skipTmp, initialOutputSnapshotDir); 995 996 // Verify snapshot source before copying files 997 if (verifySource) { 998 LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.", 999 inputFs.getUri(), inputRoot, snapshotDir); 1000 verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir); 1001 } 1002 1003 // Find the necessary directory which need to change owner and group 1004 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 1005 if (outputFs.exists(needSetOwnerDir)) { 1006 if (skipTmp) { 1007 needSetOwnerDir = outputSnapshotDir; 1008 } else { 1009 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf); 1010 if (outputFs.exists(needSetOwnerDir)) { 1011 needSetOwnerDir = snapshotTmpDir; 1012 } 1013 } 1014 } 1015 1016 // Check if the snapshot already exists 1017 if (outputFs.exists(outputSnapshotDir)) { 1018 if (overwrite) { 1019 if (!outputFs.delete(outputSnapshotDir, true)) { 1020 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 1021 return 1; 1022 } 1023 } else { 1024 System.err.println("The snapshot '" + targetName + "' already exists in the destination: " 1025 + outputSnapshotDir); 1026 return 1; 1027 } 1028 } 1029 1030 if (!skipTmp) { 1031 // Check if the snapshot already in-progress 1032 if (outputFs.exists(snapshotTmpDir)) { 1033 if (overwrite) { 1034 if (!outputFs.delete(snapshotTmpDir, true)) { 1035 System.err 1036 .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir); 1037 return 1; 1038 } 1039 } else { 1040 System.err 1041 .println("A snapshot with the same name '" + targetName + "' may be in-progress"); 1042 System.err 1043 .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, "); 1044 System.err 1045 .println("consider removing " + snapshotTmpDir + " by using the -overwrite option"); 1046 return 1; 1047 } 1048 } 1049 } 1050 1051 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1052 // The snapshot references must be copied before the hfiles otherwise the cleaner 1053 // will remove them because they are unreferenced. 1054 List<Path> travesedPaths = new ArrayList<>(); 1055 boolean copySucceeded = false; 1056 try { 1057 LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir); 1058 travesedPaths = 1059 FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf, 1060 conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS)); 1061 copySucceeded = true; 1062 } catch (IOException e) { 1063 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir 1064 + " to=" + initialOutputSnapshotDir, e); 1065 } finally { 1066 if (copySucceeded) { 1067 if (filesUser != null || filesGroup != null) { 1068 LOG.warn( 1069 (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser) 1070 + (filesGroup == null 1071 ? "" 1072 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1073 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1074 } 1075 if (filesMode > 0) { 1076 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1077 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1078 } 1079 } 1080 } 1081 1082 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1083 // reset TTL for target snapshot. 1084 if (!targetName.equals(snapshotName) || resetTtl) { 1085 SnapshotDescription.Builder snapshotDescBuilder = 1086 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1087 if (!targetName.equals(snapshotName)) { 1088 snapshotDescBuilder.setName(targetName); 1089 } 1090 if (resetTtl) { 1091 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1092 } 1093 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1094 initialOutputSnapshotDir, outputFs); 1095 if (filesUser != null || filesGroup != null) { 1096 outputFs.setOwner( 1097 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1098 filesGroup); 1099 } 1100 if (filesMode > 0) { 1101 outputFs.setPermission( 1102 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1103 new FsPermission((short) filesMode)); 1104 } 1105 } 1106 1107 // Step 2 - Start MR Job to copy files 1108 // The snapshot references must be copied before the files otherwise the files gets removed 1109 // by the HFileArchiver, since they have no references. 1110 try { 1111 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1112 filesGroup, filesMode, mappers, bandwidthMB); 1113 1114 LOG.info("Finalize the Snapshot Export"); 1115 if (!skipTmp) { 1116 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1117 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1118 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1119 + snapshotTmpDir + " to=" + outputSnapshotDir); 1120 } 1121 } 1122 1123 // Step 4 - Verify snapshot integrity 1124 if (verifyTarget) { 1125 LOG.info("Verify snapshot integrity"); 1126 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1127 } 1128 1129 LOG.info("Export Completed: " + targetName); 1130 return 0; 1131 } catch (Exception e) { 1132 LOG.error("Snapshot export failed", e); 1133 if (!skipTmp) { 1134 outputFs.delete(snapshotTmpDir, true); 1135 } 1136 outputFs.delete(outputSnapshotDir, true); 1137 return 1; 1138 } finally { 1139 IOUtils.closeStream(inputFs); 1140 IOUtils.closeStream(outputFs); 1141 } 1142 } 1143 1144 @Override 1145 protected void printUsage() { 1146 super.printUsage(); 1147 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1148 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1149 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1150 + " hbase snapshot export \\\n" 1151 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1152 + " --copy-to hdfs://srv1:50070/hbase"); 1153 } 1154 1155 @Override 1156 protected void addOptions() { 1157 addRequiredOption(Options.SNAPSHOT); 1158 addOption(Options.COPY_TO); 1159 addOption(Options.COPY_FROM); 1160 addOption(Options.TARGET_NAME); 1161 addOption(Options.NO_CHECKSUM_VERIFY); 1162 addOption(Options.NO_TARGET_VERIFY); 1163 addOption(Options.NO_SOURCE_VERIFY); 1164 addOption(Options.OVERWRITE); 1165 addOption(Options.CHUSER); 1166 addOption(Options.CHGROUP); 1167 addOption(Options.CHMOD); 1168 addOption(Options.MAPPERS); 1169 addOption(Options.BANDWIDTH); 1170 addOption(Options.RESET_TTL); 1171 } 1172 1173 public static void main(String[] args) { 1174 new ExportSnapshot().doStaticMain(args); 1175 } 1176}