001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.function.BiConsumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileChecksum;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.io.FileLink;
049import org.apache.hadoop.hbase.io.HFileLink;
050import org.apache.hadoop.hbase.io.WALLink;
051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mob.MobUtils;
054import org.apache.hadoop.hbase.util.AbstractHBaseTool;
055import org.apache.hadoop.hbase.util.CommonFSUtils;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
057import org.apache.hadoop.hbase.util.FSUtils;
058import org.apache.hadoop.hbase.util.HFileArchiveUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
081
082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
085
086/**
087 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
088 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
089 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
090 */
091@InterfaceAudience.Public
092public class ExportSnapshot extends AbstractHBaseTool implements Tool {
093  public static final String NAME = "exportsnapshot";
094  /** Configuration prefix for overrides for the source filesystem */
095  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
096  /** Configuration prefix for overrides for the destination filesystem */
097  public static final String CONF_DEST_PREFIX = NAME + ".to.";
098
099  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
100
101  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
102  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
103  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
104  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
105  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
106  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
107  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
108  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
109  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
110  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
111  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
112  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
113  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
114  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
115  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
116  private static final String CONF_COPY_MANIFEST_THREADS =
117    "snapshot.export.copy.references.threads";
118  private static final int DEFAULT_COPY_MANIFEST_THREADS =
119    Runtime.getRuntime().availableProcessors();
120
121  static class Testing {
122    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
123    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
124    int failuresCountToInject = 0;
125    int injectedFailureCount = 0;
126  }
127
128  // Command line options and defaults.
129  static final class Options {
130    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
131    static final Option TARGET_NAME =
132      new Option(null, "target", true, "Target name for the snapshot.");
133    static final Option COPY_TO =
134      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
135    static final Option COPY_FROM =
136      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
137    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
138      "Do not verify checksum, use name+length only.");
139    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
140      "Do not verify the integrity of the exported snapshot.");
141    static final Option NO_SOURCE_VERIFY =
142      new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot.");
143    static final Option OVERWRITE =
144      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
145    static final Option CHUSER =
146      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
147    static final Option CHGROUP =
148      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
149    static final Option CHMOD =
150      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
151    static final Option MAPPERS = new Option(null, "mappers", true,
152      "Number of mappers to use during the copy (mapreduce.job.maps).");
153    static final Option BANDWIDTH =
154      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
155    static final Option RESET_TTL =
156      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
157  }
158
159  // Export Map-Reduce Counters, to keep track of the progress
160  public enum Counter {
161    MISSING_FILES,
162    FILES_COPIED,
163    FILES_SKIPPED,
164    COPY_FAILED,
165    BYTES_EXPECTED,
166    BYTES_SKIPPED,
167    BYTES_COPIED
168  }
169
170  private static class ExportMapper
171    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
172    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
173    final static int REPORT_SIZE = 1 * 1024 * 1024;
174    final static int BUFFER_SIZE = 64 * 1024;
175
176    private boolean verifyChecksum;
177    private String filesGroup;
178    private String filesUser;
179    private short filesMode;
180    private int bufferSize;
181
182    private FileSystem outputFs;
183    private Path outputArchive;
184    private Path outputRoot;
185
186    private FileSystem inputFs;
187    private Path inputArchive;
188    private Path inputRoot;
189
190    private static Testing testing = new Testing();
191
192    @Override
193    public void setup(Context context) throws IOException {
194      Configuration conf = context.getConfiguration();
195
196      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
197      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
198
199      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
200
201      filesGroup = conf.get(CONF_FILES_GROUP);
202      filesUser = conf.get(CONF_FILES_USER);
203      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
204      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
205      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
206
207      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
208      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
209
210      try {
211        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
212        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
213      } catch (IOException e) {
214        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
215      }
216
217      try {
218        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
219        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
220      } catch (IOException e) {
221        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
222      }
223
224      // Use the default block size of the outputFs if bigger
225      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
226      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
227      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
228
229      for (Counter c : Counter.values()) {
230        context.getCounter(c).increment(0);
231      }
232      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
233        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
234        // Get number of times we have already injected failure based on attempt number of this
235        // task.
236        testing.injectedFailureCount = context.getTaskAttemptID().getId();
237      }
238    }
239
240    @Override
241    protected void cleanup(Context context) {
242      IOUtils.closeStream(inputFs);
243      IOUtils.closeStream(outputFs);
244    }
245
246    @Override
247    public void map(BytesWritable key, NullWritable value, Context context)
248      throws InterruptedException, IOException {
249      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
250      Path outputPath = getOutputPath(inputInfo);
251
252      copyFile(context, inputInfo, outputPath);
253    }
254
255    /**
256     * Returns the location where the inputPath will be copied.
257     */
258    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
259      Path path = null;
260      switch (inputInfo.getType()) {
261        case HFILE:
262          Path inputPath = new Path(inputInfo.getHfile());
263          String family = inputPath.getParent().getName();
264          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
265          String region = HFileLink.getReferencedRegionName(inputPath.getName());
266          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
267          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
268            new Path(region, new Path(family, hfile)));
269          break;
270        case WAL:
271          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
272          break;
273        default:
274          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
275      }
276      return new Path(outputArchive, path);
277    }
278
279    @SuppressWarnings("checkstyle:linelength")
280    /**
281     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
282     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
283     */
284    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
285      throws IOException {
286      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
287      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
288      testing.injectedFailureCount++;
289      context.getCounter(Counter.COPY_FAILED).increment(1);
290      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
291      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
292        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
293    }
294
295    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
296      final Path outputPath) throws IOException {
297      // Get the file information
298      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
299
300      // Verify if the output file exists and is the same that we want to copy
301      if (outputFs.exists(outputPath)) {
302        FileStatus outputStat = outputFs.getFileStatus(outputPath);
303        if (outputStat != null && sameFile(inputStat, outputStat)) {
304          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
305          context.getCounter(Counter.FILES_SKIPPED).increment(1);
306          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
307          return;
308        }
309      }
310
311      InputStream in = openSourceFile(context, inputInfo);
312      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
313      if (Integer.MAX_VALUE != bandwidthMB) {
314        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
315      }
316
317      try {
318        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
319
320        // Ensure that the output folder is there and copy the file
321        createOutputPath(outputPath.getParent());
322        FSDataOutputStream out = outputFs.create(outputPath, true);
323        try {
324          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
325        } finally {
326          out.close();
327        }
328
329        // Try to Preserve attributes
330        if (!preserveAttributes(outputPath, inputStat)) {
331          LOG.warn("You may have to run manually chown on: " + outputPath);
332        }
333      } finally {
334        in.close();
335        injectTestFailure(context, inputInfo);
336      }
337    }
338
339    /**
340     * Create the output folder and optionally set ownership.
341     */
342    private void createOutputPath(final Path path) throws IOException {
343      if (filesUser == null && filesGroup == null) {
344        outputFs.mkdirs(path);
345      } else {
346        Path parent = path.getParent();
347        if (!outputFs.exists(parent) && !parent.isRoot()) {
348          createOutputPath(parent);
349        }
350        outputFs.mkdirs(path);
351        if (filesUser != null || filesGroup != null) {
352          // override the owner when non-null user/group is specified
353          outputFs.setOwner(path, filesUser, filesGroup);
354        }
355        if (filesMode > 0) {
356          outputFs.setPermission(path, new FsPermission(filesMode));
357        }
358      }
359    }
360
361    /**
362     * Try to Preserve the files attribute selected by the user copying them from the source file
363     * This is only required when you are exporting as a different user than "hbase" or on a system
364     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
365     * can force a chmod with the user that knows is available on the system.
366     */
367    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
368      FileStatus stat;
369      try {
370        stat = outputFs.getFileStatus(path);
371      } catch (IOException e) {
372        LOG.warn("Unable to get the status for file=" + path);
373        return false;
374      }
375
376      try {
377        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
378          outputFs.setPermission(path, new FsPermission(filesMode));
379        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
380          outputFs.setPermission(path, refStat.getPermission());
381        }
382      } catch (IOException e) {
383        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
384        return false;
385      }
386
387      boolean hasRefStat = (refStat != null);
388      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
389      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
390      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
391        try {
392          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
393            outputFs.setOwner(path, user, group);
394          }
395        } catch (IOException e) {
396          LOG.warn(
397            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
398          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
399            + " group=" + group);
400          return false;
401        }
402      }
403
404      return true;
405    }
406
407    private boolean stringIsNotEmpty(final String str) {
408      return str != null && str.length() > 0;
409    }
410
411    private void copyData(final Context context, final Path inputPath, final InputStream in,
412      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
413      throws IOException {
414      final String statusMessage =
415        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
416
417      try {
418        byte[] buffer = new byte[bufferSize];
419        long totalBytesWritten = 0;
420        int reportBytes = 0;
421        int bytesRead;
422
423        long stime = EnvironmentEdgeManager.currentTime();
424        while ((bytesRead = in.read(buffer)) > 0) {
425          out.write(buffer, 0, bytesRead);
426          totalBytesWritten += bytesRead;
427          reportBytes += bytesRead;
428
429          if (reportBytes >= REPORT_SIZE) {
430            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
431            context.setStatus(
432              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
433                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
434                + " to " + outputPath);
435            reportBytes = 0;
436          }
437        }
438        long etime = EnvironmentEdgeManager.currentTime();
439
440        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
441        context
442          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
443            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
444            + outputPath);
445
446        // Verify that the written size match
447        if (totalBytesWritten != inputFileSize) {
448          String msg = "number of bytes copied not matching copied=" + totalBytesWritten
449            + " expected=" + inputFileSize + " for file=" + inputPath;
450          throw new IOException(msg);
451        }
452
453        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
454        LOG
455          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
456            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
457              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
458        context.getCounter(Counter.FILES_COPIED).increment(1);
459      } catch (IOException e) {
460        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
461        context.getCounter(Counter.COPY_FAILED).increment(1);
462        throw e;
463      }
464    }
465
466    /**
467     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
468     * fail or if the file is not found.
469     */
470    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
471      throws IOException {
472      try {
473        Configuration conf = context.getConfiguration();
474        FileLink link = null;
475        switch (fileInfo.getType()) {
476          case HFILE:
477            Path inputPath = new Path(fileInfo.getHfile());
478            link = getFileLink(inputPath, conf);
479            break;
480          case WAL:
481            String serverName = fileInfo.getWalServer();
482            String logName = fileInfo.getWalName();
483            link = new WALLink(inputRoot, serverName, logName);
484            break;
485          default:
486            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
487        }
488        return link.open(inputFs);
489      } catch (IOException e) {
490        context.getCounter(Counter.MISSING_FILES).increment(1);
491        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
492        throw e;
493      }
494    }
495
496    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
497      throws IOException {
498      try {
499        Configuration conf = context.getConfiguration();
500        FileLink link = null;
501        switch (fileInfo.getType()) {
502          case HFILE:
503            Path inputPath = new Path(fileInfo.getHfile());
504            link = getFileLink(inputPath, conf);
505            break;
506          case WAL:
507            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
508            break;
509          default:
510            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
511        }
512        return link.getFileStatus(inputFs);
513      } catch (FileNotFoundException e) {
514        context.getCounter(Counter.MISSING_FILES).increment(1);
515        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
516        throw e;
517      } catch (IOException e) {
518        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
519        throw e;
520      }
521    }
522
523    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
524      String regionName = HFileLink.getReferencedRegionName(path.getName());
525      TableName tableName = HFileLink.getReferencedTableName(path.getName());
526      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
527        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
528          HFileArchiveUtil.getArchivePath(conf), path);
529      }
530      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
531    }
532
533    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
534      try {
535        return fs.getFileChecksum(path);
536      } catch (IOException e) {
537        LOG.warn("Unable to get checksum for file=" + path, e);
538        return null;
539      }
540    }
541
542    /**
543     * Check if the two files are equal by looking at the file length, and at the checksum (if user
544     * has specified the verifyChecksum flag).
545     */
546    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
547      // Not matching length
548      if (inputStat.getLen() != outputStat.getLen()) return false;
549
550      // Mark files as equals, since user asked for no checksum verification
551      if (!verifyChecksum) return true;
552
553      // If checksums are not available, files are not the same.
554      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
555      if (inChecksum == null) return false;
556
557      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
558      if (outChecksum == null) return false;
559
560      return inChecksum.equals(outChecksum);
561    }
562  }
563
564  // ==========================================================================
565  // Input Format
566  // ==========================================================================
567
568  /**
569   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
570   * @return list of files referenced by the snapshot (pair of path and size)
571   */
572  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
573    final FileSystem fs, final Path snapshotDir) throws IOException {
574    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
575
576    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
577    final TableName table = TableName.valueOf(snapshotDesc.getTable());
578
579    // Get snapshot files
580    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
581    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
582      new SnapshotReferenceUtil.SnapshotVisitor() {
583        @Override
584        public void storeFile(final RegionInfo regionInfo, final String family,
585          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
586          // for storeFile.hasReference() case, copied as part of the manifest
587          if (!storeFile.hasReference()) {
588            String region = regionInfo.getEncodedName();
589            String hfile = storeFile.getName();
590            Path path = HFileLink.createPath(table, region, family, hfile);
591
592            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
593              .setType(SnapshotFileInfo.Type.HFILE).setHfile(path.toString()).build();
594
595            long size;
596            if (storeFile.hasFileSize()) {
597              size = storeFile.getFileSize();
598            } else {
599              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
600            }
601            files.add(new Pair<>(fileInfo, size));
602          }
603        }
604      });
605
606    return files;
607  }
608
609  /**
610   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
611   * The groups created will have similar amounts of bytes.
612   * <p>
613   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
614   * group fetch the bigger file available, iterating through groups alternating the direction.
615   */
616  static List<List<Pair<SnapshotFileInfo, Long>>>
617    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
618    // Sort files by size, from small to big
619    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
620      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
621        long r = a.getSecond() - b.getSecond();
622        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
623      }
624    });
625
626    // create balanced groups
627    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
628    long[] sizeGroups = new long[ngroups];
629    int hi = files.size() - 1;
630    int lo = 0;
631
632    List<Pair<SnapshotFileInfo, Long>> group;
633    int dir = 1;
634    int g = 0;
635
636    while (hi >= lo) {
637      if (g == fileGroups.size()) {
638        group = new LinkedList<>();
639        fileGroups.add(group);
640      } else {
641        group = fileGroups.get(g);
642      }
643
644      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
645
646      // add the hi one
647      sizeGroups[g] += fileInfo.getSecond();
648      group.add(fileInfo);
649
650      // change direction when at the end or the beginning
651      g += dir;
652      if (g == ngroups) {
653        dir = -1;
654        g = ngroups - 1;
655      } else if (g < 0) {
656        dir = 1;
657        g = 0;
658      }
659    }
660
661    if (LOG.isDebugEnabled()) {
662      for (int i = 0; i < sizeGroups.length; ++i) {
663        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
664      }
665    }
666
667    return fileGroups;
668  }
669
670  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
671    @Override
672    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
673      TaskAttemptContext tac) throws IOException, InterruptedException {
674      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
675    }
676
677    @Override
678    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
679      Configuration conf = context.getConfiguration();
680      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
681      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
682
683      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
684      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
685      if (mappers == 0 && snapshotFiles.size() > 0) {
686        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
687        mappers = Math.min(mappers, snapshotFiles.size());
688        conf.setInt(CONF_NUM_SPLITS, mappers);
689        conf.setInt(MR_NUM_MAPS, mappers);
690      }
691
692      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
693      List<InputSplit> splits = new ArrayList(groups.size());
694      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
695        splits.add(new ExportSnapshotInputSplit(files));
696      }
697      return splits;
698    }
699
700    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
701      private List<Pair<BytesWritable, Long>> files;
702      private long length;
703
704      public ExportSnapshotInputSplit() {
705        this.files = null;
706      }
707
708      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
709        this.files = new ArrayList(snapshotFiles.size());
710        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
711          this.files.add(
712            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
713          this.length += fileInfo.getSecond();
714        }
715      }
716
717      private List<Pair<BytesWritable, Long>> getSplitKeys() {
718        return files;
719      }
720
721      @Override
722      public long getLength() throws IOException, InterruptedException {
723        return length;
724      }
725
726      @Override
727      public String[] getLocations() throws IOException, InterruptedException {
728        return new String[] {};
729      }
730
731      @Override
732      public void readFields(DataInput in) throws IOException {
733        int count = in.readInt();
734        files = new ArrayList<>(count);
735        length = 0;
736        for (int i = 0; i < count; ++i) {
737          BytesWritable fileInfo = new BytesWritable();
738          fileInfo.readFields(in);
739          long size = in.readLong();
740          files.add(new Pair<>(fileInfo, size));
741          length += size;
742        }
743      }
744
745      @Override
746      public void write(DataOutput out) throws IOException {
747        out.writeInt(files.size());
748        for (final Pair<BytesWritable, Long> fileInfo : files) {
749          fileInfo.getFirst().write(out);
750          out.writeLong(fileInfo.getSecond());
751        }
752      }
753    }
754
755    private static class ExportSnapshotRecordReader
756      extends RecordReader<BytesWritable, NullWritable> {
757      private final List<Pair<BytesWritable, Long>> files;
758      private long totalSize = 0;
759      private long procSize = 0;
760      private int index = -1;
761
762      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
763        this.files = files;
764        for (Pair<BytesWritable, Long> fileInfo : files) {
765          totalSize += fileInfo.getSecond();
766        }
767      }
768
769      @Override
770      public void close() {
771      }
772
773      @Override
774      public BytesWritable getCurrentKey() {
775        return files.get(index).getFirst();
776      }
777
778      @Override
779      public NullWritable getCurrentValue() {
780        return NullWritable.get();
781      }
782
783      @Override
784      public float getProgress() {
785        return (float) procSize / totalSize;
786      }
787
788      @Override
789      public void initialize(InputSplit split, TaskAttemptContext tac) {
790      }
791
792      @Override
793      public boolean nextKeyValue() {
794        if (index >= 0) {
795          procSize += files.get(index).getSecond();
796        }
797        return (++index < files.size());
798      }
799    }
800  }
801
802  // ==========================================================================
803  // Tool
804  // ==========================================================================
805
806  /**
807   * Run Map-Reduce Job to perform the files copy.
808   */
809  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
810    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
811    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
812    throws IOException, InterruptedException, ClassNotFoundException {
813    Configuration conf = getConf();
814    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
815    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
816    if (mappers > 0) {
817      conf.setInt(CONF_NUM_SPLITS, mappers);
818      conf.setInt(MR_NUM_MAPS, mappers);
819    }
820    conf.setInt(CONF_FILES_MODE, filesMode);
821    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
822    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
823    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
824    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
825    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
826    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
827
828    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
829    Job job = new Job(conf);
830    job.setJobName(jobname);
831    job.setJarByClass(ExportSnapshot.class);
832    TableMapReduceUtil.addDependencyJars(job);
833    job.setMapperClass(ExportMapper.class);
834    job.setInputFormatClass(ExportSnapshotInputFormat.class);
835    job.setOutputFormatClass(NullOutputFormat.class);
836    job.setMapSpeculativeExecution(false);
837    job.setNumReduceTasks(0);
838
839    // Acquire the delegation Tokens
840    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
841    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
842    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
843    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
844
845    // Run the MR Job
846    if (!job.waitForCompletion(true)) {
847      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
848    }
849  }
850
851  private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir,
852    final Path snapshotDir) throws IOException {
853    // Update the conf with the current root dir, since may be a different cluster
854    Configuration conf = new Configuration(baseConf);
855    CommonFSUtils.setRootDir(conf, rootDir);
856    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
857    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
858    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
859  }
860
861  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
862    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
863    ExecutorService pool = Executors
864      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
865    List<Future<Void>> futures = new ArrayList<>();
866    for (Path dstPath : traversedPath) {
867      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
868      futures.add(future);
869    }
870    try {
871      for (Future<Void> future : futures) {
872        future.get();
873      }
874    } catch (InterruptedException | ExecutionException e) {
875      throw new IOException(e);
876    } finally {
877      pool.shutdownNow();
878    }
879  }
880
881  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
882    Configuration conf, List<Path> traversedPath) throws IOException {
883    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
884      try {
885        fs.setOwner(path, filesUser, filesGroup);
886      } catch (IOException e) {
887        throw new RuntimeException(
888          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
889      }
890    }, conf);
891  }
892
893  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
894    final List<Path> traversedPath, final Configuration conf) throws IOException {
895    if (filesMode <= 0) {
896      return;
897    }
898    FsPermission perm = new FsPermission(filesMode);
899    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
900      try {
901        fs.setPermission(path, perm);
902      } catch (IOException e) {
903        throw new RuntimeException(
904          "set permission for file " + path + " to " + filesMode + " failed", e);
905      }
906    }, conf);
907  }
908
909  private boolean verifyTarget = true;
910  private boolean verifySource = true;
911  private boolean verifyChecksum = true;
912  private String snapshotName = null;
913  private String targetName = null;
914  private boolean overwrite = false;
915  private String filesGroup = null;
916  private String filesUser = null;
917  private Path outputRoot = null;
918  private Path inputRoot = null;
919  private int bandwidthMB = Integer.MAX_VALUE;
920  private int filesMode = 0;
921  private int mappers = 0;
922  private boolean resetTtl = false;
923
924  @Override
925  protected void processOptions(CommandLine cmd) {
926    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
927    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
928    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
929      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
930    }
931    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
932      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
933    }
934    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
935    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
936    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
937    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
938    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
939    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
940    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
941    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
942    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
943    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
944    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
945  }
946
947  /**
948   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
949   * @return 0 on success, and != 0 upon failure.
950   */
951  @Override
952  public int doWork() throws IOException {
953    Configuration conf = getConf();
954
955    // Check user options
956    if (snapshotName == null) {
957      System.err.println("Snapshot name not provided.");
958      LOG.error("Use -h or --help for usage instructions.");
959      return 0;
960    }
961
962    if (outputRoot == null) {
963      System.err
964        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
965      LOG.error("Use -h or --help for usage instructions.");
966      return 0;
967    }
968
969    if (targetName == null) {
970      targetName = snapshotName;
971    }
972    if (inputRoot == null) {
973      inputRoot = CommonFSUtils.getRootDir(conf);
974    } else {
975      CommonFSUtils.setRootDir(conf, inputRoot);
976    }
977
978    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
979    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
980    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
981    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
982    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
983    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
984    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
985      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
986    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
987    Path snapshotTmpDir =
988      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
989    Path outputSnapshotDir =
990      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
991    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
992    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
993    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
994      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
995
996    // Verify snapshot source before copying files
997    if (verifySource) {
998      LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.",
999        inputFs.getUri(), inputRoot, snapshotDir);
1000      verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir);
1001    }
1002
1003    // Find the necessary directory which need to change owner and group
1004    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1005    if (outputFs.exists(needSetOwnerDir)) {
1006      if (skipTmp) {
1007        needSetOwnerDir = outputSnapshotDir;
1008      } else {
1009        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1010        if (outputFs.exists(needSetOwnerDir)) {
1011          needSetOwnerDir = snapshotTmpDir;
1012        }
1013      }
1014    }
1015
1016    // Check if the snapshot already exists
1017    if (outputFs.exists(outputSnapshotDir)) {
1018      if (overwrite) {
1019        if (!outputFs.delete(outputSnapshotDir, true)) {
1020          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1021          return 1;
1022        }
1023      } else {
1024        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1025          + outputSnapshotDir);
1026        return 1;
1027      }
1028    }
1029
1030    if (!skipTmp) {
1031      // Check if the snapshot already in-progress
1032      if (outputFs.exists(snapshotTmpDir)) {
1033        if (overwrite) {
1034          if (!outputFs.delete(snapshotTmpDir, true)) {
1035            System.err
1036              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1037            return 1;
1038          }
1039        } else {
1040          System.err
1041            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1042          System.err
1043            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1044          System.err
1045            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1046          return 1;
1047        }
1048      }
1049    }
1050
1051    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1052    // The snapshot references must be copied before the hfiles otherwise the cleaner
1053    // will remove them because they are unreferenced.
1054    List<Path> travesedPaths = new ArrayList<>();
1055    boolean copySucceeded = false;
1056    try {
1057      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1058      travesedPaths =
1059        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1060          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1061      copySucceeded = true;
1062    } catch (IOException e) {
1063      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1064        + " to=" + initialOutputSnapshotDir, e);
1065    } finally {
1066      if (copySucceeded) {
1067        if (filesUser != null || filesGroup != null) {
1068          LOG.warn(
1069            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1070              + (filesGroup == null
1071                ? ""
1072                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1073          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1074        }
1075        if (filesMode > 0) {
1076          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1077          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1078        }
1079      }
1080    }
1081
1082    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1083    // reset TTL for target snapshot.
1084    if (!targetName.equals(snapshotName) || resetTtl) {
1085      SnapshotDescription.Builder snapshotDescBuilder =
1086        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1087      if (!targetName.equals(snapshotName)) {
1088        snapshotDescBuilder.setName(targetName);
1089      }
1090      if (resetTtl) {
1091        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1092      }
1093      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1094        initialOutputSnapshotDir, outputFs);
1095      if (filesUser != null || filesGroup != null) {
1096        outputFs.setOwner(
1097          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1098          filesGroup);
1099      }
1100      if (filesMode > 0) {
1101        outputFs.setPermission(
1102          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1103          new FsPermission((short) filesMode));
1104      }
1105    }
1106
1107    // Step 2 - Start MR Job to copy files
1108    // The snapshot references must be copied before the files otherwise the files gets removed
1109    // by the HFileArchiver, since they have no references.
1110    try {
1111      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1112        filesGroup, filesMode, mappers, bandwidthMB);
1113
1114      LOG.info("Finalize the Snapshot Export");
1115      if (!skipTmp) {
1116        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1117        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1118          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1119            + snapshotTmpDir + " to=" + outputSnapshotDir);
1120        }
1121      }
1122
1123      // Step 4 - Verify snapshot integrity
1124      if (verifyTarget) {
1125        LOG.info("Verify snapshot integrity");
1126        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1127      }
1128
1129      LOG.info("Export Completed: " + targetName);
1130      return 0;
1131    } catch (Exception e) {
1132      LOG.error("Snapshot export failed", e);
1133      if (!skipTmp) {
1134        outputFs.delete(snapshotTmpDir, true);
1135      }
1136      outputFs.delete(outputSnapshotDir, true);
1137      return 1;
1138    } finally {
1139      IOUtils.closeStream(inputFs);
1140      IOUtils.closeStream(outputFs);
1141    }
1142  }
1143
1144  @Override
1145  protected void printUsage() {
1146    super.printUsage();
1147    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1148      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1149      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1150      + "  hbase snapshot export \\\n"
1151      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1152      + "    --copy-to hdfs://srv1:50070/hbase");
1153  }
1154
1155  @Override
1156  protected void addOptions() {
1157    addRequiredOption(Options.SNAPSHOT);
1158    addOption(Options.COPY_TO);
1159    addOption(Options.COPY_FROM);
1160    addOption(Options.TARGET_NAME);
1161    addOption(Options.NO_CHECKSUM_VERIFY);
1162    addOption(Options.NO_TARGET_VERIFY);
1163    addOption(Options.NO_SOURCE_VERIFY);
1164    addOption(Options.OVERWRITE);
1165    addOption(Options.CHUSER);
1166    addOption(Options.CHGROUP);
1167    addOption(Options.CHMOD);
1168    addOption(Options.MAPPERS);
1169    addOption(Options.BANDWIDTH);
1170    addOption(Options.RESET_TTL);
1171  }
1172
1173  public static void main(String[] args) {
1174    new ExportSnapshot().doStaticMain(args);
1175  }
1176}