001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.HashSet;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.Set;
033import java.util.concurrent.ExecutionException;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import java.util.function.BiConsumer;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileChecksum;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.FsPermission;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.io.FileLink;
051import org.apache.hadoop.hbase.io.HFileLink;
052import org.apache.hadoop.hbase.io.WALLink;
053import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
055import org.apache.hadoop.hbase.mob.MobUtils;
056import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
057import org.apache.hadoop.hbase.util.AbstractHBaseTool;
058import org.apache.hadoop.hbase.util.CommonFSUtils;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.FSUtils;
061import org.apache.hadoop.hbase.util.HFileArchiveUtil;
062import org.apache.hadoop.hbase.util.Pair;
063import org.apache.hadoop.hbase.util.Strings;
064import org.apache.hadoop.io.BytesWritable;
065import org.apache.hadoop.io.NullWritable;
066import org.apache.hadoop.io.Writable;
067import org.apache.hadoop.mapreduce.InputFormat;
068import org.apache.hadoop.mapreduce.InputSplit;
069import org.apache.hadoop.mapreduce.Job;
070import org.apache.hadoop.mapreduce.JobContext;
071import org.apache.hadoop.mapreduce.Mapper;
072import org.apache.hadoop.mapreduce.RecordReader;
073import org.apache.hadoop.mapreduce.TaskAttemptContext;
074import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
075import org.apache.hadoop.mapreduce.security.TokenCache;
076import org.apache.hadoop.util.StringUtils;
077import org.apache.hadoop.util.Tool;
078import org.apache.yetus.audience.InterfaceAudience;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
083import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
084
085import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
089
090/**
091 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
092 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
093 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
094 */
095@InterfaceAudience.Public
096public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name; it is also the base of the source/destination configuration prefixes below. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Configuration keys used by the export job. The ones read by ExportMapper.setup() are
  // annotated with how the mapper consumes them; the remaining keys are used outside this part
  // of the file.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Owner to apply to the created directories and copied files (optional).
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  // Group to apply to the created directories and copied files (optional).
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  // Permission bits to apply to the created directories and copied files (0 = not set).
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether the mapper compares checksums after a copy (defaults to true in the mapper).
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root directory on the destination filesystem.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  // Root directory on the source filesystem.
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Size, in bytes, of the buffer used by the mapper to copy the data.
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Number of copied bytes after which the mapper updates its counter and status.
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper read throttle in MB/second (the mapper defaults to 100).
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  // Default for CONF_COPY_MANIFEST_THREADS: one thread per processor available to the JVM.
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
125
  /**
   * Failure-injection settings and state used by {@link ExportMapper} when the
   * {@link #CONF_TEST_FAILURE} flag is enabled in the job configuration.
   */
  static class Testing {
    /** Boolean configuration key that turns the failure injection on. */
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    /** Integer configuration key holding the number of failures to inject. */
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    /** Number of failures to inject; loaded from {@link #CONF_TEST_FAILURE_COUNT} by the mapper. */
    int failuresCountToInject = 0;
    /** Failures injected so far; the mapper seeds it with the id of the current task attempt. */
    int injectedFailureCount = 0;
  }
132
133  // Command line options and defaults.
134  static final class Options {
135    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
136    static final Option TARGET_NAME =
137      new Option(null, "target", true, "Target name for the snapshot.");
138    static final Option COPY_TO =
139      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
140    static final Option COPY_FROM =
141      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
142    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
143      "Do not verify checksum, use name+length only.");
144    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
145      "Do not verify the exported snapshot's expiration status and integrity.");
146    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
147      "Do not verify the source snapshot's expiration status and integrity.");
148    static final Option OVERWRITE =
149      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
150    static final Option CHUSER =
151      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
152    static final Option CHGROUP =
153      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
154    static final Option CHMOD =
155      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
156    static final Option MAPPERS = new Option(null, "mappers", true,
157      "Number of mappers to use during the copy (mapreduce.job.maps).");
158    static final Option BANDWIDTH =
159      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
160    static final Option RESET_TTL =
161      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
162  }
163
  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    /** Source files that could not be opened, or whose status lookup reported "not found". */
    MISSING_FILES,
    /** Files copied and verified successfully. */
    FILES_COPIED,
    /** Files not copied because an identical file was already present at the destination. */
    FILES_SKIPPED,
    /** Copies that failed with an I/O error (injected test failures are counted here too). */
    COPY_FAILED,
    /** Total length of the source files for which a copy was started. */
    BYTES_EXPECTED,
    /** Total length of the source files that were skipped. */
    BYTES_SKIPPED,
    /** Bytes actually written to the destination. */
    BYTES_COPIED
  }
