/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Internal configuration keys used by the driver to hand job parameters to the mappers.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Optional owner/group/mode to apply to the copied files on the destination.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
    "snapshot.export.input.file.grouper.class";
  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
    "snapshot.export.input.file.location.resolver.class";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";

  /** Test-only knobs used to inject copy failures (see ExportMapper#injectTestFailure). */
  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    // How many failures to inject, and how many have been injected so far.
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
  /** Command-line option definitions for the exportsnapshot tool. */
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps). "
        + "If you provide a --custom-file-grouper, "
        + "then --mappers is interpreted as the number of mappers per group.");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD");
    static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true,
      "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
        + "See JavaDoc on that class for more information.");
    static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true,
      "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
        + "See JavaDoc on that class for more information.");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }

  /**
   * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
   * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
   * must return a data structure that contains each input file exactly once. Files that appear in
   * separate entries in the top-level returned Collection are guaranteed to not be placed in the
   * same InputSplit. This can be used to segregate your input files by the rack or host on which
   * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
   * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
   * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
   * on the classpath. If this argument is not used, no particular grouping logic will be applied.
   */
  @InterfaceAudience.Public
  public interface CustomFileGrouper {
    Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles);
  }

  /** Default grouper: places every input file into one single group (no segregation). */
  private static class NoopCustomFileGrouper implements CustomFileGrouper {
    @Override
    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      return ImmutableList.of(snapshotFiles);
    }
  }

  /**
   * If desired, you may implement a FileLocationResolver in order to influence the _location_
   * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
   * method {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit
   * being constructed. Whatever is returned will ultimately be reported by that split's
   * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
   * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
   * --file-location-resolver argument with the fully qualified class name of an implementation of
   * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
   * be attached to the InputSplits.
   */
  @InterfaceAudience.Public
  public interface FileLocationResolver {
    Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files);
  }

  /** Default resolver: attaches no location metadata to the InputSplits. */
  static class NoopFileLocationResolver implements FileLocationResolver {
    @Override
    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
      return ImmutableSet.of();
    }
  }

  /**
   * Mapper that copies one snapshot-referenced file per input record from the source filesystem
   * to the destination archive directory, preserving attributes and verifying the copy.
   */
  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    // Default progress-report granularity (1 MB) and copy buffer size (64 KB).
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    // Optional owner/group/mode to apply to copied files; null/0 means "not configured".
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    /**
     * Reads the job configuration, resolves source/destination filesystems, buffer sizes and
     * the optional file-attribute overrides, and primes all counters to zero.
     */
    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      // Per-cluster overrides are namespaced with the "from."/"to." prefixes.
      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      // Touch every counter so they all show up in the job output, even if zero.
      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    /**
     * Copies the single file described by the record key (a serialized SnapshotFileInfo) to its
     * computed destination, then optionally injects a test failure.
     */
    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
      // inject failure
      injectTestFailure(context, inputInfo);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          // HFile link names encode table/region/hfile; rebuild the archive-relative layout.
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          // NOTE(review): for WAL, path stays null and new Path(outputArchive, null) below would
          // throw; this branch appears to be unreachable since snapshots do not keep WALs.
          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #map(BytesWritable, NullWritable, org.apache.hadoop.mapreduce.Mapper.Context)}
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        return;
      }
      // Stop injecting once the configured number of failures has been reached.
      if (testing.injectedFailureCount >= testing.failuresCountToInject) {
        return;
      }
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format(
        context.getTaskAttemptID() + " TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    /**
     * Copies a single source file to outputPath, unless an identical copy already exists there.
     * Applies optional bandwidth throttling and storage policy, verifies length/checksum after
     * the copy, and tries to preserve file attributes. Updates the relevant job counters.
     */
    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        // Throttle reads to the configured MB/s.
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        // Apply a per-family storage policy on the destination dir, if one was configured.
        String family = new Path(inputInfo.getHfile()).getParent().getName();
        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
          String key = context.getConfiguration().get(familyStoragePolicy);
          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
          outputFs.setStoragePolicy(outputPath.getParent(), key);
        }
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to Preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to run manually chown on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      }
    }

    /**
     * Create the output folder and optionally set ownership.
442 */ 443 private void createOutputPath(final Path path) throws IOException { 444 if (filesUser == null && filesGroup == null) { 445 outputFs.mkdirs(path); 446 } else { 447 Path parent = path.getParent(); 448 if (!outputFs.exists(parent) && !parent.isRoot()) { 449 createOutputPath(parent); 450 } 451 outputFs.mkdirs(path); 452 if (filesUser != null || filesGroup != null) { 453 // override the owner when non-null user/group is specified 454 outputFs.setOwner(path, filesUser, filesGroup); 455 } 456 if (filesMode > 0) { 457 outputFs.setPermission(path, new FsPermission(filesMode)); 458 } 459 } 460 } 461 462 /** 463 * Try to Preserve the files attribute selected by the user copying them from the source file 464 * This is only required when you are exporting as a different user than "hbase" or on a system 465 * that doesn't have the "hbase" user. This is not considered a blocking failure since the user 466 * can force a chmod with the user that knows is available on the system. 467 */ 468 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 469 FileStatus stat; 470 try { 471 stat = outputFs.getFileStatus(path); 472 } catch (IOException e) { 473 LOG.warn("Unable to get the status for file=" + path); 474 return false; 475 } 476 477 try { 478 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 479 outputFs.setPermission(path, new FsPermission(filesMode)); 480 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 481 outputFs.setPermission(path, refStat.getPermission()); 482 } 483 } catch (IOException e) { 484 LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage()); 485 return false; 486 } 487 488 boolean hasRefStat = (refStat != null); 489 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 490 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? 
filesGroup : refStat.getGroup(); 491 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 492 try { 493 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 494 outputFs.setOwner(path, user, group); 495 } 496 } catch (IOException e) { 497 LOG.warn( 498 "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage()); 499 LOG.warn("The user/group may not exist on the destination cluster: user=" + user 500 + " group=" + group); 501 return false; 502 } 503 } 504 505 return true; 506 } 507 508 private boolean stringIsNotEmpty(final String str) { 509 return str != null && !str.isEmpty(); 510 } 511 512 private long copyData(final Context context, final Path inputPath, final InputStream in, 513 final Path outputPath, final FSDataOutputStream out, final long inputFileSize) 514 throws IOException { 515 final String statusMessage = 516 "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)"; 517 518 try { 519 byte[] buffer = new byte[bufferSize]; 520 long totalBytesWritten = 0; 521 int reportBytes = 0; 522 int bytesRead; 523 524 while ((bytesRead = in.read(buffer)) > 0) { 525 out.write(buffer, 0, bytesRead); 526 totalBytesWritten += bytesRead; 527 reportBytes += bytesRead; 528 529 if (reportBytes >= reportSize) { 530 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 531 context 532 .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten), 533 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath 534 + " to " + outputPath); 535 reportBytes = 0; 536 } 537 } 538 539 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 540 context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten), 541 (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to " 542 + outputPath); 543 544 return totalBytesWritten; 545 } finally { 546 out.close(); 547 in.close(); 548 } 549 } 550 551 /** 552 * Try to open the 
     * "source" file. Throws an IOException if the communication with the inputFs
     * fail or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    /**
     * Resolves the FileStatus of the source file through its FileLink/WALLink. A missing file
     * increments the MISSING_FILES counter before the exception is rethrown.
     */
    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    /**
     * Builds the HFileLink for the given path, using the MOB root directory when the referenced
     * region is the table's MOB region, otherwise the regular input root/archive.
     */
    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    /** Returns the file checksum, or null if it cannot be retrieved (best-effort). */
    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          // Add a hint about skipping checksum checks when a mismatch is expected
          // (different filesystem types, block sizes or checksum algorithms).
          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
              .append(
                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }

    /**
     * Utility to compare checksums
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If the input or output checksum is null, or the algorithms of input and output are not
      // equal, that means there is no comparison
      // and return not compatible. else if matched, return compatible with the matched result.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if user
     * has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equals, since user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  //  Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
732 * @return list of files referenced by the snapshot (pair of path and size) 733 */ 734 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 735 final FileSystem fs, final Path snapshotDir) throws IOException { 736 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 737 738 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 739 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 740 741 // Get snapshot files 742 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 743 Set<String> addedFiles = new HashSet<>(); 744 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 745 new SnapshotReferenceUtil.SnapshotVisitor() { 746 @Override 747 public void storeFile(final RegionInfo regionInfo, final String family, 748 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 749 Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null; 750 if (!storeFile.hasReference()) { 751 String region = regionInfo.getEncodedName(); 752 String hfile = storeFile.getName(); 753 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile, 754 storeFile.hasFileSize() ? storeFile.getFileSize() : -1); 755 } else { 756 Pair<String, String> referredToRegionAndFile = 757 StoreFileInfo.getReferredToRegionAndFile(storeFile.getName()); 758 String referencedRegion = referredToRegionAndFile.getFirst(); 759 String referencedHFile = referredToRegionAndFile.getSecond(); 760 snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family, 761 referencedHFile, storeFile.hasFileSize() ? 
storeFile.getFileSize() : -1); 762 } 763 String fileToExport = snapshotFileAndSize.getFirst().getHfile(); 764 if (!addedFiles.contains(fileToExport)) { 765 files.add(snapshotFileAndSize); 766 addedFiles.add(fileToExport); 767 } else { 768 LOG.debug("Skip the existing file: {}.", fileToExport); 769 } 770 } 771 }); 772 773 return files; 774 } 775 776 private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs, 777 Configuration conf, TableName table, String region, String family, String hfile, long size) 778 throws IOException { 779 Path path = HFileLink.createPath(table, region, family, hfile); 780 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) 781 .setHfile(path.toString()).build(); 782 if (size == -1) { 783 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 784 } 785 return new Pair<>(fileInfo, size); 786 } 787 788 /** 789 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 790 * The groups created will have similar amounts of bytes. 791 * <p> 792 * The algorithm used is pretty straightforward; the file list is sorted by size, and then each 793 * group fetch the bigger file available, iterating through groups alternating the direction. 794 */ 795 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits( 796 final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) { 797 List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles); 798 // Sort files by size, from small to big 799 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 800 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 801 long r = a.getSecond() - b.getSecond(); 802 return (r < 0) ? -1 : ((r > 0) ? 
1 : 0); 803 } 804 }); 805 806 // create balanced groups 807 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 808 int hi = files.size() - 1; 809 int lo = 0; 810 811 List<Pair<SnapshotFileInfo, Long>> group; 812 int dir = 1; 813 int g = 0; 814 815 while (hi >= lo) { 816 if (g == fileGroups.size()) { 817 group = new LinkedList<>(); 818 fileGroups.add(group); 819 } else { 820 group = fileGroups.get(g); 821 } 822 823 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 824 825 // add the hi one 826 group.add(fileInfo); 827 828 // change direction when at the end or the beginning 829 g += dir; 830 if (g == ngroups) { 831 dir = -1; 832 g = ngroups - 1; 833 } else if (g < 0) { 834 dir = 1; 835 g = 0; 836 } 837 } 838 839 return fileGroups; 840 } 841 842 static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 843 @Override 844 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 845 TaskAttemptContext tac) throws IOException, InterruptedException { 846 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys()); 847 } 848 849 @Override 850 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 851 Configuration conf = context.getConfiguration(); 852 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 853 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 854 855 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 856 857 Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups = 858 groupFilesForSplits(conf, snapshotFiles); 859 860 Class<? 
extends FileLocationResolver> fileLocationResolverClass =
        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
          FileLocationResolver.class);
      FileLocationResolver fileLocationResolver =
        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
      LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
        fileLocationResolverClass);

      // One InputSplit per balanced group; the resolver supplies locality hints per split.
      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
      }
      return splits;
    }

    /**
     * Group the snapshot files via the configured CustomFileGrouper, then split each group
     * into approximately equal-sized chunks. The target mapper count comes from
     * CONF_NUM_SPLITS, or is derived from the file count (CONF_MAP_GROUP files per mapper,
     * default 10) when unset; the derived value is published back into the configuration.
     */
    Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && !snapshotFiles.isEmpty()) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
      CustomFileGrouper customFileGrouper =
        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
        customFileGrouper.getGroupedInputFiles(snapshotFiles);

      LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
        groups.size());
      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
      LOG.info(
        "Splitting each group into {} InputSplits, "
          + "to achieve closest possible amount of mappers to target of {}",
        mappersPerGroup, mappers);

      // Within each group, create splits of equal size. Groups are not mixed together.
      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
        .flatMap(Collection::stream).toList();
    }

    /**
     * InputSplit carrying the serialized SnapshotFileInfo entries (and their sizes) that one
     * mapper should copy, plus optional location hints for the scheduler.
     */
    static class ExportSnapshotInputSplit extends InputSplit implements Writable {

      // Each entry: serialized SnapshotFileInfo paired with the file size in bytes.
      private List<Pair<BytesWritable, Long>> files;
      // Location hints from the FileLocationResolver, exposed via getLocations().
      private String[] locations;
      // Sum of all file sizes in this split, in bytes.
      private long length;

      // No-arg constructor required by Writable; fields are filled in by readFields().
      public ExportSnapshotInputSplit() {
        this.files = null;
        this.locations = null;
      }

      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
        FileLocationResolver fileLocationResolver) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          // Store the protobuf bytes rather than the message itself so the split is Writable.
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
        this.locations =
          fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
        LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
          + "with location hints: {}", files, length, locations);
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return locations;
      }

      // Wire format (must stay symmetric with write()): file count, then
      // (BytesWritable, long size) pairs, then location count and UTF-encoded locations.
      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
        int locationCount = in.readInt();
        List<String> locations = new ArrayList<>(locationCount);
        for (int i = 0; i < locationCount; ++i) {
          locations.add(in.readUTF());
        }
        this.locations = locations.toArray(new String[0]);
      }

      // Keep symmetric with readFields() above.
      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
        out.writeInt(locations.length);
        for (String location : locations) {
          out.writeUTF(location);
        }
      }
    }

    /**
     * RecordReader that iterates over the split's file list, emitting each serialized
     * SnapshotFileInfo as the key (value is always NullWritable). Progress is reported by
     * bytes processed rather than file count, since file sizes vary widely.
     */
    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      // Index of the current file; -1 until the first nextKeyValue() call.
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        // Account the previous file's bytes before advancing.
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  //  Tool
  // ==========================================================================

  /**
   * Run Map-Reduce Job to perform the files copy.
   * @param mappers target number of mappers, or 0 to derive it from the file count
   * @param bandwidthMB per-mapper copy bandwidth cap in MB/s
   * @param filesMode octal permission to apply to copied files, or 0 to keep defaults
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
    final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    // Pass the copy options down to the mappers through the job configuration.
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
    if (storagePolicy != null) {
      // One configuration key per column family (parsed from family=policy pairs).
      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
      }
    }
    if (customFileGrouper != null) {
      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
    }
    if (fileLocationResolver != null) {
      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
    }

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = new Job(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    // NOTE(review): speculative map execution is disabled — presumably so two attempts never
    // copy the same destination file concurrently. Map-only job: no reducers.
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  /**
   * Verify a snapshot: fail if it has expired (TTL) or if its file references are broken.
   */
  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  /**
   * Run the given task against every path in {@code traversedPath} on a fixed-size thread
   * pool (size from CONF_COPY_MANIFEST_THREADS), waiting for all tasks to finish.
   */
  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
