/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster, and then all the hfiles/wals are copied into the .archive/ location using a
 * Map-Reduce job. When everything is done, the second cluster can restore the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";

  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }
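
  // A note on the CONF_SOURCE_PREFIX / CONF_DEST_PREFIX prefixes above: any configuration key
  // prefixed with "exportsnapshot.from." or "exportsnapshot.to." is applied only to the source
  // or destination cluster's Configuration (see HBaseConfiguration.createClusterConf in setup()
  // and doWork()). An illustrative invocation (the overridden keys here are examples, not
  // requirements of this tool):
  //
  //   hbase snapshot export \
  //     -Dexportsnapshot.from.dfs.client.socket-timeout=120000 \
  //     -Dexportsnapshot.to.fs.defaultFS=hdfs://backup-cluster:8020 \
  //     --snapshot MySnapshot --copy-to hdfs://backup-cluster:8020/hbase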

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to export.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SSD");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }
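
    // To illustrate the mapping performed by getOutputPath() above (all names here are made
    // up): an HFileLink named "t1=d0e1a2b3c4d5e6f708192a3b4c5d6e7f-3f2504e04f89" that sits
    // under a family directory "cf" resolves to table "t1", region
    // "d0e1a2b3c4d5e6f708192a3b4c5d6e7f" and hfile "3f2504e04f89", and lands at
    //   <outputRoot>/archive/data/default/t1/d0e1a2b3c4d5e6f708192a3b4c5d6e7f/cf/3f2504e04f89
    // which is the archive layout a restore on the destination cluster expects.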

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }
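
      // Note: CONF_BANDWIDTH_MB defaults to 100, so reads are throttled unless the limit is
      // explicitly set to Integer.MAX_VALUE. The MB/s limit is converted to bytes/sec for
      // ThrottledInputStream, e.g. 100 MB/s -> 104,857,600 bytes/sec.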

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        String family = new Path(inputInfo.getHfile()).getParent().getName();
        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
          String key = context.getConfiguration().get(familyStoragePolicy);
          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
          outputFs.setStoragePolicy(outputPath.getParent(), key);
        }
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to run manually chown on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      } finally {
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source
     * file. This is only required when you are exporting as a different user than "hbase" or on a
     * system that doesn't have the "hbase" user. This is not considered a blocking failure since
     * the user can force a chmod with a user that is known to be available on the system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }
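
      // Explicit --chuser/--chgroup values win; otherwise fall back to the source file's
      // owner/group when its status is available.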
      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && !str.isEmpty();
    }

    private long copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context
              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
          + outputPath);

        return totalBytesWritten;
      } finally {
        out.close();
        in.close();
      }
    }
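
    // A FileLink names every location its target may live in: an hfile can be moved from the
    // data directory to the archive while the export runs, and the open()/getFileStatus() calls
    // below transparently try the alternate locations, so a concurrent archival does not fail
    // the copy.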

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify;")
              .append(
                " for the table backup scenario, you should use the -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }
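
    // One way to make checksums comparable across HDFS clusters that use different block sizes
    // (a sketch; the flag requires a Hadoop version with composite-CRC support) is to request
    // file-level CRCs on both sides, matching the hint appended above:
    //
    //   hbase snapshot export -Ddfs.checksum.combine.mode=COMPOSITE_CRC \
    //     --snapshot MySnapshot --copy-to hdfs://backup-cluster:8020/hbase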

    /**
     * Utility to compare checksums.
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If the input or output checksum is null, or the input and output algorithms differ, the
      // checksums cannot be compared: return INCOMPATIBLE. Otherwise return TRUE or FALSE
      // depending on whether the comparable checksums match.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if the
     * user has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equal, since user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  // Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    Set<String> addedFiles = new HashSet<>();
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
          if (!addedFiles.contains(fileToExport)) {
            files.add(snapshotFileAndSize);
            addedFiles.add(fileToExport);
          } else {
            LOG.debug("Skip the existing file: {}.", fileToExport);
          }
        }
      });

    return files;
  }
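
  // A reference store file (created by a region split) is named after the hfile and region it
  // points at; StoreFileInfo.getReferredToRegionAndFile() recovers that pair above, so the job
  // exports the referenced hfile itself rather than the tiny reference marker.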

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }

  /**
   * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as
   * possible. The groups created will have similar total byte counts.
   * <p>
   * The algorithm used is pretty straightforward: the file list is sorted by size, and then each
   * group fetches the biggest file available, iterating through the groups and alternating the
   * direction.
   */
  static List<List<Pair<SnapshotFileInfo, Long>>>
    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        return Long.compare(a.getSecond(), b.getSecond());
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    long[] sizeGroups = new long[ngroups];
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // add the hi one
      sizeGroups[g] += fileInfo.getSecond();
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    if (LOG.isDebugEnabled()) {
      for (int i = 0; i < sizeGroups.length; ++i) {
        LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i]));
      }
    }

    return fileGroups;
  }
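
  // A small worked example of the balancing above: files of sizes [1, 2, 3, 4, 5, 6] split into
  // ngroups=2 are consumed biggest-first while the group index bounces 0,1,1,0,0,1, giving
  // group0 = {6, 3, 2} (11 bytes) and group1 = {5, 4, 1} (10 bytes).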

  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && snapshotFiles.size() > 0) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
      List<InputSplit> splits = new ArrayList<>(groups.size());
      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
        splits.add(new ExportSnapshotInputSplit(files));
      }
      return splits;
    }

    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
      private List<Pair<BytesWritable, Long>> files;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
      }

      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return new String[] {};
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  // Tool
  // ==========================================================================

  /**
   * Run a Map-Reduce job to perform the file copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
    final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
    if (storagePolicy != null) {
      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
      }
    }

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = Job.getInstance(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

  private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
    Map<String, String> familyStoragePolicy = new HashMap<>();
    for (String familyConf : storagePolicy.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      // family is key, storage policy is value
      familyStoragePolicy.put(familySplit[0], familySplit[1]);
    }
    return familyStoragePolicy;
  }
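
  // For example, --storage-policy "cf1=HOT&cf2=ALL_SSD" parses to {cf1: HOT, cf2: ALL_SSD};
  // entries without exactly one '=' are silently skipped.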

  private static String generateFamilyStoragePolicyKey(String family) {
    return CONF_STORAGE_POLICY + "." + family;
  }

  private boolean verifyTarget = true;
  private boolean verifySource = true;
  private boolean verifyChecksum = true;
  private String snapshotName = null;
  private String targetName = null;
  private boolean overwrite = false;
  private String filesGroup = null;
  private String filesUser = null;
  private Path outputRoot = null;
  private Path inputRoot = null;
  private int bandwidthMB = Integer.MAX_VALUE;
  private int filesMode = 0;
  private int mappers = 0;
  private boolean resetTtl = false;
  private String storagePolicy = null;

  @Override
  protected void processOptions(CommandLine cmd) {
    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
    }
    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
    }
    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
    }
  }
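
  // Note that --chmod is parsed base-8 (the radix argument to getOptionAsInt above), so
  // "--chmod 700" yields the octal permission 0700, matching chmod(1) conventions.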

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // throw CorruptedSnapshotException if we can't read the snapshot info.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory whose owner and group need to be changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot is already in-progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
"" 1214 : ", Change the group of " + needSetOwnerDir + " to " + filesGroup)); 1215 setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths); 1216 } 1217 if (filesMode > 0) { 1218 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1219 setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf); 1220 } 1221 } 1222 } 1223 1224 // Write a new .snapshotinfo if the target name is different from the source name or we want to 1225 // reset TTL for target snapshot. 1226 if (!targetName.equals(snapshotName) || resetTtl) { 1227 SnapshotDescription.Builder snapshotDescBuilder = 1228 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder(); 1229 if (!targetName.equals(snapshotName)) { 1230 snapshotDescBuilder.setName(targetName); 1231 } 1232 if (resetTtl) { 1233 snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL); 1234 } 1235 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(), 1236 initialOutputSnapshotDir, outputFs); 1237 if (filesUser != null || filesGroup != null) { 1238 outputFs.setOwner( 1239 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, 1240 filesGroup); 1241 } 1242 if (filesMode > 0) { 1243 outputFs.setPermission( 1244 new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), 1245 new FsPermission((short) filesMode)); 1246 } 1247 } 1248 1249 // Step 2 - Start MR Job to copy files 1250 // The snapshot references must be copied before the files otherwise the files gets removed 1251 // by the HFileArchiver, since they have no references. 1252 try { 1253 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, 1254 filesGroup, filesMode, mappers, bandwidthMB, storagePolicy); 1255 1256 LOG.info("Finalize the Snapshot Export"); 1257 if (!skipTmp) { 1258 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1259 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1260 throw new ExportSnapshotException("Unable to rename snapshot directory from=" 1261 + snapshotTmpDir + " to=" + outputSnapshotDir); 1262 } 1263 } 1264 1265 // Step 4 - Verify snapshot integrity 1266 if (verifyTarget) { 1267 LOG.info("Verify the exported snapshot's expiration status and integrity."); 1268 SnapshotDescription targetSnapshotDesc = 1269 SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir); 1270 verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir); 1271 } 1272 1273 LOG.info("Export Completed: " + targetName); 1274 return EXIT_SUCCESS; 1275 } catch (Exception e) { 1276 LOG.error("Snapshot export failed", e); 1277 if (!skipTmp) { 1278 outputFs.delete(snapshotTmpDir, true); 1279 } 1280 outputFs.delete(outputSnapshotDir, true); 1281 return EXIT_FAILURE; 1282 } 1283 } 1284 1285 @Override 1286 protected void printUsage() { 1287 super.printUsage(); 1288 System.out.println("\n" + "Examples:\n" + " hbase snapshot export \\\n" 1289 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1290 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n" 1291 + " hbase snapshot export \\\n" 1292 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1293 + " --copy-to hdfs://srv1:50070/hbase"); 1294 } 1295 1296 @Override 1297 protected void addOptions() { 1298 addRequiredOption(Options.SNAPSHOT); 1299 addOption(Options.COPY_TO); 1300 addOption(Options.COPY_FROM); 1301 addOption(Options.TARGET_NAME); 1302 
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
    // Register --storage-policy so the value read in processOptions() can actually be passed on
    // the command line.
    addOption(Options.STORAGE_POLICY);
  }

  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}