001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.LinkedList;
030import java.util.List;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.ExecutorService;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.function.BiConsumer;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.fs.FileChecksum;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.io.FileLink;
049import org.apache.hadoop.hbase.io.HFileLink;
050import org.apache.hadoop.hbase.io.WALLink;
051import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mob.MobUtils;
054import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.FSUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
081
082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
085
086/**
087 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
088 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
089 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
090 */
091@InterfaceAudience.Public
092public class ExportSnapshot extends AbstractHBaseTool implements Tool {
093  public static final String NAME = "exportsnapshot";
094  /** Configuration prefix for overrides for the source filesystem */
095  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
096  /** Configuration prefix for overrides for the destination filesystem */
097  public static final String CONF_DEST_PREFIX = NAME + ".to.";
098
099  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
100
101  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
102  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
103  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
104  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
105  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
106  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
107  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
108  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
109  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
110  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
111  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
112  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
113  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
114  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
115  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
116  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
117  private static final String CONF_COPY_MANIFEST_THREADS =
118    "snapshot.export.copy.references.threads";
119  private static final int DEFAULT_COPY_MANIFEST_THREADS =
120    Runtime.getRuntime().availableProcessors();
121
  /**
   * Holder for the failure-injection settings used while testing the export: the configuration
   * keys that switch injection on and the counters that bound how many failures are injected.
   */
  static class Testing {
    /** Boolean configuration key; when true the mapper injects test failures. */
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    /** Integer configuration key holding the number of failures to inject. */
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    /** Upper bound on injected failures; loaded from the configuration in the mapper setup. */
    int failuresCountToInject = 0;
    /** Failures injected so far; seeded from the task attempt id in the mapper setup. */
    int injectedFailureCount = 0;
  }
128
  /**
   * Command line options and defaults. Every option is long-name only (no short name); the third
   * constructor argument tells whether the option takes a value.
   */
  static final class Options {
    // NOTE(review): the help text says "restore" although this tool exports the snapshot;
    // confirm the intended wording.
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    // Verification switches (flags, no value).
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the integrity of the exported snapshot.");
    static final Option NO_SOURCE_VERIFY =
      new Option(null, "no-source-verify", false, "Do not verify the source of the snapshot.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
    // Ownership / permission overrides applied to the copied files.
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    // Copy job tuning.
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
  }
159
  /** Export Map-Reduce Counters, to keep track of the progress. */
  public enum Counter {
    /** Source files that could not be opened, or whose status lookup reported "not found". */
    MISSING_FILES,
    /** Files whose data was copied completely. */
    FILES_COPIED,
    /** Files skipped because an identical file already exists at the destination. */
    FILES_SKIPPED,
    /** Copy attempts that ended with an I/O error (also bumped by injected test failures). */
    COPY_FAILED,
    /** Total length of the source files a copy was started for. */
    BYTES_EXPECTED,
    /** Total length of the files that were skipped. */
    BYTES_SKIPPED,
    /** Bytes actually written to the destination. */
    BYTES_COPIED
  }
170
171  private static class ExportMapper
172    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
173    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
174    final static int REPORT_SIZE = 1 * 1024 * 1024;
175    final static int BUFFER_SIZE = 64 * 1024;
176
177    private boolean verifyChecksum;
178    private String filesGroup;
179    private String filesUser;
180    private short filesMode;
181    private int bufferSize;
182    private int reportSize;
183
184    private FileSystem outputFs;
185    private Path outputArchive;
186    private Path outputRoot;
187
188    private FileSystem inputFs;
189    private Path inputArchive;
190    private Path inputRoot;
191
192    private static Testing testing = new Testing();
193
194    @Override
195    public void setup(Context context) throws IOException {
196      Configuration conf = context.getConfiguration();
197
198      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
199      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
200
201      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
202
203      filesGroup = conf.get(CONF_FILES_GROUP);
204      filesUser = conf.get(CONF_FILES_USER);
205      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
206      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
207      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
208
209      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
210      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
211
212      try {
213        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
214      } catch (IOException e) {
215        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
216      }
217
218      try {
219        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
220      } catch (IOException e) {
221        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
222      }
223
224      // Use the default block size of the outputFs if bigger
225      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
226      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
227      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
228      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
229
230      for (Counter c : Counter.values()) {
231        context.getCounter(c).increment(0);
232      }
233      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
234        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
235        // Get number of times we have already injected failure based on attempt number of this
236        // task.
237        testing.injectedFailureCount = context.getTaskAttemptID().getId();
238      }
239    }
240
241    @Override
242    public void map(BytesWritable key, NullWritable value, Context context)
243      throws InterruptedException, IOException {
244      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
245      Path outputPath = getOutputPath(inputInfo);
246
247      copyFile(context, inputInfo, outputPath);
248    }
249
250    /**
251     * Returns the location where the inputPath will be copied.
252     */
253    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
254      Path path = null;
255      switch (inputInfo.getType()) {
256        case HFILE:
257          Path inputPath = new Path(inputInfo.getHfile());
258          String family = inputPath.getParent().getName();
259          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
260          String region = HFileLink.getReferencedRegionName(inputPath.getName());
261          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
262          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
263            new Path(region, new Path(family, hfile)));
264          break;
265        case WAL:
266          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
267          break;
268        default:
269          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
270      }
271      return new Path(outputArchive, path);
272    }
273
274    @SuppressWarnings("checkstyle:linelength")
275    /**
276     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
277     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
278     */
279    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
280      throws IOException {
281      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
282      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
283      testing.injectedFailureCount++;
284      context.getCounter(Counter.COPY_FAILED).increment(1);
285      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
286      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
287        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
288    }
289
290    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
291      final Path outputPath) throws IOException {
292      // Get the file information
293      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
294
295      // Verify if the output file exists and is the same that we want to copy
296      if (outputFs.exists(outputPath)) {
297        FileStatus outputStat = outputFs.getFileStatus(outputPath);
298        if (outputStat != null && sameFile(inputStat, outputStat)) {
299          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
300          context.getCounter(Counter.FILES_SKIPPED).increment(1);
301          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
302          return;
303        }
304      }
305
306      InputStream in = openSourceFile(context, inputInfo);
307      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
308      if (Integer.MAX_VALUE != bandwidthMB) {
309        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
310      }
311
312      try {
313        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
314
315        // Ensure that the output folder is there and copy the file
316        createOutputPath(outputPath.getParent());
317        FSDataOutputStream out = outputFs.create(outputPath, true);
318        try {
319          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
320        } finally {
321          out.close();
322        }
323
324        // Try to Preserve attributes
325        if (!preserveAttributes(outputPath, inputStat)) {
326          LOG.warn("You may have to run manually chown on: " + outputPath);
327        }
328      } finally {
329        in.close();
330        injectTestFailure(context, inputInfo);
331      }
332    }
333
334    /**
335     * Create the output folder and optionally set ownership.
336     */
337    private void createOutputPath(final Path path) throws IOException {
338      if (filesUser == null && filesGroup == null) {
339        outputFs.mkdirs(path);
340      } else {
341        Path parent = path.getParent();
342        if (!outputFs.exists(parent) && !parent.isRoot()) {
343          createOutputPath(parent);
344        }
345        outputFs.mkdirs(path);
346        if (filesUser != null || filesGroup != null) {
347          // override the owner when non-null user/group is specified
348          outputFs.setOwner(path, filesUser, filesGroup);
349        }
350        if (filesMode > 0) {
351          outputFs.setPermission(path, new FsPermission(filesMode));
352        }
353      }
354    }
355
356    /**
357     * Try to Preserve the files attribute selected by the user copying them from the source file
358     * This is only required when you are exporting as a different user than "hbase" or on a system
359     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
360     * can force a chmod with the user that knows is available on the system.
361     */
362    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
363      FileStatus stat;
364      try {
365        stat = outputFs.getFileStatus(path);
366      } catch (IOException e) {
367        LOG.warn("Unable to get the status for file=" + path);
368        return false;
369      }
370
371      try {
372        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
373          outputFs.setPermission(path, new FsPermission(filesMode));
374        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
375          outputFs.setPermission(path, refStat.getPermission());
376        }
377      } catch (IOException e) {
378        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
379        return false;
380      }
381
382      boolean hasRefStat = (refStat != null);
383      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
384      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
385      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
386        try {
387          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
388            outputFs.setOwner(path, user, group);
389          }
390        } catch (IOException e) {
391          LOG.warn(
392            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
393          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
394            + " group=" + group);
395          return false;
396        }
397      }
398
399      return true;
400    }
401
402    private boolean stringIsNotEmpty(final String str) {
403      return str != null && str.length() > 0;
404    }
405
406    private void copyData(final Context context, final Path inputPath, final InputStream in,
407      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
408      throws IOException {
409      final String statusMessage =
410        "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + " (%.1f%%)";
411
412      try {
413        byte[] buffer = new byte[bufferSize];
414        long totalBytesWritten = 0;
415        int reportBytes = 0;
416        int bytesRead;
417
418        long stime = EnvironmentEdgeManager.currentTime();
419        while ((bytesRead = in.read(buffer)) > 0) {
420          out.write(buffer, 0, bytesRead);
421          totalBytesWritten += bytesRead;
422          reportBytes += bytesRead;
423
424          if (reportBytes >= reportSize) {
425            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
426            context.setStatus(
427              String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
428                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
429                + " to " + outputPath);
430            reportBytes = 0;
431          }
432        }
433        long etime = EnvironmentEdgeManager.currentTime();
434
435        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
436        context
437          .setStatus(String.format(statusMessage, StringUtils.humanReadableInt(totalBytesWritten),
438            (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
439            + outputPath);
440
441        // Verify that the written size match
442        if (totalBytesWritten != inputFileSize) {
443          String msg = "number of bytes copied not matching copied=" + totalBytesWritten
444            + " expected=" + inputFileSize + " for file=" + inputPath;
445          throw new IOException(msg);
446        }
447
448        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
449        LOG
450          .info("size=" + totalBytesWritten + " (" + StringUtils.humanReadableInt(totalBytesWritten)
451            + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String
452              .format(" %.3fM/sec", (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
453        context.getCounter(Counter.FILES_COPIED).increment(1);
454      } catch (IOException e) {
455        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
456        context.getCounter(Counter.COPY_FAILED).increment(1);
457        throw e;
458      }
459    }
460
461    /**
462     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
463     * fail or if the file is not found.
464     */
465    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
466      throws IOException {
467      try {
468        Configuration conf = context.getConfiguration();
469        FileLink link = null;
470        switch (fileInfo.getType()) {
471          case HFILE:
472            Path inputPath = new Path(fileInfo.getHfile());
473            link = getFileLink(inputPath, conf);
474            break;
475          case WAL:
476            String serverName = fileInfo.getWalServer();
477            String logName = fileInfo.getWalName();
478            link = new WALLink(inputRoot, serverName, logName);
479            break;
480          default:
481            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
482        }
483        return link.open(inputFs);
484      } catch (IOException e) {
485        context.getCounter(Counter.MISSING_FILES).increment(1);
486        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
487        throw e;
488      }
489    }
490
491    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
492      throws IOException {
493      try {
494        Configuration conf = context.getConfiguration();
495        FileLink link = null;
496        switch (fileInfo.getType()) {
497          case HFILE:
498            Path inputPath = new Path(fileInfo.getHfile());
499            link = getFileLink(inputPath, conf);
500            break;
501          case WAL:
502            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
503            break;
504          default:
505            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
506        }
507        return link.getFileStatus(inputFs);
508      } catch (FileNotFoundException e) {
509        context.getCounter(Counter.MISSING_FILES).increment(1);
510        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
511        throw e;
512      } catch (IOException e) {
513        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
514        throw e;
515      }
516    }
517
518    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
519      String regionName = HFileLink.getReferencedRegionName(path.getName());
520      TableName tableName = HFileLink.getReferencedTableName(path.getName());
521      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
522        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
523          HFileArchiveUtil.getArchivePath(conf), path);
524      }
525      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
526    }
527
528    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
529      try {
530        return fs.getFileChecksum(path);
531      } catch (IOException e) {
532        LOG.warn("Unable to get checksum for file=" + path, e);
533        return null;
534      }
535    }
536
537    /**
538     * Check if the two files are equal by looking at the file length, and at the checksum (if user
539     * has specified the verifyChecksum flag).
540     */
541    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
542      // Not matching length
543      if (inputStat.getLen() != outputStat.getLen()) return false;
544
545      // Mark files as equals, since user asked for no checksum verification
546      if (!verifyChecksum) return true;
547
548      // If checksums are not available, files are not the same.
549      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
550      if (inChecksum == null) return false;
551
552      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
553      if (outChecksum == null) return false;
554
555      return inChecksum.equals(outChecksum);
556    }
557  }
558
559  // ==========================================================================
560  // Input Format
561  // ==========================================================================
562
563  /**
564   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
565   * @return list of files referenced by the snapshot (pair of path and size)
566   */
567  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
568    final FileSystem fs, final Path snapshotDir) throws IOException {
569    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
570
571    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
572    final TableName table = TableName.valueOf(snapshotDesc.getTable());
573
574    // Get snapshot files
575    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
576    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
577      new SnapshotReferenceUtil.SnapshotVisitor() {
578        @Override
579        public void storeFile(final RegionInfo regionInfo, final String family,
580          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
581          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
582          if (!storeFile.hasReference()) {
583            String region = regionInfo.getEncodedName();
584            String hfile = storeFile.getName();
585            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
586              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
587          } else {
588            Pair<String, String> referredToRegionAndFile =
589              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
590            String referencedRegion = referredToRegionAndFile.getFirst();
591            String referencedHFile = referredToRegionAndFile.getSecond();
592            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
593              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
594          }
595          files.add(snapshotFileAndSize);
596        }
597      });
598
599    return files;
600  }
601
602  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
603    Configuration conf, TableName table, String region, String family, String hfile, long size)
604    throws IOException {
605    Path path = HFileLink.createPath(table, region, family, hfile);
606    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
607      .setHfile(path.toString()).build();
608    if (size == -1) {
609      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
610    }
611    return new Pair<>(fileInfo, size);
612  }
613
614  /**
615   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
616   * The groups created will have similar amounts of bytes.
617   * <p>
618   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
619   * group fetch the bigger file available, iterating through groups alternating the direction.
620   */
621  static List<List<Pair<SnapshotFileInfo, Long>>>
622    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
623    // Sort files by size, from small to big
624    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
625      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
626        long r = a.getSecond() - b.getSecond();
627        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
628      }
629    });
630
631    // create balanced groups
632    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
633    long[] sizeGroups = new long[ngroups];
634    int hi = files.size() - 1;
635    int lo = 0;
636
637    List<Pair<SnapshotFileInfo, Long>> group;
638    int dir = 1;
639    int g = 0;
640
641    while (hi >= lo) {
642      if (g == fileGroups.size()) {
643        group = new LinkedList<>();
644        fileGroups.add(group);
645      } else {
646        group = fileGroups.get(g);
647      }
648
649      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
650
651      // add the hi one
652      sizeGroups[g] += fileInfo.getSecond();
653      group.add(fileInfo);
654
655      // change direction when at the end or the beginning
656      g += dir;
657      if (g == ngroups) {
658        dir = -1;
659        g = ngroups - 1;
660      } else if (g < 0) {
661        dir = 1;
662        g = 0;
663      }
664    }
665
666    if (LOG.isDebugEnabled()) {
667      for (int i = 0; i < sizeGroups.length; ++i) {
668        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
669      }
670    }
671
672    return fileGroups;
673  }
674
675  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
676    @Override
677    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
678      TaskAttemptContext tac) throws IOException, InterruptedException {
679      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
680    }
681
682    @Override
683    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
684      Configuration conf = context.getConfiguration();
685      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
686      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
687
688      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
689      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
690      if (mappers == 0 && snapshotFiles.size() > 0) {
691        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
692        mappers = Math.min(mappers, snapshotFiles.size());
693        conf.setInt(CONF_NUM_SPLITS, mappers);
694        conf.setInt(MR_NUM_MAPS, mappers);
695      }
696
697      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
698      List<InputSplit> splits = new ArrayList(groups.size());
699      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
700        splits.add(new ExportSnapshotInputSplit(files));
701      }
702      return splits;
703    }
704
705    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
706      private List<Pair<BytesWritable, Long>> files;
707      private long length;
708
709      public ExportSnapshotInputSplit() {
710        this.files = null;
711      }
712
713      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
714        this.files = new ArrayList(snapshotFiles.size());
715        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
716          this.files.add(
717            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
718          this.length += fileInfo.getSecond();
719        }
720      }
721
722      private List<Pair<BytesWritable, Long>> getSplitKeys() {
723        return files;
724      }
725
726      @Override
727      public long getLength() throws IOException, InterruptedException {
728        return length;
729      }
730
731      @Override
732      public String[] getLocations() throws IOException, InterruptedException {
733        return new String[] {};
734      }
735
736      @Override
737      public void readFields(DataInput in) throws IOException {
738        int count = in.readInt();
739        files = new ArrayList<>(count);
740        length = 0;
741        for (int i = 0; i < count; ++i) {
742          BytesWritable fileInfo = new BytesWritable();
743          fileInfo.readFields(in);
744          long size = in.readLong();
745          files.add(new Pair<>(fileInfo, size));
746          length += size;
747        }
748      }
749
750      @Override
751      public void write(DataOutput out) throws IOException {
752        out.writeInt(files.size());
753        for (final Pair<BytesWritable, Long> fileInfo : files) {
754          fileInfo.getFirst().write(out);
755          out.writeLong(fileInfo.getSecond());
756        }
757      }
758    }
759
760    private static class ExportSnapshotRecordReader
761      extends RecordReader<BytesWritable, NullWritable> {
762      private final List<Pair<BytesWritable, Long>> files;
763      private long totalSize = 0;
764      private long procSize = 0;
765      private int index = -1;
766
767      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
768        this.files = files;
769        for (Pair<BytesWritable, Long> fileInfo : files) {
770          totalSize += fileInfo.getSecond();
771        }
772      }
773
774      @Override
775      public void close() {
776      }
777
778      @Override
779      public BytesWritable getCurrentKey() {
780        return files.get(index).getFirst();
781      }
782
783      @Override
784      public NullWritable getCurrentValue() {
785        return NullWritable.get();
786      }
787
788      @Override
789      public float getProgress() {
790        return (float) procSize / totalSize;
791      }
792
793      @Override
794      public void initialize(InputSplit split, TaskAttemptContext tac) {
795      }
796
797      @Override
798      public boolean nextKeyValue() {
799        if (index >= 0) {
800          procSize += files.get(index).getSecond();
801        }
802        return (++index < files.size());
803      }
804    }
805  }
806
807  // ==========================================================================
808  // Tool
809  // ==========================================================================
810
811  /**
812   * Run Map-Reduce Job to perform the files copy.
813   */
814  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
815    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
816    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
817    throws IOException, InterruptedException, ClassNotFoundException {
818    Configuration conf = getConf();
819    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
820    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
821    if (mappers > 0) {
822      conf.setInt(CONF_NUM_SPLITS, mappers);
823      conf.setInt(MR_NUM_MAPS, mappers);
824    }
825    conf.setInt(CONF_FILES_MODE, filesMode);
826    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
827    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
828    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
829    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
830    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
831    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
832
833    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
834    Job job = new Job(conf);
835    job.setJobName(jobname);
836    job.setJarByClass(ExportSnapshot.class);
837    TableMapReduceUtil.addDependencyJars(job);
838    job.setMapperClass(ExportMapper.class);
839    job.setInputFormatClass(ExportSnapshotInputFormat.class);
840    job.setOutputFormatClass(NullOutputFormat.class);
841    job.setMapSpeculativeExecution(false);
842    job.setNumReduceTasks(0);
843
844    // Acquire the delegation Tokens
845    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
846    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
847    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
848    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
849
850    // Run the MR Job
851    if (!job.waitForCompletion(true)) {
852      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
853    }
854  }
855
856  private void verifySnapshot(final Configuration baseConf, final FileSystem fs, final Path rootDir,
857    final Path snapshotDir) throws IOException {
858    // Update the conf with the current root dir, since may be a different cluster
859    Configuration conf = new Configuration(baseConf);
860    CommonFSUtils.setRootDir(conf, rootDir);
861    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
862    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
863    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
864  }
865
866  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
867    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
868    ExecutorService pool = Executors
869      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
870    List<Future<Void>> futures = new ArrayList<>();
871    for (Path dstPath : traversedPath) {
872      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
873      futures.add(future);
874    }
875    try {
876      for (Future<Void> future : futures) {
877        future.get();
878      }
879    } catch (InterruptedException | ExecutionException e) {
880      throw new IOException(e);
881    } finally {
882      pool.shutdownNow();
883    }
884  }
885
886  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
887    Configuration conf, List<Path> traversedPath) throws IOException {
888    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
889      try {
890        fs.setOwner(path, filesUser, filesGroup);
891      } catch (IOException e) {
892        throw new RuntimeException(
893          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
894      }
895    }, conf);
896  }
897
898  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
899    final List<Path> traversedPath, final Configuration conf) throws IOException {
900    if (filesMode <= 0) {
901      return;
902    }
903    FsPermission perm = new FsPermission(filesMode);
904    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
905      try {
906        fs.setPermission(path, perm);
907      } catch (IOException e) {
908        throw new RuntimeException(
909          "set permission for file " + path + " to " + filesMode + " failed", e);
910      }
911    }, conf);
912  }
913
914  private boolean verifyTarget = true;
915  private boolean verifySource = true;
916  private boolean verifyChecksum = true;
917  private String snapshotName = null;
918  private String targetName = null;
919  private boolean overwrite = false;
920  private String filesGroup = null;
921  private String filesUser = null;
922  private Path outputRoot = null;
923  private Path inputRoot = null;
924  private int bandwidthMB = Integer.MAX_VALUE;
925  private int filesMode = 0;
926  private int mappers = 0;
927  private boolean resetTtl = false;
928
929  @Override
930  protected void processOptions(CommandLine cmd) {
931    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
932    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
933    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
934      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
935    }
936    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
937      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
938    }
939    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
940    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
941    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
942    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
943    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
944    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
945    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
946    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
947    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
948    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
949    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
950  }
951
952  /**
953   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
954   * @return 0 on success, and != 0 upon failure.
955   */
956  @Override
957  public int doWork() throws IOException {
958    Configuration conf = getConf();
959
960    // Check user options
961    if (snapshotName == null) {
962      System.err.println("Snapshot name not provided.");
963      LOG.error("Use -h or --help for usage instructions.");
964      return 0;
965    }
966
967    if (outputRoot == null) {
968      System.err
969        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
970      LOG.error("Use -h or --help for usage instructions.");
971      return 0;
972    }
973
974    if (targetName == null) {
975      targetName = snapshotName;
976    }
977    if (inputRoot == null) {
978      inputRoot = CommonFSUtils.getRootDir(conf);
979    } else {
980      CommonFSUtils.setRootDir(conf, inputRoot);
981    }
982
983    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
984    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
985    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
986    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
987    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
988      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
989    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
990    Path snapshotTmpDir =
991      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
992    Path outputSnapshotDir =
993      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
994    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
995    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
996    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
997      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
998
999    // Verify snapshot source before copying files
1000    if (verifySource) {
1001      LOG.info("Verify snapshot source, inputFs={}, inputRoot={}, snapshotDir={}.",
1002        inputFs.getUri(), inputRoot, snapshotDir);
1003      verifySnapshot(srcConf, inputFs, inputRoot, snapshotDir);
1004    }
1005
1006    // Find the necessary directory which need to change owner and group
1007    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1008    if (outputFs.exists(needSetOwnerDir)) {
1009      if (skipTmp) {
1010        needSetOwnerDir = outputSnapshotDir;
1011      } else {
1012        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1013        if (outputFs.exists(needSetOwnerDir)) {
1014          needSetOwnerDir = snapshotTmpDir;
1015        }
1016      }
1017    }
1018
1019    // Check if the snapshot already exists
1020    if (outputFs.exists(outputSnapshotDir)) {
1021      if (overwrite) {
1022        if (!outputFs.delete(outputSnapshotDir, true)) {
1023          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1024          return 1;
1025        }
1026      } else {
1027        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1028          + outputSnapshotDir);
1029        return 1;
1030      }
1031    }
1032
1033    if (!skipTmp) {
1034      // Check if the snapshot already in-progress
1035      if (outputFs.exists(snapshotTmpDir)) {
1036        if (overwrite) {
1037          if (!outputFs.delete(snapshotTmpDir, true)) {
1038            System.err
1039              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1040            return 1;
1041          }
1042        } else {
1043          System.err
1044            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1045          System.err
1046            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1047          System.err
1048            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1049          return 1;
1050        }
1051      }
1052    }
1053
1054    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1055    // The snapshot references must be copied before the hfiles otherwise the cleaner
1056    // will remove them because they are unreferenced.
1057    List<Path> travesedPaths = new ArrayList<>();
1058    boolean copySucceeded = false;
1059    try {
1060      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1061      travesedPaths =
1062        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1063          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1064      copySucceeded = true;
1065    } catch (IOException e) {
1066      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1067        + " to=" + initialOutputSnapshotDir, e);
1068    } finally {
1069      if (copySucceeded) {
1070        if (filesUser != null || filesGroup != null) {
1071          LOG.warn(
1072            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1073              + (filesGroup == null
1074                ? ""
1075                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1076          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1077        }
1078        if (filesMode > 0) {
1079          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1080          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1081        }
1082      }
1083    }
1084
1085    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1086    // reset TTL for target snapshot.
1087    if (!targetName.equals(snapshotName) || resetTtl) {
1088      SnapshotDescription.Builder snapshotDescBuilder =
1089        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1090      if (!targetName.equals(snapshotName)) {
1091        snapshotDescBuilder.setName(targetName);
1092      }
1093      if (resetTtl) {
1094        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1095      }
1096      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1097        initialOutputSnapshotDir, outputFs);
1098      if (filesUser != null || filesGroup != null) {
1099        outputFs.setOwner(
1100          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1101          filesGroup);
1102      }
1103      if (filesMode > 0) {
1104        outputFs.setPermission(
1105          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1106          new FsPermission((short) filesMode));
1107      }
1108    }
1109
1110    // Step 2 - Start MR Job to copy files
1111    // The snapshot references must be copied before the files otherwise the files gets removed
1112    // by the HFileArchiver, since they have no references.
1113    try {
1114      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1115        filesGroup, filesMode, mappers, bandwidthMB);
1116
1117      LOG.info("Finalize the Snapshot Export");
1118      if (!skipTmp) {
1119        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1120        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1121          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1122            + snapshotTmpDir + " to=" + outputSnapshotDir);
1123        }
1124      }
1125
1126      // Step 4 - Verify snapshot integrity
1127      if (verifyTarget) {
1128        LOG.info("Verify snapshot integrity");
1129        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1130      }
1131
1132      LOG.info("Export Completed: " + targetName);
1133      return 0;
1134    } catch (Exception e) {
1135      LOG.error("Snapshot export failed", e);
1136      if (!skipTmp) {
1137        outputFs.delete(snapshotTmpDir, true);
1138      }
1139      outputFs.delete(outputSnapshotDir, true);
1140      return 1;
1141    }
1142  }
1143
1144  @Override
1145  protected void printUsage() {
1146    super.printUsage();
1147    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1148      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1149      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1150      + "  hbase snapshot export \\\n"
1151      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1152      + "    --copy-to hdfs://srv1:50070/hbase");
1153  }
1154
1155  @Override
1156  protected void addOptions() {
1157    addRequiredOption(Options.SNAPSHOT);
1158    addOption(Options.COPY_TO);
1159    addOption(Options.COPY_FROM);
1160    addOption(Options.TARGET_NAME);
1161    addOption(Options.NO_CHECKSUM_VERIFY);
1162    addOption(Options.NO_TARGET_VERIFY);
1163    addOption(Options.NO_SOURCE_VERIFY);
1164    addOption(Options.OVERWRITE);
1165    addOption(Options.CHUSER);
1166    addOption(Options.CHGROUP);
1167    addOption(Options.CHMOD);
1168    addOption(Options.MAPPERS);
1169    addOption(Options.BANDWIDTH);
1170    addOption(Options.RESET_TTL);
1171  }
1172
1173  public static void main(String[] args) {
1174    new ExportSnapshot().doStaticMain(args);
1175  }
1176}