futures.add(future); 1108 } 1109 try { 1110 for (Future<Void> future : futures) { 1111 future.get(); 1112 } 1113 } catch (InterruptedException | ExecutionException e) { 1114 throw new IOException(e); 1115 } finally { 1116 pool.shutdownNow(); 1117 } 1118 } 1119 1120 private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup, 1121 Configuration conf, List<Path> traversedPath) throws IOException { 1122 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 1123 try { 1124 fs.setOwner(path, filesUser, filesGroup); 1125 } catch (IOException e) { 1126 throw new RuntimeException( 1127 "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e); 1128 } 1129 }, conf); 1130 } 1131 1132 private void setPermissionParallel(final FileSystem outputFs, final short filesMode, 1133 final List<Path> traversedPath, final Configuration conf) throws IOException { 1134 if (filesMode <= 0) { 1135 return; 1136 } 1137 FsPermission perm = new FsPermission(filesMode); 1138 setConfigParallel(outputFs, traversedPath, (fs, path) -> { 1139 try { 1140 fs.setPermission(path, perm); 1141 } catch (IOException e) { 1142 throw new RuntimeException( 1143 "set permission for file " + path + " to " + filesMode + " failed", e); 1144 } 1145 }, conf); 1146 } 1147 1148 private Map<String, String> storagePolicyPerFamily(String storagePolicy) { 1149 Map<String, String> familyStoragePolicy = new HashMap<>(); 1150 for (String familyConf : storagePolicy.split("&")) { 1151 String[] familySplit = familyConf.split("="); 1152 if (familySplit.length != 2) { 1153 continue; 1154 } 1155 // family is key, storage policy is value 1156 familyStoragePolicy.put(familySplit[0], familySplit[1]); 1157 } 1158 return familyStoragePolicy; 1159 } 1160 1161 private static String generateFamilyStoragePolicyKey(String family) { 1162 return CONF_STORAGE_POLICY + "." 
+ family; 1163 } 1164 1165 private boolean verifyTarget = true; 1166 private boolean verifySource = true; 1167 private boolean verifyChecksum = true; 1168 private String snapshotName = null; 1169 private String targetName = null; 1170 private boolean overwrite = false; 1171 private String filesGroup = null; 1172 private String filesUser = null; 1173 private Path outputRoot = null; 1174 private Path inputRoot = null; 1175 private int bandwidthMB = Integer.MAX_VALUE; 1176 private int filesMode = 0; 1177 private int mappers = 0; 1178 private boolean resetTtl = false; 1179 private String storagePolicy = null; 1180 private String customFileGrouper = null; 1181 private String fileLocationResolver = null; 1182 1183 @Override 1184 protected void processOptions(CommandLine cmd) { 1185 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 1186 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 1187 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 1188 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 1189 } 1190 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 1191 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 1192 } 1193 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 1194 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 1195 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 1196 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8); 1197 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 1198 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 1199 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 
verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
    }
    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
      customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
    }
    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
      fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
    }
  }

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    // Default the target snapshot name to the source name, and the input root to the
    // configured cluster root dir.
    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    // Copy through a tmp dir unless explicitly skipped or a custom working dir is configured.
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // throw CorruptedSnapshotException if we can't read the snapshot info.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the necessary directory which need to change owner and group
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot already in-progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> travesedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      travesedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      // On success, apply the requested ownership/permissions to everything that was copied.
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name is different from the source name or we want to
    // reset TTL for target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      // The rewritten .snapshotinfo also needs the requested owner/permissions.
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files otherwise the files gets removed
    // by the HFileArchiver, since they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
        fileLocationResolver);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify the exported snapshot's expiration status and integrity.");
        SnapshotDescription targetSnapshotDesc =
          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return EXIT_SUCCESS;
    } catch (Exception e) {
      // Best-effort cleanup of the partially exported snapshot before reporting failure.
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return EXIT_FAILURE;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    // Keep in sync with processOptions(): every option read there is registered here.
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
    addOption(Options.CUSTOM_FILE_GROUPER);
    addOption(Options.FILE_LOCATION_RESOLVER);
  }

  /** Command-line entry point; delegates argument parsing and execution to doStaticMain. */
  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}