174
  /**
   * Indicates the checksum comparison result. {@link #INCOMPATIBLE} is reported when one of the
   * two checksums is unavailable or when they were computed with different algorithms, i.e. when
   * the two values cannot be meaningfully compared.
   */
  public enum ChecksumComparison {
    TRUE, // checksums are comparable and equal.
    FALSE, // checksums are comparable and different.
    INCOMPATIBLE, // checksums are not comparable (missing checksum or different algorithm).
  }
183
184  private static class ExportMapper
185    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    /** Default number of copied bytes between two progress reports (1 MB). */
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    /** Lower bound (64 KB) of the default copy buffer size computed in setup(). */
    final static int BUFFER_SIZE = 64 * 1024;

    // Whether checksums are compared, besides the length, when checking that two files match.
    private boolean verifyChecksum;
    // Optional group/user/mode to apply to the exported files; null (or 0 for the mode) if unset.
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    // Size of the copy buffer and number of bytes between two progress reports.
    private int bufferSize;
    private int reportSize;

    // Destination filesystem, its root directory and the archive directory below that root.
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source filesystem, its root directory and the archive directory below that root.
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    // Failure-injection state; static, so it is shared by every mapper instance in the JVM.
    private static Testing testing = new Testing();
206
207    @Override
208    public void setup(Context context) throws IOException {
209      Configuration conf = context.getConfiguration();
210
211      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
212      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
213
214      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
215
216      filesGroup = conf.get(CONF_FILES_GROUP);
217      filesUser = conf.get(CONF_FILES_USER);
218      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
219      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
220      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
221
222      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
223      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
224
225      try {
226        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
227      } catch (IOException e) {
228        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
229      }
230
231      try {
232        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
233      } catch (IOException e) {
234        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
235      }
236
237      // Use the default block size of the outputFs if bigger
238      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
239      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
240      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
241      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
242
243      for (Counter c : Counter.values()) {
244        context.getCounter(c).increment(0);
245      }
246      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
247        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
248        // Get number of times we have already injected failure based on attempt number of this
249        // task.
250        testing.injectedFailureCount = context.getTaskAttemptID().getId();
251      }
252    }
253
254    @Override
255    public void map(BytesWritable key, NullWritable value, Context context)
256      throws InterruptedException, IOException {
257      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
258      Path outputPath = getOutputPath(inputInfo);
259
260      copyFile(context, inputInfo, outputPath);
261    }
262
263    /**
264     * Returns the location where the inputPath will be copied.
265     */
266    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
267      Path path = null;
268      switch (inputInfo.getType()) {
269        case HFILE:
270          Path inputPath = new Path(inputInfo.getHfile());
271          String family = inputPath.getParent().getName();
272          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
273          String region = HFileLink.getReferencedRegionName(inputPath.getName());
274          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
275          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
276            new Path(region, new Path(family, hfile)));
277          break;
278        case WAL:
279          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
280          break;
281        default:
282          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
283      }
284      return new Path(outputArchive, path);
285    }
286
287    @SuppressWarnings("checkstyle:linelength")
288    /**
289     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
290     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
291     */
292    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
293      throws IOException {
294      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
295      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
296      testing.injectedFailureCount++;
297      context.getCounter(Counter.COPY_FAILED).increment(1);
298      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
299      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
300        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
301    }
302
303    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
304      final Path outputPath) throws IOException {
305      // Get the file information
306      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
307
308      // Verify if the output file exists and is the same that we want to copy
309      if (outputFs.exists(outputPath)) {
310        FileStatus outputStat = outputFs.getFileStatus(outputPath);
311        if (outputStat != null && sameFile(inputStat, outputStat)) {
312          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
313          context.getCounter(Counter.FILES_SKIPPED).increment(1);
314          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
315          return;
316        }
317      }
318
319      InputStream in = openSourceFile(context, inputInfo);
320      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
321      if (Integer.MAX_VALUE != bandwidthMB) {
322        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
323      }
324
325      Path inputPath = inputStat.getPath();
326      try {
327        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
328
329        // Ensure that the output folder is there and copy the file
330        createOutputPath(outputPath.getParent());
331        FSDataOutputStream out = outputFs.create(outputPath, true);
332
333        long stime = EnvironmentEdgeManager.currentTime();
334        long totalBytesWritten =
335          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
336
337        // Verify the file length and checksum
338        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));
339
340        long etime = EnvironmentEdgeManager.currentTime();
341        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
342        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
343          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
344            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
345        context.getCounter(Counter.FILES_COPIED).increment(1);
346
347        // Try to Preserve attributes
348        if (!preserveAttributes(outputPath, inputStat)) {
349          LOG.warn("You may have to run manually chown on: " + outputPath);
350        }
351      } catch (IOException e) {
352        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
353        context.getCounter(Counter.COPY_FAILED).increment(1);
354        throw e;
355      } finally {
356        injectTestFailure(context, inputInfo);
357      }
358    }
359
360    /**
361     * Create the output folder and optionally set ownership.
362     */
363    private void createOutputPath(final Path path) throws IOException {
364      if (filesUser == null && filesGroup == null) {
365        outputFs.mkdirs(path);
366      } else {
367        Path parent = path.getParent();
368        if (!outputFs.exists(parent) && !parent.isRoot()) {
369          createOutputPath(parent);
370        }
371        outputFs.mkdirs(path);
372        if (filesUser != null || filesGroup != null) {
373          // override the owner when non-null user/group is specified
374          outputFs.setOwner(path, filesUser, filesGroup);
375        }
376        if (filesMode > 0) {
377          outputFs.setPermission(path, new FsPermission(filesMode));
378        }
379      }
380    }
381
382    /**
383     * Try to Preserve the files attribute selected by the user copying them from the source file
384     * This is only required when you are exporting as a different user than "hbase" or on a system
385     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
386     * can force a chmod with the user that knows is available on the system.
387     */
388    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
389      FileStatus stat;
390      try {
391        stat = outputFs.getFileStatus(path);
392      } catch (IOException e) {
393        LOG.warn("Unable to get the status for file=" + path);
394        return false;
395      }
396
397      try {
398        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
399          outputFs.setPermission(path, new FsPermission(filesMode));
400        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
401          outputFs.setPermission(path, refStat.getPermission());
402        }
403      } catch (IOException e) {
404        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
405        return false;
406      }
407
408      boolean hasRefStat = (refStat != null);
409      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
410      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
411      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
412        try {
413          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
414            outputFs.setOwner(path, user, group);
415          }
416        } catch (IOException e) {
417          LOG.warn(
418            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
419          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
420            + " group=" + group);
421          return false;
422        }
423      }
424
425      return true;
426    }
427
428    private boolean stringIsNotEmpty(final String str) {
429      return str != null && str.length() > 0;
430    }
431
432    private long copyData(final Context context, final Path inputPath, final InputStream in,
433      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
434      throws IOException {
435      final String statusMessage =
436        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";
437
438      try {
439        byte[] buffer = new byte[bufferSize];
440        long totalBytesWritten = 0;
441        int reportBytes = 0;
442        int bytesRead;
443
444        while ((bytesRead = in.read(buffer)) > 0) {
445          out.write(buffer, 0, bytesRead);
446          totalBytesWritten += bytesRead;
447          reportBytes += bytesRead;
448
449          if (reportBytes >= reportSize) {
450            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
451            context
452              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
453                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
454                + " to " + outputPath);
455            reportBytes = 0;
456          }
457        }
458
459        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
460        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
461          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
462          + outputPath);
463
464        return totalBytesWritten;
465      } finally {
466        out.close();
467        in.close();
468      }
469    }
470
471    /**
472     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
473     * fail or if the file is not found.
474     */
475    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
476      throws IOException {
477      try {
478        Configuration conf = context.getConfiguration();
479        FileLink link = null;
480        switch (fileInfo.getType()) {
481          case HFILE:
482            Path inputPath = new Path(fileInfo.getHfile());
483            link = getFileLink(inputPath, conf);
484            break;
485          case WAL:
486            String serverName = fileInfo.getWalServer();
487            String logName = fileInfo.getWalName();
488            link = new WALLink(inputRoot, serverName, logName);
489            break;
490          default:
491            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
492        }
493        return link.open(inputFs);
494      } catch (IOException e) {
495        context.getCounter(Counter.MISSING_FILES).increment(1);
496        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
497        throw e;
498      }
499    }
500
501    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
502      throws IOException {
503      try {
504        Configuration conf = context.getConfiguration();
505        FileLink link = null;
506        switch (fileInfo.getType()) {
507          case HFILE:
508            Path inputPath = new Path(fileInfo.getHfile());
509            link = getFileLink(inputPath, conf);
510            break;
511          case WAL:
512            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
513            break;
514          default:
515            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
516        }
517        return link.getFileStatus(inputFs);
518      } catch (FileNotFoundException e) {
519        context.getCounter(Counter.MISSING_FILES).increment(1);
520        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
521        throw e;
522      } catch (IOException e) {
523        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
524        throw e;
525      }
526    }
527
528    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
529      String regionName = HFileLink.getReferencedRegionName(path.getName());
530      TableName tableName = HFileLink.getReferencedTableName(path.getName());
531      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
532        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
533          HFileArchiveUtil.getArchivePath(conf), path);
534      }
535      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
536    }
537
538    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
539      try {
540        return fs.getFileChecksum(path);
541      } catch (IOException e) {
542        LOG.warn("Unable to get checksum for file=" + path, e);
543        return null;
544      }
545    }
546
547    /**
548     * Utility to compare the file length and checksums for the paths specified.
549     */
550    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
551      throws IOException {
552      long inputLen = inputStat.getLen();
553      long outputLen = outputStat.getLen();
554      Path inputPath = inputStat.getPath();
555      Path outputPath = outputStat.getPath();
556
557      if (inputLen != outputLen) {
558        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
559          + ") and output:" + outputPath + " (" + outputLen + ")");
560      }
561
562      // If length==0, we will skip checksum
563      if (inputLen != 0 && verifyChecksum) {
564        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
565        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
566
567        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
568        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
569          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
570            .append(inputPath).append(" and ").append(outputPath).append(".");
571
572          boolean addSkipHint = false;
573          String inputScheme = inputFs.getScheme();
574          String outputScheme = outputFs.getScheme();
575          if (!inputScheme.equals(outputScheme)) {
576            errMessage.append(" Input and output filesystems are of different types.\n")
577              .append("Their checksum algorithms may be incompatible.");
578            addSkipHint = true;
579          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
580            errMessage.append(" Input and output differ in block-size.");
581            addSkipHint = true;
582          } else if (
583            inChecksum != null && outChecksum != null
584              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
585          ) {
586            errMessage.append(" Input and output checksum algorithms are of different types.");
587            addSkipHint = true;
588          }
589          if (addSkipHint) {
590            errMessage
591              .append(" You can choose file-level checksum validation via "
592                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
593                + " or filesystems are different.\n")
594              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
595              .append(
596                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
597              .append(" (NOTE: By skipping checksums, one runs the risk of "
598                + "masking data-corruption during file-transfer.)\n");
599          }
600          throw new IOException(errMessage.toString());
601        }
602      }
603    }
604
605    /**
606     * Utility to compare checksums
607     */
608    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
609      final FileChecksum outChecksum) {
610      // If the input or output checksum is null, or the algorithms of input and output are not
611      // equal, that means there is no comparison
612      // and return not compatible. else if matched, return compatible with the matched result.
613      if (
614        inChecksum == null || outChecksum == null
615          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
616      ) {
617        return ChecksumComparison.INCOMPATIBLE;
618      } else if (inChecksum.equals(outChecksum)) {
619        return ChecksumComparison.TRUE;
620      }
621      return ChecksumComparison.FALSE;
622    }
623
624    /**
625     * Check if the two files are equal by looking at the file length, and at the checksum (if user
626     * has specified the verifyChecksum flag).
627     */
628    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
629      // Not matching length
630      if (inputStat.getLen() != outputStat.getLen()) return false;
631
632      // Mark files as equals, since user asked for no checksum verification
633      if (!verifyChecksum) return true;
634
635      // If checksums are not available, files are not the same.
636      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
637      if (inChecksum == null) return false;
638
639      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
640      if (outChecksum == null) return false;
641
642      return inChecksum.equals(outChecksum);
643    }
644  }
645
646  // ==========================================================================
647  // Input Format
648  // ==========================================================================
649
650  /**
651   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
652   * @return list of files referenced by the snapshot (pair of path and size)
653   */
654  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
655    final FileSystem fs, final Path snapshotDir) throws IOException {
656    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
657
658    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
659    final TableName table = TableName.valueOf(snapshotDesc.getTable());
660
661    // Get snapshot files
662    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
663    Set<String> addedFiles = new HashSet<>();
664    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
665      new SnapshotReferenceUtil.SnapshotVisitor() {
666        @Override
667        public void storeFile(final RegionInfo regionInfo, final String family,
668          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
669          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
670          if (!storeFile.hasReference()) {
671            String region = regionInfo.getEncodedName();
672            String hfile = storeFile.getName();
673            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
674              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
675          } else {
676            Pair<String, String> referredToRegionAndFile =
677              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
678            String referencedRegion = referredToRegionAndFile.getFirst();
679            String referencedHFile = referredToRegionAndFile.getSecond();
680            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
681              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
682          }
683          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
684          if (!addedFiles.contains(fileToExport)) {
685            files.add(snapshotFileAndSize);
686            addedFiles.add(fileToExport);
687          } else {
688            LOG.debug("Skip the existing file: {}.", fileToExport);
689          }
690        }
691      });
692
693    return files;
694  }
695
696  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
697    Configuration conf, TableName table, String region, String family, String hfile, long size)
698    throws IOException {
699    Path path = HFileLink.createPath(table, region, family, hfile);
700    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
701      .setHfile(path.toString()).build();
702    if (size == -1) {
703      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
704    }
705    return new Pair<>(fileInfo, size);
706  }
707
708  /**
709   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
710   * The groups created will have similar amounts of bytes.
711   * <p>
712   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
713   * group fetch the bigger file available, iterating through groups alternating the direction.
714   */
715  static List<List<Pair<SnapshotFileInfo, Long>>>
716    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
717    // Sort files by size, from small to big
718    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
719      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
720        long r = a.getSecond() - b.getSecond();
721        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
722      }
723    });
724
725    // create balanced groups
726    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
727    long[] sizeGroups = new long[ngroups];
728    int hi = files.size() - 1;
729    int lo = 0;
730
731    List<Pair<SnapshotFileInfo, Long>> group;
732    int dir = 1;
733    int g = 0;
734
735    while (hi >= lo) {
736      if (g == fileGroups.size()) {
737        group = new LinkedList<>();
738        fileGroups.add(group);
739      } else {
740        group = fileGroups.get(g);
741      }
742
743      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
744
745      // add the hi one
746      sizeGroups[g] += fileInfo.getSecond();
747      group.add(fileInfo);
748
749      // change direction when at the end or the beginning
750      g += dir;
751      if (g == ngroups) {
752        dir = -1;
753        g = ngroups - 1;
754      } else if (g < 0) {
755        dir = 1;
756        g = 0;
757      }
758    }
759
760    if (LOG.isDebugEnabled()) {
761      for (int i = 0; i < sizeGroups.length; ++i) {
762        LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i]));
763      }
764    }
765
766    return fileGroups;
767  }
768
769  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
770    @Override
771    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
772      TaskAttemptContext tac) throws IOException, InterruptedException {
773      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
774    }
775
776    @Override
777    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
778      Configuration conf = context.getConfiguration();
779      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
780      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
781
782      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
783      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
784      if (mappers == 0 && snapshotFiles.size() > 0) {
785        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
786        mappers = Math.min(mappers, snapshotFiles.size());
787        conf.setInt(CONF_NUM_SPLITS, mappers);
788        conf.setInt(MR_NUM_MAPS, mappers);
789      }
790
791      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
792      List<InputSplit> splits = new ArrayList(groups.size());
793      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
794        splits.add(new ExportSnapshotInputSplit(files));
795      }
796      return splits;
797    }
798
799    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
800      private List<Pair<BytesWritable, Long>> files;
801      private long length;
802
803      public ExportSnapshotInputSplit() {
804        this.files = null;
805      }
806
807      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
808        this.files = new ArrayList(snapshotFiles.size());
809        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
810          this.files.add(
811            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
812          this.length += fileInfo.getSecond();
813        }
814      }
815
816      private List<Pair<BytesWritable, Long>> getSplitKeys() {
817        return files;
818      }
819
820      @Override
821      public long getLength() throws IOException, InterruptedException {
822        return length;
823      }
824
825      @Override
826      public String[] getLocations() throws IOException, InterruptedException {
827        return new String[] {};
828      }
829
830      @Override
831      public void readFields(DataInput in) throws IOException {
832        int count = in.readInt();
833        files = new ArrayList<>(count);
834        length = 0;
835        for (int i = 0; i < count; ++i) {
836          BytesWritable fileInfo = new BytesWritable();
837          fileInfo.readFields(in);
838          long size = in.readLong();
839          files.add(new Pair<>(fileInfo, size));
840          length += size;
841        }
842      }
843
844      @Override
845      public void write(DataOutput out) throws IOException {
846        out.writeInt(files.size());
847        for (final Pair<BytesWritable, Long> fileInfo : files) {
848          fileInfo.getFirst().write(out);
849          out.writeLong(fileInfo.getSecond());
850        }
851      }
852    }
853
854    private static class ExportSnapshotRecordReader
855      extends RecordReader<BytesWritable, NullWritable> {
856      private final List<Pair<BytesWritable, Long>> files;
857      private long totalSize = 0;
858      private long procSize = 0;
859      private int index = -1;
860
861      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
862        this.files = files;
863        for (Pair<BytesWritable, Long> fileInfo : files) {
864          totalSize += fileInfo.getSecond();
865        }
866      }
867
868      @Override
869      public void close() {
870      }
871
872      @Override
873      public BytesWritable getCurrentKey() {
874        return files.get(index).getFirst();
875      }
876
877      @Override
878      public NullWritable getCurrentValue() {
879        return NullWritable.get();
880      }
881
882      @Override
883      public float getProgress() {
884        return (float) procSize / totalSize;
885      }
886
887      @Override
888      public void initialize(InputSplit split, TaskAttemptContext tac) {
889      }
890
891      @Override
892      public boolean nextKeyValue() {
893        if (index >= 0) {
894          procSize += files.get(index).getSecond();
895        }
896        return (++index < files.size());
897      }
898    }
899  }
900
901  // ==========================================================================
902  // Tool
903  // ==========================================================================
904
905  /**
906   * Run Map-Reduce Job to perform the files copy.
907   */
908  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
909    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
910    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
911    throws IOException, InterruptedException, ClassNotFoundException {
912    Configuration conf = getConf();
913    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
914    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
915    if (mappers > 0) {
916      conf.setInt(CONF_NUM_SPLITS, mappers);
917      conf.setInt(MR_NUM_MAPS, mappers);
918    }
919    conf.setInt(CONF_FILES_MODE, filesMode);
920    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
921    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
922    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
923    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
924    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
925    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
926
927    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
928    Job job = new Job(conf);
929    job.setJobName(jobname);
930    job.setJarByClass(ExportSnapshot.class);
931    TableMapReduceUtil.addDependencyJars(job);
932    job.setMapperClass(ExportMapper.class);
933    job.setInputFormatClass(ExportSnapshotInputFormat.class);
934    job.setOutputFormatClass(NullOutputFormat.class);
935    job.setMapSpeculativeExecution(false);
936    job.setNumReduceTasks(0);
937
938    // Acquire the delegation Tokens
939    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
940    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
941    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
942    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
943
944    // Run the MR Job
945    if (!job.waitForCompletion(true)) {
946      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
947    }
948  }
949
950  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
951    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
952    // Update the conf with the current root dir, since may be a different cluster
953    Configuration conf = new Configuration(baseConf);
954    CommonFSUtils.setRootDir(conf, rootDir);
955    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
956    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
957      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
958    if (isExpired) {
959      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
960    }
961    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
962  }
963
964  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
965    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
966    ExecutorService pool = Executors
967      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
968    List<Future<Void>> futures = new ArrayList<>();
969    for (Path dstPath : traversedPath) {
970      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
971      futures.add(future);
972    }
973    try {
974      for (Future<Void> future : futures) {
975        future.get();
976      }
977    } catch (InterruptedException | ExecutionException e) {
978      throw new IOException(e);
979    } finally {
980      pool.shutdownNow();
981    }
982  }
983
984  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
985    Configuration conf, List<Path> traversedPath) throws IOException {
986    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
987      try {
988        fs.setOwner(path, filesUser, filesGroup);
989      } catch (IOException e) {
990        throw new RuntimeException(
991          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
992      }
993    }, conf);
994  }
995
996  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
997    final List<Path> traversedPath, final Configuration conf) throws IOException {
998    if (filesMode <= 0) {
999      return;
1000    }
1001    FsPermission perm = new FsPermission(filesMode);
1002    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
1003      try {
1004        fs.setPermission(path, perm);
1005      } catch (IOException e) {
1006        throw new RuntimeException(
1007          "set permission for file " + path + " to " + filesMode + " failed", e);
1008      }
1009    }, conf);
1010  }
1011
1012  private boolean verifyTarget = true;
1013  private boolean verifySource = true;
1014  private boolean verifyChecksum = true;
1015  private String snapshotName = null;
1016  private String targetName = null;
1017  private boolean overwrite = false;
1018  private String filesGroup = null;
1019  private String filesUser = null;
1020  private Path outputRoot = null;
1021  private Path inputRoot = null;
1022  private int bandwidthMB = Integer.MAX_VALUE;
1023  private int filesMode = 0;
1024  private int mappers = 0;
1025  private boolean resetTtl = false;
1026
1027  @Override
1028  protected void processOptions(CommandLine cmd) {
1029    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1030    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1031    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1032      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1033    }
1034    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1035      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1036    }
1037    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1038    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1039    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1040    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1041    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1042    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1043    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1044    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1045    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1046    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1047    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1048  }
1049
1050  /**
1051   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
1052   * @return 0 on success, and != 0 upon failure.
1053   */
1054  @Override
1055  public int doWork() throws IOException {
1056    Configuration conf = getConf();
1057
1058    // Check user options
1059    if (snapshotName == null) {
1060      System.err.println("Snapshot name not provided.");
1061      LOG.error("Use -h or --help for usage instructions.");
1062      return EXIT_FAILURE;
1063    }
1064
1065    if (outputRoot == null) {
1066      System.err
1067        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
1068      LOG.error("Use -h or --help for usage instructions.");
1069      return EXIT_FAILURE;
1070    }
1071
1072    if (targetName == null) {
1073      targetName = snapshotName;
1074    }
1075    if (inputRoot == null) {
1076      inputRoot = CommonFSUtils.getRootDir(conf);
1077    } else {
1078      CommonFSUtils.setRootDir(conf, inputRoot);
1079    }
1080
1081    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1082    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
1083    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1084    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
1085    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
1086      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1087    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1088    Path snapshotTmpDir =
1089      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1090    Path outputSnapshotDir =
1091      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1092    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1093    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1094    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1095      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1096
1097    // throw CorruptedSnapshotException if we can't read the snapshot info.
1098    SnapshotDescription sourceSnapshotDesc =
1099      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);
1100
1101    // Verify snapshot source before copying files
1102    if (verifySource) {
1103      LOG.info("Verify the source snapshot's expiration status and integrity.");
1104      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
1105    }
1106
1107    // Find the necessary directory which need to change owner and group
1108    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1109    if (outputFs.exists(needSetOwnerDir)) {
1110      if (skipTmp) {
1111        needSetOwnerDir = outputSnapshotDir;
1112      } else {
1113        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1114        if (outputFs.exists(needSetOwnerDir)) {
1115          needSetOwnerDir = snapshotTmpDir;
1116        }
1117      }
1118    }
1119
1120    // Check if the snapshot already exists
1121    if (outputFs.exists(outputSnapshotDir)) {
1122      if (overwrite) {
1123        if (!outputFs.delete(outputSnapshotDir, true)) {
1124          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1125          return EXIT_FAILURE;
1126        }
1127      } else {
1128        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1129          + outputSnapshotDir);
1130        return EXIT_FAILURE;
1131      }
1132    }
1133
1134    if (!skipTmp) {
1135      // Check if the snapshot already in-progress
1136      if (outputFs.exists(snapshotTmpDir)) {
1137        if (overwrite) {
1138          if (!outputFs.delete(snapshotTmpDir, true)) {
1139            System.err
1140              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1141            return EXIT_FAILURE;
1142          }
1143        } else {
1144          System.err
1145            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1146          System.err
1147            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1148          System.err
1149            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1150          return EXIT_FAILURE;
1151        }
1152      }
1153    }
1154
1155    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1156    // The snapshot references must be copied before the hfiles otherwise the cleaner
1157    // will remove them because they are unreferenced.
1158    List<Path> travesedPaths = new ArrayList<>();
1159    boolean copySucceeded = false;
1160    try {
1161      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1162      travesedPaths =
1163        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1164          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1165      copySucceeded = true;
1166    } catch (IOException e) {
1167      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1168        + " to=" + initialOutputSnapshotDir, e);
1169    } finally {
1170      if (copySucceeded) {
1171        if (filesUser != null || filesGroup != null) {
1172          LOG.warn(
1173            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1174              + (filesGroup == null
1175                ? ""
1176                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1177          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1178        }
1179        if (filesMode > 0) {
1180          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1181          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1182        }
1183      }
1184    }
1185
1186    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1187    // reset TTL for target snapshot.
1188    if (!targetName.equals(snapshotName) || resetTtl) {
1189      SnapshotDescription.Builder snapshotDescBuilder =
1190        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1191      if (!targetName.equals(snapshotName)) {
1192        snapshotDescBuilder.setName(targetName);
1193      }
1194      if (resetTtl) {
1195        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1196      }
1197      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1198        initialOutputSnapshotDir, outputFs);
1199      if (filesUser != null || filesGroup != null) {
1200        outputFs.setOwner(
1201          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1202          filesGroup);
1203      }
1204      if (filesMode > 0) {
1205        outputFs.setPermission(
1206          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1207          new FsPermission((short) filesMode));
1208      }
1209    }
1210
1211    // Step 2 - Start MR Job to copy files
1212    // The snapshot references must be copied before the files otherwise the files gets removed
1213    // by the HFileArchiver, since they have no references.
1214    try {
1215      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1216        filesGroup, filesMode, mappers, bandwidthMB);
1217
1218      LOG.info("Finalize the Snapshot Export");
1219      if (!skipTmp) {
1220        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1221        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1222          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1223            + snapshotTmpDir + " to=" + outputSnapshotDir);
1224        }
1225      }
1226
1227      // Step 4 - Verify snapshot integrity
1228      if (verifyTarget) {
1229        LOG.info("Verify the exported snapshot's expiration status and integrity.");
1230        SnapshotDescription targetSnapshotDesc =
1231          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
1232        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
1233      }
1234
1235      LOG.info("Export Completed: " + targetName);
1236      return EXIT_SUCCESS;
1237    } catch (Exception e) {
1238      LOG.error("Snapshot export failed", e);
1239      if (!skipTmp) {
1240        outputFs.delete(snapshotTmpDir, true);
1241      }
1242      outputFs.delete(outputSnapshotDir, true);
1243      return EXIT_FAILURE;
1244    }
1245  }
1246
1247  @Override
1248  protected void printUsage() {
1249    super.printUsage();
1250    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1251      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1252      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1253      + "  hbase snapshot export \\\n"
1254      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1255      + "    --copy-to hdfs://srv1:50070/hbase");
1256  }
1257
1258  @Override
1259  protected void addOptions() {
1260    addRequiredOption(Options.SNAPSHOT);
1261    addOption(Options.COPY_TO);
1262    addOption(Options.COPY_FROM);
1263    addOption(Options.TARGET_NAME);
1264    addOption(Options.NO_CHECKSUM_VERIFY);
1265    addOption(Options.NO_TARGET_VERIFY);
1266    addOption(Options.NO_SOURCE_VERIFY);
1267    addOption(Options.OVERWRITE);
1268    addOption(Options.CHUSER);
1269    addOption(Options.CHGROUP);
1270    addOption(Options.CHMOD);
1271    addOption(Options.MAPPERS);
1272    addOption(Options.BANDWIDTH);
1273    addOption(Options.RESET_TTL);
1274  }
1275
1276  public static void main(String[] args) {
1277    new ExportSnapshot().doStaticMain(args);
1278  }
1279}