001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.function.BiConsumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileChecksum;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.io.FileLink;
049import org.apache.hadoop.hbase.io.HFileLink;
050import org.apache.hadoop.hbase.io.WALLink;
051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mob.MobUtils;
054import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.FSUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.IOUtils;
063import org.apache.hadoop.io.NullWritable;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.mapreduce.InputFormat;
066import org.apache.hadoop.mapreduce.InputSplit;
067import org.apache.hadoop.mapreduce.Job;
068import org.apache.hadoop.mapreduce.JobContext;
069import org.apache.hadoop.mapreduce.Mapper;
070import org.apache.hadoop.mapreduce.RecordReader;
071import org.apache.hadoop.mapreduce.TaskAttemptContext;
072import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
073import org.apache.hadoop.mapreduce.security.TokenCache;
074import org.apache.hadoop.util.StringUtils;
075import org.apache.hadoop.util.Tool;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
081import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
082
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
086
087/**
088 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
089 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
090 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
091 */
092@InterfaceAudience.Public
093public class ExportSnapshot extends AbstractHBaseTool implements Tool {
094  public static final String NAME = "exportsnapshot";
095  /** Configuration prefix for overrides for the source filesystem */
096  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
097  /** Configuration prefix for overrides for the destination filesystem */
098  public static final String CONF_DEST_PREFIX = NAME + ".to.";
099
100  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
101
102  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
103  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
104  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
105  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
106  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
107  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
108  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
109  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
110  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
111  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
112  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
113  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
114  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
115  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
116  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
117  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
118  private static final String CONF_COPY_MANIFEST_THREADS =
119    "snapshot.export.copy.references.threads";
120  private static final int DEFAULT_COPY_MANIFEST_THREADS =
121    Runtime.getRuntime().availableProcessors();
122
123  static class Testing {
124    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
125    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
126    int failuresCountToInject = 0;
127    int injectedFailureCount = 0;
128  }
129
130  // Command line options and defaults.
131  static final class Options {
132    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
133    static final Option TARGET_NAME =
134      new Option(null, "target", true, "Target name for the snapshot.");
135    static final Option COPY_TO =
136      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
137    static final Option COPY_FROM =
138      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
139    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
140      "Do not verify checksum, use name+length only.");
141    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
142      "Do not verify the integrity of the exported snapshot.");
143    static final Option NO_SOURCE_VERIFY =
144      new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot.");
145    static final Option OVERWRITE =
146      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
147    static final Option CHUSER =
148      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
149    static final Option CHGROUP =
150      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
151    static final Option CHMOD =
152      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
153    static final Option MAPPERS = new Option(null, "mappers", true,
154      "Number of mappers to use during the copy (mapreduce.job.maps).");
155    static final Option BANDWIDTH =
156      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
157    static final Option RESET_TTL =
158      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
159  }
160
161  // Export Map-Reduce Counters, to keep track of the progress
162  public enum Counter {
163    MISSING_FILES,
164    FILES_COPIED,
165    FILES_SKIPPED,
166    COPY_FAILED,
167    BYTES_EXPECTED,
168    BYTES_SKIPPED,
169    BYTES_COPIED
170  }
171
172  private static class ExportMapper
173    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
174    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
175    final static int REPORT_SIZE = 1 * 1024 * 1024;
176    final static int BUFFER_SIZE = 64 * 1024;
177
178    private boolean verifyChecksum;
179    private String filesGroup;
180    private String filesUser;
181    private short filesMode;
182    private int bufferSize;
183    private int reportSize;
184
185    private FileSystem outputFs;
186    private Path outputArchive;
187    private Path outputRoot;
188
189    private FileSystem inputFs;
190    private Path inputArchive;
191    private Path inputRoot;
192
193    private static Testing testing = new Testing();
194
195    @Override
196    public void setup(Context context) throws IOException {
197      Configuration conf = context.getConfiguration();
198
199      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
200      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
201
202      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
203
204      filesGroup = conf.get(CONF_FILES_GROUP);
205      filesUser = conf.get(CONF_FILES_USER);
206      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
207      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
208      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
209
210      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
211      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
212
213      try {
214        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
215        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
216      } catch (IOException e) {
217        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
218      }
219
220      try {
221        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
222        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
223      } catch (IOException e) {
224        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
225      }
226
227      // Use the default block size of the outputFs if bigger
228      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
229      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
230      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
231      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
232
233      for (Counter c : Counter.values()) {
234        context.getCounter(c).increment(0);
235      }
236      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
237        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
238        // Get number of times we have already injected failure based on attempt number of this
239        // task.
240        testing.injectedFailureCount = context.getTaskAttemptID().getId();
241      }
242    }
243
244    @Override
245    protected void cleanup(Context context) {
246      IOUtils.closeStream(inputFs);
247      IOUtils.closeStream(outputFs);
248    }
249
250    @Override
251    public void map(BytesWritable key, NullWritable value, Context context)
252      throws InterruptedException, IOException {
253      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
254      Path outputPath = getOutputPath(inputInfo);
255
256      copyFile(context, inputInfo, outputPath);
257    }
258
259    /**
260     * Returns the location where the inputPath will be copied.
261     */
262    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
263      Path path = null;
264      switch (inputInfo.getType()) {
265        case HFILE:
266          Path inputPath = new Path(inputInfo.getHfile());
267          String family = inputPath.getParent().getName();
268          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
269          String region = HFileLink.getReferencedRegionName(inputPath.getName());
270          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
271          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
272            new Path(region, new Path(family, hfile)));
273          break;
274        case WAL:
275          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
276          break;
277        default:
278          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
279      }
280      return new Path(outputArchive, path);
281    }
282
283    @SuppressWarnings("checkstyle:linelength")
284    /**
285     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
286     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
287     */
288    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
289      throws IOException {
290      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
291      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
292      testing.injectedFailureCount++;
293      context.getCounter(Counter.COPY_FAILED).increment(1);
294      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
295      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
296        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
297    }
298
299    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
300      final Path outputPath) throws IOException {
301      // Get the file information
302      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
303
304      // Verify if the output file exists and is the same that we want to copy
305      if (outputFs.exists(outputPath)) {
306        FileStatus outputStat = outputFs.getFileStatus(outputPath);
307        if (outputStat != null && sameFile(inputStat, outputStat)) {
308          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
309          context.getCounter(Counter.FILES_SKIPPED).increment(1);
310          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
311          return;
312        }
313      }
314
315      InputStream in = openSourceFile(context, inputInfo);
316      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
317      if (Integer.MAX_VALUE != bandwidthMB) {
318        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
319      }
320
321      try {
322        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
323
324        // Ensure that the output folder is there and copy the file
325        createOutputPath(outputPath.getParent());
326        FSDataOutputStream out = outputFs.create(outputPath, true);
327        try {
328          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
329        } finally {
330          out.close();
331        }
332
333        // Try to Preserve attributes
334        if (!preserveAttributes(outputPath, inputStat)) {
335          LOG.warn("You may have to run manually chown on: " + outputPath);
336        }
337      } finally {
338        in.close();
339        injectTestFailure(context, inputInfo);
340      }
341    }
342
343    /**
344     * Create the output folder and optionally set ownership.
345     */
346    private void createOutputPath(final Path path) throws IOException {
347      if (filesUser == null && filesGroup == null) {
348        outputFs.mkdirs(path);
349      } else {
350        Path parent = path.getParent();
351        if (!outputFs.exists(parent) && !parent.isRoot()) {
352          createOutputPath(parent);
353        }
354        outputFs.mkdirs(path);
355        if (filesUser != null || filesGroup != null) {
356          // override the owner when non-null user/group is specified
357          outputFs.setOwner(path, filesUser, filesGroup);
358        }
359        if (filesMode > 0) {
360          outputFs.setPermission(path, new FsPermission(filesMode));
361        }
362      }
363    }
364
365    /**
366     * Try to Preserve the files attribute selected by the user copying them from the source file
367     * This is only required when you are exporting as a different user than "hbase" or on a system
368     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
369     * can force a chmod with the user that knows is available on the system.
370     */
371    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
372      FileStatus stat;
373      try {
374        stat = outputFs.getFileStatus(path);
375      } catch (IOException e) {
376        LOG.warn("Unable to get the status for file=" + path);
377        return false;
378      }
379
380      try {
381        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
382          outputFs.setPermission(path, new FsPermission(filesMode));
383        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
384          outputFs.setPermission(path, refStat.getPermission());
385        }
386      } catch (IOException e) {
387        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
388        return false;
389      }
390
391      boolean hasRefStat = (refStat != null);
392      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
393      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
394      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
395        try {
396          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
397            outputFs.setOwner(path, user, group);
398          }
399        } catch (IOException e) {
400          LOG.warn(
401            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
402          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
403            + " group=" + group);
404          return false;
405        }
406      }
407
408      return true;
409    }
410
411    private boolean stringIsNotEmpty(final String str) {
412      return str != null && str.length() > 0;
413    }
414
415    private void copyData(final Context context, final Path inputPath, final InputStream in,
416      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
417      throws IOException {
418      final String statusMessage =
419        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
420
421      try {
422        byte[] buffer = new byte[bufferSize];
423        long totalBytesWritten = 0;
424        int reportBytes = 0;
425        int bytesRead;
426
427        long stime = EnvironmentEdgeManager.currentTime();
428        while ((bytesRead = in.read(buffer)) > 0) {
429          out.write(buffer, 0, bytesRead);
430          totalBytesWritten += bytesRead;
431          reportBytes += bytesRead;
432
433          if (reportBytes >= reportSize) {
434            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
435            context.setStatus(
436              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
437                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
438                + " to " + outputPath);
439            reportBytes = 0;
440          }
441        }
442        long etime = EnvironmentEdgeManager.currentTime();
443
444        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
445        context
446          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
447            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
448            + outputPath);
449
450        // Verify that the written size match
451        if (totalBytesWritten != inputFileSize) {
452          String msg = "number of bytes copied not matching copied=" + totalBytesWritten
453            + " expected=" + inputFileSize + " for file=" + inputPath;
454          throw new IOException(msg);
455        }
456
457        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
458        LOG
459          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
460            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
461              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
462        context.getCounter(Counter.FILES_COPIED).increment(1);
463      } catch (IOException e) {
464        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
465        context.getCounter(Counter.COPY_FAILED).increment(1);
466        throw e;
467      }
468    }
469
470    /**
471     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
472     * fail or if the file is not found.
473     */
474    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
475      throws IOException {
476      try {
477        Configuration conf = context.getConfiguration();
478        FileLink link = null;
479        switch (fileInfo.getType()) {
480          case HFILE:
481            Path inputPath = new Path(fileInfo.getHfile());
482            link = getFileLink(inputPath, conf);
483            break;
484          case WAL:
485            String serverName = fileInfo.getWalServer();
486            String logName = fileInfo.getWalName();
487            link = new WALLink(inputRoot, serverName, logName);
488            break;
489          default:
490            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
491        }
492        return link.open(inputFs);
493      } catch (IOException e) {
494        context.getCounter(Counter.MISSING_FILES).increment(1);
495        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
496        throw e;
497      }
498    }
499
500    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
501      throws IOException {
502      try {
503        Configuration conf = context.getConfiguration();
504        FileLink link = null;
505        switch (fileInfo.getType()) {
506          case HFILE:
507            Path inputPath = new Path(fileInfo.getHfile());
508            link = getFileLink(inputPath, conf);
509            break;
510          case WAL:
511            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
512            break;
513          default:
514            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
515        }
516        return link.getFileStatus(inputFs);
517      } catch (FileNotFoundException e) {
518        context.getCounter(Counter.MISSING_FILES).increment(1);
519        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
520        throw e;
521      } catch (IOException e) {
522        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
523        throw e;
524      }
525    }
526
527    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
528      String regionName = HFileLink.getReferencedRegionName(path.getName());
529      TableName tableName = HFileLink.getReferencedTableName(path.getName());
530      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
531        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
532          HFileArchiveUtil.getArchivePath(conf), path);
533      }
534      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
535    }
536
537    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
538      try {
539        return fs.getFileChecksum(path);
540      } catch (IOException e) {
541        LOG.warn("Unable to get checksum for file=" + path, e);
542        return null;
543      }
544    }
545
546    /**
547     * Check if the two files are equal by looking at the file length, and at the checksum (if user
548     * has specified the verifyChecksum flag).
549     */
550    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
551      // Not matching length
552      if (inputStat.getLen() != outputStat.getLen()) return false;
553
554      // Mark files as equals, since user asked for no checksum verification
555      if (!verifyChecksum) return true;
556
557      // If checksums are not available, files are not the same.
558      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
559      if (inChecksum == null) return false;
560
561      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
562      if (outChecksum == null) return false;
563
564      return inChecksum.equals(outChecksum);
565    }
566  }
567
568  // ==========================================================================
569  // Input Format
570  // ==========================================================================
571
572  /**
573   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
574   * @return list of files referenced by the snapshot (pair of path and size)
575   */
576  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
577    final FileSystem fs, final Path snapshotDir) throws IOException {
578    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
579
580    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
581    final TableName table = TableName.valueOf(snapshotDesc.getTable());
582
583    // Get snapshot files
584    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
585    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
586      new SnapshotReferenceUtil.SnapshotVisitor() {
587        @Override
588        public void storeFile(final RegionInfo regionInfo, final String family,
589          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
590          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
591          if (!storeFile.hasReference()) {
592            String region = regionInfo.getEncodedName();
593            String hfile = storeFile.getName();
594            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
595              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
596          } else {
597            Pair<String, String> referredToRegionAndFile =
598              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
599            String referencedRegion = referredToRegionAndFile.getFirst();
600            String referencedHFile = referredToRegionAndFile.getSecond();
601            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
602              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
603          }
604          files.add(snapshotFileAndSize);
605        }
606      });
607
608    return files;
609  }
610
611  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
612    Configuration conf, TableName table, String region, String family, String hfile, long size)
613    throws IOException {
614    Path path = HFileLink.createPath(table, region, family, hfile);
615    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
616      .setHfile(path.toString()).build();
617    if (size == -1) {
618      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
619    }
620    return new Pair<>(fileInfo, size);
621  }
622
623  /**
624   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
625   * The groups created will have similar amounts of bytes.
626   * <p>
627   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
628   * group fetch the bigger file available, iterating through groups alternating the direction.
629   */
630  static List<List<Pair<SnapshotFileInfo, Long>>>
631    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
632    // Sort files by size, from small to big
633    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
634      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
635        long r = a.getSecond() - b.getSecond();
636        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
637      }
638    });
639
640    // create balanced groups
641    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
642    long[] sizeGroups = new long[ngroups];
643    int hi = files.size() - 1;
644    int lo = 0;
645
646    List<Pair<SnapshotFileInfo, Long>> group;
647    int dir = 1;
648    int g = 0;
649
650    while (hi >= lo) {
651      if (g == fileGroups.size()) {
652        group = new LinkedList<>();
653        fileGroups.add(group);
654      } else {
655        group = fileGroups.get(g);
656      }
657
658      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
659
660      // add the hi one
661      sizeGroups[g] += fileInfo.getSecond();
662      group.add(fileInfo);
663
664      // change direction when at the end or the beginning
665      g += dir;
666      if (g == ngroups) {
667        dir = -1;
668        g = ngroups - 1;
669      } else if (g < 0) {
670        dir = 1;
671        g = 0;
672      }
673    }
674
675    if (LOG.isDebugEnabled()) {
676      for (int i = 0; i < sizeGroups.length; ++i) {
677        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
678      }
679    }
680
681    return fileGroups;
682  }
683
684  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
685    @Override
686    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
687      TaskAttemptContext tac) throws IOException, InterruptedException {
688      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
689    }
690
691    @Override
692    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
693      Configuration conf = context.getConfiguration();
694      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
695      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
696
697      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
698      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
699      if (mappers == 0 && snapshotFiles.size() > 0) {
700        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
701        mappers = Math.min(mappers, snapshotFiles.size());
702        conf.setInt(CONF_NUM_SPLITS, mappers);
703        conf.setInt(MR_NUM_MAPS, mappers);
704      }
705
706      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
707      List<InputSplit> splits = new ArrayList(groups.size());
708      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
709        splits.add(new ExportSnapshotInputSplit(files));
710      }
711      return splits;
712    }
713
714    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
715      private List<Pair<BytesWritable, Long>> files;
716      private long length;
717
718      public ExportSnapshotInputSplit() {
719        this.files = null;
720      }
721
722      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
723        this.files = new ArrayList(snapshotFiles.size());
724        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
725          this.files.add(
726            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
727          this.length += fileInfo.getSecond();
728        }
729      }
730
731      private List<Pair<BytesWritable, Long>> getSplitKeys() {
732        return files;
733      }
734
735      @Override
736      public long getLength() throws IOException, InterruptedException {
737        return length;
738      }
739
740      @Override
741      public String[] getLocations() throws IOException, InterruptedException {
742        return new String[] {};
743      }
744
745      @Override
746      public void readFields(DataInput in) throws IOException {
747        int count = in.readInt();
748        files = new ArrayList<>(count);
749        length = 0;
750        for (int i = 0; i < count; ++i) {
751          BytesWritable fileInfo = new BytesWritable();
752          fileInfo.readFields(in);
753          long size = in.readLong();
754          files.add(new Pair<>(fileInfo, size));
755          length += size;
756        }
757      }
758
759      @Override
760      public void write(DataOutput out) throws IOException {
761        out.writeInt(files.size());
762        for (final Pair<BytesWritable, Long> fileInfo : files) {
763          fileInfo.getFirst().write(out);
764          out.writeLong(fileInfo.getSecond());
765        }
766      }
767    }
768
769    private static class ExportSnapshotRecordReader
770      extends RecordReader<BytesWritable, NullWritable> {
771      private final List<Pair<BytesWritable, Long>> files;
772      private long totalSize = 0;
773      private long procSize = 0;
774      private int index = -1;
775
776      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
777        this.files = files;
778        for (Pair<BytesWritable, Long> fileInfo : files) {
779          totalSize += fileInfo.getSecond();
780        }
781      }
782
783      @Override
784      public void close() {
785      }
786
787      @Override
788      public BytesWritable getCurrentKey() {
789        return files.get(index).getFirst();
790      }
791
792      @Override
793      public NullWritable getCurrentValue() {
794        return NullWritable.get();
795      }
796
797      @Override
798      public float getProgress() {
799        return (float) procSize / totalSize;
800      }
801
802      @Override
803      public void initialize(InputSplit split, TaskAttemptContext tac) {
804      }
805
806      @Override
807      public boolean nextKeyValue() {
808        if (index >= 0) {
809          procSize += files.get(index).getSecond();
810        }
811        return (++index < files.size());
812      }
813    }
814  }
815
816  // ==========================================================================
817  // Tool
818  // ==========================================================================
819
820  /**
821   * Run Map-Reduce Job to perform the files copy.
822   */
823  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
824    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
825    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
826    throws IOException, InterruptedException, ClassNotFoundException {
827    Configuration conf = getConf();
828    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
829    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
830    if (mappers > 0) {
831      conf.setInt(CONF_NUM_SPLITS, mappers);
832      conf.setInt(MR_NUM_MAPS, mappers);
833    }
834    conf.setInt(CONF_FILES_MODE, filesMode);
835    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
836    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
837    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
838    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
839    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
840    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
841
842    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
843    Job job = new Job(conf);
844    job.setJobName(jobname);
845    job.setJarByClass(ExportSnapshot.class);
846    TableMapReduceUtil.addDependencyJars(job);
847    job.setMapperClass(ExportMapper.class);
848    job.setInputFormatClass(ExportSnapshotInputFormat.class);
849    job.setOutputFormatClass(NullOutputFormat.class);
850    job.setMapSpeculativeExecution(false);
851    job.setNumReduceTasks(0);
852
853    // Acquire the delegation Tokens
854    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
855    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
856    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
857    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
858
859    // Run the MR Job
860    if (!job.waitForCompletion(true)) {
861      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
862    }
863  }
864
865  private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir,
866    final Path snapshotDir) throws IOException {
867    // Update the conf with the current root dir, since may be a different cluster
868    Configuration conf = new Configuration(baseConf);
869    CommonFSUtils.setRootDir(conf, rootDir);
870    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
871    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
872    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
873  }
874
875  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
876    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
877    ExecutorService pool = Executors
878      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
879    List<Future<Void>> futures = new ArrayList<>();
880    for (Path dstPath : traversedPath) {
881      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
882      futures.add(future);
883    }
884    try {
885      for (Future<Void> future : futures) {
886        future.get();
887      }
888    } catch (InterruptedException | ExecutionException e) {
889      throw new IOException(e);
890    } finally {
891      pool.shutdownNow();
892    }
893  }
894
895  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
896    Configuration conf, List<Path> traversedPath) throws IOException {
897    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
898      try {
899        fs.setOwner(path, filesUser, filesGroup);
900      } catch (IOException e) {
901        throw new RuntimeException(
902          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
903      }
904    }, conf);
905  }
906
907  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
908    final List<Path> traversedPath, final Configuration conf) throws IOException {
909    if (filesMode <= 0) {
910      return;
911    }
912    FsPermission perm = new FsPermission(filesMode);
913    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
914      try {
915        fs.setPermission(path, perm);
916      } catch (IOException e) {
917        throw new RuntimeException(
918          "set permission for file " + path + " to " + filesMode + " failed", e);
919      }
920    }, conf);
921  }
922
923  private boolean verifyTarget = true;
924  private boolean verifySource = true;
925  private boolean verifyChecksum = true;
926  private String snapshotName = null;
927  private String targetName = null;
928  private boolean overwrite = false;
929  private String filesGroup = null;
930  private String filesUser = null;
931  private Path outputRoot = null;
932  private Path inputRoot = null;
933  private int bandwidthMB = Integer.MAX_VALUE;
934  private int filesMode = 0;
935  private int mappers = 0;
936  private boolean resetTtl = false;
937
938  @Override
939  protected void processOptions(CommandLine cmd) {
940    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
941    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
942    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
943      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
944    }
945    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
946      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
947    }
948    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
949    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
950    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
951    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
952    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
953    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
954    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
955    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
956    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
957    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
958    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
959  }
960
961  /**
962   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
963   * @return 0 on success, and != 0 upon failure.
964   */
965  @Override
966  public int doWork() throws IOException {
967    Configuration conf = getConf();
968
969    // Check user options
970    if (snapshotName == null) {
971      System.err.println("Snapshot name not provided.");
972      LOG.error("Use -h or --help for usage instructions.");
973      return 0;
974    }
975
976    if (outputRoot == null) {
977      System.err
978        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
979      LOG.error("Use -h or --help for usage instructions.");
980      return 0;
981    }
982
983    if (targetName == null) {
984      targetName = snapshotName;
985    }
986    if (inputRoot == null) {
987      inputRoot = CommonFSUtils.getRootDir(conf);
988    } else {
989      CommonFSUtils.setRootDir(conf, inputRoot);
990    }
991
992    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
993    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
994    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
995    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
996    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
997    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
998    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
999      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1000    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1001    Path snapshotTmpDir =
1002      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1003    Path outputSnapshotDir =
1004      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1005    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1006    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1007    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1008      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1009
1010    // Verify snapshot source before copying files
1011    if (verifySource) {
1012      LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.",
1013        inputFs.getUri(), inputRoot, snapshotDir);
1014      verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir);
1015    }
1016
1017    // Find the necessary directory which need to change owner and group
1018    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1019    if (outputFs.exists(needSetOwnerDir)) {
1020      if (skipTmp) {
1021        needSetOwnerDir = outputSnapshotDir;
1022      } else {
1023        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1024        if (outputFs.exists(needSetOwnerDir)) {
1025          needSetOwnerDir = snapshotTmpDir;
1026        }
1027      }
1028    }
1029
1030    // Check if the snapshot already exists
1031    if (outputFs.exists(outputSnapshotDir)) {
1032      if (overwrite) {
1033        if (!outputFs.delete(outputSnapshotDir, true)) {
1034          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1035          return 1;
1036        }
1037      } else {
1038        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1039          + outputSnapshotDir);
1040        return 1;
1041      }
1042    }
1043
1044    if (!skipTmp) {
1045      // Check if the snapshot already in-progress
1046      if (outputFs.exists(snapshotTmpDir)) {
1047        if (overwrite) {
1048          if (!outputFs.delete(snapshotTmpDir, true)) {
1049            System.err
1050              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1051            return 1;
1052          }
1053        } else {
1054          System.err
1055            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1056          System.err
1057            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1058          System.err
1059            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1060          return 1;
1061        }
1062      }
1063    }
1064
1065    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1066    // The snapshot references must be copied before the hfiles otherwise the cleaner
1067    // will remove them because they are unreferenced.
1068    List<Path> travesedPaths = new ArrayList<>();
1069    boolean copySucceeded = false;
1070    try {
1071      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1072      travesedPaths =
1073        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1074          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1075      copySucceeded = true;
1076    } catch (IOException e) {
1077      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1078        + " to=" + initialOutputSnapshotDir, e);
1079    } finally {
1080      if (copySucceeded) {
1081        if (filesUser != null || filesGroup != null) {
1082          LOG.warn(
1083            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1084              + (filesGroup == null
1085                ? ""
1086                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1087          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1088        }
1089        if (filesMode > 0) {
1090          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1091          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1092        }
1093      }
1094    }
1095
1096    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1097    // reset TTL for target snapshot.
1098    if (!targetName.equals(snapshotName) || resetTtl) {
1099      SnapshotDescription.Builder snapshotDescBuilder =
1100        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1101      if (!targetName.equals(snapshotName)) {
1102        snapshotDescBuilder.setName(targetName);
1103      }
1104      if (resetTtl) {
1105        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1106      }
1107      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1108        initialOutputSnapshotDir, outputFs);
1109      if (filesUser != null || filesGroup != null) {
1110        outputFs.setOwner(
1111          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1112          filesGroup);
1113      }
1114      if (filesMode > 0) {
1115        outputFs.setPermission(
1116          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1117          new FsPermission((short) filesMode));
1118      }
1119    }
1120
1121    // Step 2 - Start MR Job to copy files
1122    // The snapshot references must be copied before the files otherwise the files gets removed
1123    // by the HFileArchiver, since they have no references.
1124    try {
1125      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1126        filesGroup, filesMode, mappers, bandwidthMB);
1127
1128      LOG.info("Finalize the Snapshot Export");
1129      if (!skipTmp) {
1130        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1131        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1132          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1133            + snapshotTmpDir + " to=" + outputSnapshotDir);
1134        }
1135      }
1136
1137      // Step 4 - Verify snapshot integrity
1138      if (verifyTarget) {
1139        LOG.info("Verify snapshot integrity");
1140        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1141      }
1142
1143      LOG.info("Export Completed: " + targetName);
1144      return 0;
1145    } catch (Exception e) {
1146      LOG.error("Snapshot export failed", e);
1147      if (!skipTmp) {
1148        outputFs.delete(snapshotTmpDir, true);
1149      }
1150      outputFs.delete(outputSnapshotDir, true);
1151      return 1;
1152    } finally {
1153      IOUtils.closeStream(inputFs);
1154      IOUtils.closeStream(outputFs);
1155    }
1156  }
1157
1158  @Override
1159  protected void printUsage() {
1160    super.printUsage();
1161    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1162      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1163      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1164      + "  hbase snapshot export \\\n"
1165      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1166      + "    --copy-to hdfs://srv1:50070/hbase");
1167  }
1168
1169  @Override
1170  protected void addOptions() {
1171    addRequiredOption(Options.SNAPSHOT);
1172    addOption(Options.COPY_TO);
1173    addOption(Options.COPY_FROM);
1174    addOption(Options.TARGET_NAME);
1175    addOption(Options.NO_CHECKSUM_VERIFY);
1176    addOption(Options.NO_TARGET_VERIFY);
1177    addOption(Options.NO_SOURCE_VERIFY);
1178    addOption(Options.OVERWRITE);
1179    addOption(Options.CHUSER);
1180    addOption(Options.CHGROUP);
1181    addOption(Options.CHMOD);
1182    addOption(Options.MAPPERS);
1183    addOption(Options.BANDWIDTH);
1184    addOption(Options.RESET_TTL);
1185  }
1186
1187  public static void main(String[] args) {
1188    new ExportSnapshot().doStaticMain(args);
1189  }
1190}