/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name directory is copied to
 * the destination cluster, and then all the hfiles/wals are copied into the destination's
 * .archive/ location by a MapReduce job. When everything is done, the second cluster can restore
 * the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";
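
  // Illustrative note: keys carrying these prefixes are stripped and merged into the
  // source/destination cluster configurations via
  // HBaseConfiguration.createClusterConf(conf, null, prefix) in setup() and runCopyJob().
  // For example, a hypothetical override such as
  //   -Dexportsnapshot.to.fs.defaultFS=hdfs://backup-cluster/
  // would apply fs.defaultFS only when the destination filesystem is opened.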

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
    "snapshot.export.input.file.grouper.class";
  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
    "snapshot.export.input.file.location.resolver.class";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";

  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to export.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE = new Option(null, "overwrite", false,
      "Overwrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps). "
        + "If you provide a --custom-file-grouper, "
        + "then --mappers is interpreted as the number of mappers per group.");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SSD");
    static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true,
      "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
        + "See JavaDoc on that class for more information.");
    static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true,
      "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
        + "See JavaDoc on that class for more information.");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }

  /**
   * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
   * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
   * must return a data structure that contains each input file exactly once. Files that appear in
   * separate entries in the top-level returned Collection are guaranteed not to be placed in the
   * same InputSplit. This can be used to segregate your input files by the rack or host on which
   * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
   * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
   * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
   * on the classpath. If this argument is not used, no particular grouping logic will be applied.
   */
  @InterfaceAudience.Public
  public interface CustomFileGrouper {
    Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles);
  }

  private static class NoopCustomFileGrouper implements CustomFileGrouper {
    @Override
    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      return ImmutableList.of(snapshotFiles);
    }
  }
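
  // Illustrative sketch, not part of the original tool: a hypothetical CustomFileGrouper that
  // groups input files by the column family directory in the HFile path, so files from different
  // families never share an InputSplit. It relies on the same layout that getOutputPath() and
  // copyFile() rely on: the parent directory of SnapshotFileInfo.getHfile() names the family.
  // A class like this could be selected on the command line with
  //   --custom-file-grouper org.apache.hadoop.hbase.snapshot.ExportSnapshot$FamilyFileGrouper
  public static class FamilyFileGrouper implements CustomFileGrouper {
    @Override
    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      // Bucket each file under its family name; each bucket becomes a top-level group.
      Map<String, Collection<Pair<SnapshotFileInfo, Long>>> byFamily = new HashMap<>();
      for (Pair<SnapshotFileInfo, Long> file : snapshotFiles) {
        String family = new Path(file.getFirst().getHfile()).getParent().getName();
        byFamily.computeIfAbsent(family, k -> new ArrayList<>()).add(file);
      }
      return byFamily.values();
    }
  }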

  /**
   * If desired, you may implement a FileLocationResolver in order to influence the _location_
   * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
   * {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit being
   * constructed. Whatever is returned will ultimately be reported by that split's
   * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
   * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
   * --file-location-resolver argument with the fully qualified class name of an implementation of
   * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
   * be attached to the InputSplits.
   */
  @InterfaceAudience.Public
  public interface FileLocationResolver {
    Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files);
  }

  static class NoopFileLocationResolver implements FileLocationResolver {
    @Override
    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
      return ImmutableSet.of();
    }
  }
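
  // Illustrative sketch, not part of the original tool: a hypothetical FileLocationResolver that
  // pins every InputSplit to a fixed set of hosts. The host names are placeholders; a real
  // implementation would typically derive rack or host locations from the source filesystem's
  // block locations for the given files.
  static class StaticHostsFileLocationResolver implements FileLocationResolver {
    @Override
    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
      // Placeholder host names for illustration only.
      return ImmutableSet.of("datanode-1.example.com", "datanode-2.example.com");
    }
  }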

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    static final int REPORT_SIZE = 1 * 1024 * 1024;
    static final int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get the number of times we have already injected failure based on the attempt number
        // of this task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Check if the output file exists and is identical to the one we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        String family = new Path(inputInfo.getHfile()).getParent().getName();
        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
          String key = context.getConfiguration().get(familyStoragePolicy);
          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
          outputFs.setStoragePolicy(outputPath.getParent(), key);
        }
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to manually run chown on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      } finally {
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source
     * file. This is only required when you are exporting as a user other than "hbase" or on a
     * system that doesn't have the "hbase" user. This is not considered a blocking failure, since
     * the user can force a chown with a user/group that they know is available on the system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && !str.isEmpty();
    }

    private long copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context
              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
          + outputPath);

        return totalBytesWritten;
      } finally {
        out.close();
        in.close();
      }
    }

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with --no-checksum-verify;")
              .append(
                " for the table backup scenario, you should use the -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }

    /**
     * Utility to compare checksums.
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If the input or output checksum is null, or the algorithms of input and output are not
      // equal, there is no valid comparison to make, so return INCOMPATIBLE. Otherwise return
      // TRUE or FALSE depending on whether the checksums match.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if the
     * user has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equal, since the user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  //  Input Format
  // ==========================================================================
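
  // Overview of the flow below: getSnapshotFiles() lists every HFile referenced by the snapshot,
  // getBalancedSplits() packs them into size-balanced groups, and ExportSnapshotInputFormat turns
  // each group into an InputSplit whose records are the serialized SnapshotFileInfo entries that
  // ExportMapper copies one by one.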

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    Set<String> addedFiles = new HashSet<>();
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
          if (!addedFiles.contains(fileToExport)) {
            files.add(snapshotFileAndSize);
            addedFiles.add(fileToExport);
          } else {
            LOG.debug("Skip the existing file: {}.", fileToExport);
          }
        }
      });

    return files;
  }

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }

  /**
   * Given a list of file paths and sizes, create around ngroups groups in as balanced a way as
   * possible. The groups created will have similar amounts of bytes.
   * <p>
   * The algorithm used is pretty straightforward: the file list is sorted by size, and then each
   * group grabs the biggest file available, iterating through the groups while alternating the
   * direction.
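   * <p>
   * For example, splitting files of sizes 9, 7, 5, 3 and 1 into two groups yields the groups
   * {9, 3, 1} (total 13) and {7, 5} (total 12).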
   */
  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
    final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) {
    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles);
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        long r = a.getSecond() - b.getSecond();
        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // add the hi one
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    return fileGroups;
  }

  static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);

      Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups =
        groupFilesForSplits(conf, snapshotFiles);

      Class<? extends FileLocationResolver> fileLocationResolverClass =
        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
          FileLocationResolver.class);
      FileLocationResolver fileLocationResolver =
        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
      LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
        fileLocationResolverClass);

      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
      }
      return splits;
    }

    Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && !snapshotFiles.isEmpty()) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
      CustomFileGrouper customFileGrouper =
        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
        customFileGrouper.getGroupedInputFiles(snapshotFiles);

      LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
        groups.size());
      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
      LOG.info(
        "Splitting each group into {} InputSplits, "
          + "to achieve closest possible amount of mappers to target of {}",
        mappersPerGroup, mappers);

      // Within each group, create splits of equal size. Groups are not mixed together.
      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
        .flatMap(Collection::stream).toList();
    }

    static class ExportSnapshotInputSplit extends InputSplit implements Writable {

      private List<Pair<BytesWritable, Long>> files;
      private String[] locations;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
        this.locations = null;
      }

      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
        FileLocationResolver fileLocationResolver) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
        this.locations =
          fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
        LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
          + "with location hints: {}", files, length, locations);
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return locations;
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
        int locationCount = in.readInt();
        List<String> locations = new ArrayList<>(locationCount);
        for (int i = 0; i < locationCount; ++i) {
          locations.add(in.readUTF());
        }
        this.locations = locations.toArray(new String[0]);
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
        out.writeInt(locations.length);
        for (String location : locations) {
          out.writeUTF(location);
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  //  Tool
  // ==========================================================================

  /**
   * Run Map-Reduce Job to perform the files copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
    final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
    if (storagePolicy != null) {
      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
      }
    }
    if (customFileGrouper != null) {
      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
    }
    if (fileLocationResolver != null) {
      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
    }

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = new Job(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since it may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

  private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
    Map<String, String> familyStoragePolicy = new HashMap<>();
    for (String familyConf : storagePolicy.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      // family is key, storage policy is value
      familyStoragePolicy.put(familySplit[0], familySplit[1]);
    }
    return familyStoragePolicy;
  }
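
  // For example, a --storage-policy value of "f=HOT&g=ALL_SSD" parses to the map
  // {f -> HOT, g -> ALL_SSD}; runCopyJob() then stores each entry under the key
  // "snapshot.export.storage.policy.family.<family>", which ExportMapper.copyFile() reads to set
  // the policy on the family's output directory.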

  private static String generateFamilyStoragePolicyKey(String family) {
    return CONF_STORAGE_POLICY + "." + family;
  }

  private boolean verifyTarget = true;
  private boolean verifySource = true;
  private boolean verifyChecksum = true;
  private String snapshotName = null;
  private String targetName = null;
  private boolean overwrite = false;
  private String filesGroup = null;
  private String filesUser = null;
  private Path outputRoot = null;
  private Path inputRoot = null;
  private int bandwidthMB = Integer.MAX_VALUE;
  private int filesMode = 0;
  private int mappers = 0;
  private boolean resetTtl = false;
  private String storagePolicy = null;
  private String customFileGrouper = null;
  private String fileLocationResolver = null;

  @Override
  protected void processOptions(CommandLine cmd) {
    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
    }
    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
    }
    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
    // verifyChecksum and verifyTarget may also be set from old-style args in processOldArgs(...).
    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
    }
    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
      customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
    }
    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
      fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
    }
  }

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // throw CorruptedSnapshotException if we can't read the snapshot info.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory whose owner and group need to be changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot is already in progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the --overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, traversedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name is different from the source name, or if we
    // want to reset the TTL for the target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files, otherwise the files get removed
    // by the HFileArchiver, since they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
        fileLocationResolver);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify the exported snapshot's expiration status and integrity.");
        SnapshotDescription targetSnapshotDesc =
          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return EXIT_SUCCESS;
    } catch (Exception e) {
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return EXIT_FAILURE;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
    // Register --storage-policy; processOptions() reads it, so it must be added here.
    addOption(Options.STORAGE_POLICY);
    addOption(Options.CUSTOM_FILE_GROUPER);
    addOption(Options.FILE_LOCATION_RESOLVER);
  }

  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}