001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.BufferedInputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.function.BiConsumer;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FSDataInputStream;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileChecksum;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.fs.permission.FsPermission;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.io.FileLink;
050import org.apache.hadoop.hbase.io.HFileLink;
051import org.apache.hadoop.hbase.io.WALLink;
052import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
053import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
054import org.apache.hadoop.hbase.mob.MobUtils;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.FSUtils;
058import org.apache.hadoop.hbase.util.HFileArchiveUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
080import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
081
082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
085
086/**
087 * Export the specified snapshot to a given FileSystem.
088 *
089 * The .snapshot/name folder is copied to the destination cluster
090 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
091 * When everything is done, the second cluster can restore the snapshot.
092 */
093@InterfaceAudience.Public
094public class ExportSnapshot extends AbstractHBaseTool implements Tool {
095  public static final String NAME = "exportsnapshot";
096  /** Configuration prefix for overrides for the source filesystem */
097  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
098  /** Configuration prefix for overrides for the destination filesystem */
099  public static final String CONF_DEST_PREFIX = NAME + ".to.";
100
101  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
102
103  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
104  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
105  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
106  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
107  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
108  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
109  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
110  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
111  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
112  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
113  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
114  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
115  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
116  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
117  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
118  private static final String CONF_COPY_MANIFEST_THREADS =
119      "snapshot.export.copy.references.threads";
120  private static final int DEFAULT_COPY_MANIFEST_THREADS =
121      Runtime.getRuntime().availableProcessors();
122
123  static class Testing {
124    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
125    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
126    int failuresCountToInject = 0;
127    int injectedFailureCount = 0;
128  }
129
130  // Command line options and defaults.
131  static final class Options {
132    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
133    static final Option TARGET_NAME = new Option(null, "target", true,
134        "Target name for the snapshot.");
135    static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
136        + "destination hdfs://");
137    static final Option COPY_FROM = new Option(null, "copy-from", true,
138        "Input folder hdfs:// (default hbase.rootdir)");
139    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
140        "Do not verify checksum, use name+length only.");
141    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
142        "Do not verify the integrity of the exported snapshot.");
143    static final Option OVERWRITE = new Option(null, "overwrite", false,
144        "Rewrite the snapshot manifest if already exists.");
145    static final Option CHUSER = new Option(null, "chuser", true,
146        "Change the owner of the files to the specified one.");
147    static final Option CHGROUP = new Option(null, "chgroup", true,
148        "Change the group of the files to the specified one.");
149    static final Option CHMOD = new Option(null, "chmod", true,
150        "Change the permission of the files to the specified one.");
151    static final Option MAPPERS = new Option(null, "mappers", true,
152        "Number of mappers to use during the copy (mapreduce.job.maps).");
153    static final Option BANDWIDTH = new Option(null, "bandwidth", true,
154        "Limit bandwidth to this value in MB/second.");
155  }
156
157  // Export Map-Reduce Counters, to keep track of the progress
158  public enum Counter {
159    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
160    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
161  }
162
163  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
164                                                   NullWritable, NullWritable> {
165    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
166    final static int REPORT_SIZE = 1 * 1024 * 1024;
167    final static int BUFFER_SIZE = 64 * 1024;
168
169    private boolean verifyChecksum;
170    private String filesGroup;
171    private String filesUser;
172    private short filesMode;
173    private int bufferSize;
174
175    private FileSystem outputFs;
176    private Path outputArchive;
177    private Path outputRoot;
178
179    private FileSystem inputFs;
180    private Path inputArchive;
181    private Path inputRoot;
182
183    private static Testing testing = new Testing();
184
185    @Override
186    public void setup(Context context) throws IOException {
187      Configuration conf = context.getConfiguration();
188
189      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
190      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
191
192      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
193
194      filesGroup = conf.get(CONF_FILES_GROUP);
195      filesUser = conf.get(CONF_FILES_USER);
196      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
197      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
198      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
199
200      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
201      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
202
203      try {
204        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
205        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
206      } catch (IOException e) {
207        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
208      }
209
210      try {
211        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
212        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
213      } catch (IOException e) {
214        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
215      }
216
217      // Use the default block size of the outputFs if bigger
218      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
219      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
220      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
221
222      for (Counter c : Counter.values()) {
223        context.getCounter(c).increment(0);
224      }
225      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
226        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
227        // Get number of times we have already injected failure based on attempt number of this
228        // task.
229        testing.injectedFailureCount = context.getTaskAttemptID().getId();
230      }
231    }
232
233    @Override
234    protected void cleanup(Context context) {
235      IOUtils.closeStream(inputFs);
236      IOUtils.closeStream(outputFs);
237    }
238
239    @Override
240    public void map(BytesWritable key, NullWritable value, Context context)
241        throws InterruptedException, IOException {
242      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
243      Path outputPath = getOutputPath(inputInfo);
244
245      copyFile(context, inputInfo, outputPath);
246    }
247
248    /**
249     * Returns the location where the inputPath will be copied.
250     */
251    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
252      Path path = null;
253      switch (inputInfo.getType()) {
254        case HFILE:
255          Path inputPath = new Path(inputInfo.getHfile());
256          String family = inputPath.getParent().getName();
257          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
258          String region = HFileLink.getReferencedRegionName(inputPath.getName());
259          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
260          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
261              new Path(region, new Path(family, hfile)));
262          break;
263        case WAL:
264          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
265          break;
266        default:
267          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
268      }
269      return new Path(outputArchive, path);
270    }
271
272    @SuppressWarnings("checkstyle:linelength")
273    /**
274     * Used by TestExportSnapshot to test for retries when failures happen.
275     * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
276     */
277    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
278        throws IOException {
279      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
280      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
281      testing.injectedFailureCount++;
282      context.getCounter(Counter.COPY_FAILED).increment(1);
283      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
284      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
285          testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
286    }
287
288    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
289        final Path outputPath) throws IOException {
290      // Get the file information
291      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
292
293      // Verify if the output file exists and is the same that we want to copy
294      if (outputFs.exists(outputPath)) {
295        FileStatus outputStat = outputFs.getFileStatus(outputPath);
296        if (outputStat != null && sameFile(inputStat, outputStat)) {
297          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
298          context.getCounter(Counter.FILES_SKIPPED).increment(1);
299          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
300          return;
301        }
302      }
303
304      InputStream in = openSourceFile(context, inputInfo);
305      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
306      if (Integer.MAX_VALUE != bandwidthMB) {
307        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
308      }
309
310      try {
311        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
312
313        // Ensure that the output folder is there and copy the file
314        createOutputPath(outputPath.getParent());
315        FSDataOutputStream out = outputFs.create(outputPath, true);
316        try {
317          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
318        } finally {
319          out.close();
320        }
321
322        // Try to Preserve attributes
323        if (!preserveAttributes(outputPath, inputStat)) {
324          LOG.warn("You may have to run manually chown on: " + outputPath);
325        }
326      } finally {
327        in.close();
328        injectTestFailure(context, inputInfo);
329      }
330    }
331
332    /**
333     * Create the output folder and optionally set ownership.
334     */
335    private void createOutputPath(final Path path) throws IOException {
336      if (filesUser == null && filesGroup == null) {
337        outputFs.mkdirs(path);
338      } else {
339        Path parent = path.getParent();
340        if (!outputFs.exists(parent) && !parent.isRoot()) {
341          createOutputPath(parent);
342        }
343        outputFs.mkdirs(path);
344        if (filesUser != null || filesGroup != null) {
345          // override the owner when non-null user/group is specified
346          outputFs.setOwner(path, filesUser, filesGroup);
347        }
348        if (filesMode > 0) {
349          outputFs.setPermission(path, new FsPermission(filesMode));
350        }
351      }
352    }
353
354    /**
355     * Try to Preserve the files attribute selected by the user copying them from the source file
356     * This is only required when you are exporting as a different user than "hbase" or on a system
357     * that doesn't have the "hbase" user.
358     *
359     * This is not considered a blocking failure since the user can force a chmod with the user
360     * that knows is available on the system.
361     */
362    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
363      FileStatus stat;
364      try {
365        stat = outputFs.getFileStatus(path);
366      } catch (IOException e) {
367        LOG.warn("Unable to get the status for file=" + path);
368        return false;
369      }
370
371      try {
372        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
373          outputFs.setPermission(path, new FsPermission(filesMode));
374        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
375          outputFs.setPermission(path, refStat.getPermission());
376        }
377      } catch (IOException e) {
378        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
379        return false;
380      }
381
382      boolean hasRefStat = (refStat != null);
383      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
384      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
385      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
386        try {
387          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
388            outputFs.setOwner(path, user, group);
389          }
390        } catch (IOException e) {
391          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
392          LOG.warn("The user/group may not exist on the destination cluster: user=" +
393                   user + " group=" + group);
394          return false;
395        }
396      }
397
398      return true;
399    }
400
401    private boolean stringIsNotEmpty(final String str) {
402      return str != null && str.length() > 0;
403    }
404
405    private void copyData(final Context context,
406        final Path inputPath, final InputStream in,
407        final Path outputPath, final FSDataOutputStream out,
408        final long inputFileSize)
409        throws IOException {
410      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
411                                   " (%.1f%%)";
412
413      try {
414        byte[] buffer = new byte[bufferSize];
415        long totalBytesWritten = 0;
416        int reportBytes = 0;
417        int bytesRead;
418
419        long stime = System.currentTimeMillis();
420        while ((bytesRead = in.read(buffer)) > 0) {
421          out.write(buffer, 0, bytesRead);
422          totalBytesWritten += bytesRead;
423          reportBytes += bytesRead;
424
425          if (reportBytes >= REPORT_SIZE) {
426            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
427            context.setStatus(String.format(statusMessage,
428                              StringUtils.humanReadableInt(totalBytesWritten),
429                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
430                              " from " + inputPath + " to " + outputPath);
431            reportBytes = 0;
432          }
433        }
434        long etime = System.currentTimeMillis();
435
436        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
437        context.setStatus(String.format(statusMessage,
438                          StringUtils.humanReadableInt(totalBytesWritten),
439                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
440                          " from " + inputPath + " to " + outputPath);
441
442        // Verify that the written size match
443        if (totalBytesWritten != inputFileSize) {
444          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
445                       " expected=" + inputFileSize + " for file=" + inputPath;
446          throw new IOException(msg);
447        }
448
449        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
450        LOG.info("size=" + totalBytesWritten +
451            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
452            " time=" + StringUtils.formatTimeDiff(etime, stime) +
453            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
454        context.getCounter(Counter.FILES_COPIED).increment(1);
455      } catch (IOException e) {
456        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
457        context.getCounter(Counter.COPY_FAILED).increment(1);
458        throw e;
459      }
460    }
461
462    /**
463     * Try to open the "source" file.
464     * Throws an IOException if the communication with the inputFs fail or
465     * if the file is not found.
466     */
467    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
468            throws IOException {
469      try {
470        Configuration conf = context.getConfiguration();
471        FileLink link = null;
472        switch (fileInfo.getType()) {
473          case HFILE:
474            Path inputPath = new Path(fileInfo.getHfile());
475            link = getFileLink(inputPath, conf);
476            break;
477          case WAL:
478            String serverName = fileInfo.getWalServer();
479            String logName = fileInfo.getWalName();
480            link = new WALLink(inputRoot, serverName, logName);
481            break;
482          default:
483            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
484        }
485        return link.open(inputFs);
486      } catch (IOException e) {
487        context.getCounter(Counter.MISSING_FILES).increment(1);
488        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
489        throw e;
490      }
491    }
492
493    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
494        throws IOException {
495      try {
496        Configuration conf = context.getConfiguration();
497        FileLink link = null;
498        switch (fileInfo.getType()) {
499          case HFILE:
500            Path inputPath = new Path(fileInfo.getHfile());
501            link = getFileLink(inputPath, conf);
502            break;
503          case WAL:
504            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
505            break;
506          default:
507            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
508        }
509        return link.getFileStatus(inputFs);
510      } catch (FileNotFoundException e) {
511        context.getCounter(Counter.MISSING_FILES).increment(1);
512        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
513        throw e;
514      } catch (IOException e) {
515        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
516        throw e;
517      }
518    }
519
520    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
521      String regionName = HFileLink.getReferencedRegionName(path.getName());
522      TableName tableName = HFileLink.getReferencedTableName(path.getName());
523      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
524        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
525                HFileArchiveUtil.getArchivePath(conf), path);
526      }
527      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
528    }
529
530    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
531      try {
532        return fs.getFileChecksum(path);
533      } catch (IOException e) {
534        LOG.warn("Unable to get checksum for file=" + path, e);
535        return null;
536      }
537    }
538
539    /**
540     * Check if the two files are equal by looking at the file length,
541     * and at the checksum (if user has specified the verifyChecksum flag).
542     */
543    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
544      // Not matching length
545      if (inputStat.getLen() != outputStat.getLen()) return false;
546
547      // Mark files as equals, since user asked for no checksum verification
548      if (!verifyChecksum) return true;
549
550      // If checksums are not available, files are not the same.
551      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
552      if (inChecksum == null) return false;
553
554      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
555      if (outChecksum == null) return false;
556
557      return inChecksum.equals(outChecksum);
558    }
559  }
560
561  // ==========================================================================
562  //  Input Format
563  // ==========================================================================
564
565  /**
566   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
567   * @return list of files referenced by the snapshot (pair of path and size)
568   */
569  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
570      final FileSystem fs, final Path snapshotDir) throws IOException {
571    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
572
573    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
574    final TableName table = TableName.valueOf(snapshotDesc.getTable());
575
576    // Get snapshot files
577    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
578    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
579      new SnapshotReferenceUtil.SnapshotVisitor() {
580        @Override
581        public void storeFile(final RegionInfo regionInfo, final String family,
582            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
583          // for storeFile.hasReference() case, copied as part of the manifest
584          if (!storeFile.hasReference()) {
585            String region = regionInfo.getEncodedName();
586            String hfile = storeFile.getName();
587            Path path = HFileLink.createPath(table, region, family, hfile);
588
589            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
590              .setType(SnapshotFileInfo.Type.HFILE)
591              .setHfile(path.toString())
592              .build();
593
594            long size;
595            if (storeFile.hasFileSize()) {
596              size = storeFile.getFileSize();
597            } else {
598              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
599            }
600            files.add(new Pair<>(fileInfo, size));
601          }
602        }
603    });
604
605    return files;
606  }
607
608  /**
609   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
610   * The groups created will have similar amounts of bytes.
611   * <p>
612   * The algorithm used is pretty straightforward; the file list is sorted by size,
613   * and then each group fetch the bigger file available, iterating through groups
614   * alternating the direction.
615   */
616  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
617      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
618    // Sort files by size, from small to big
619    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
620      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
621        long r = a.getSecond() - b.getSecond();
622        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
623      }
624    });
625
626    // create balanced groups
627    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
628    long[] sizeGroups = new long[ngroups];
629    int hi = files.size() - 1;
630    int lo = 0;
631
632    List<Pair<SnapshotFileInfo, Long>> group;
633    int dir = 1;
634    int g = 0;
635
636    while (hi >= lo) {
637      if (g == fileGroups.size()) {
638        group = new LinkedList<>();
639        fileGroups.add(group);
640      } else {
641        group = fileGroups.get(g);
642      }
643
644      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
645
646      // add the hi one
647      sizeGroups[g] += fileInfo.getSecond();
648      group.add(fileInfo);
649
650      // change direction when at the end or the beginning
651      g += dir;
652      if (g == ngroups) {
653        dir = -1;
654        g = ngroups - 1;
655      } else if (g < 0) {
656        dir = 1;
657        g = 0;
658      }
659    }
660
661    if (LOG.isDebugEnabled()) {
662      for (int i = 0; i < sizeGroups.length; ++i) {
663        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
664      }
665    }
666
667    return fileGroups;
668  }
669
670  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
671    @Override
672    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
673        TaskAttemptContext tac) throws IOException, InterruptedException {
674      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
675    }
676
677    @Override
678    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
679      Configuration conf = context.getConfiguration();
680      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
681      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
682
683      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
684      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
685      if (mappers == 0 && snapshotFiles.size() > 0) {
686        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
687        mappers = Math.min(mappers, snapshotFiles.size());
688        conf.setInt(CONF_NUM_SPLITS, mappers);
689        conf.setInt(MR_NUM_MAPS, mappers);
690      }
691
692      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
693      List<InputSplit> splits = new ArrayList(groups.size());
694      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
695        splits.add(new ExportSnapshotInputSplit(files));
696      }
697      return splits;
698    }
699
700    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
701      private List<Pair<BytesWritable, Long>> files;
702      private long length;
703
704      public ExportSnapshotInputSplit() {
705        this.files = null;
706      }
707
708      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
709        this.files = new ArrayList(snapshotFiles.size());
710        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
711          this.files.add(new Pair<>(
712            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
713          this.length += fileInfo.getSecond();
714        }
715      }
716
717      private List<Pair<BytesWritable, Long>> getSplitKeys() {
718        return files;
719      }
720
721      @Override
722      public long getLength() throws IOException, InterruptedException {
723        return length;
724      }
725
726      @Override
727      public String[] getLocations() throws IOException, InterruptedException {
728        return new String[] {};
729      }
730
731      @Override
732      public void readFields(DataInput in) throws IOException {
733        int count = in.readInt();
734        files = new ArrayList<>(count);
735        length = 0;
736        for (int i = 0; i < count; ++i) {
737          BytesWritable fileInfo = new BytesWritable();
738          fileInfo.readFields(in);
739          long size = in.readLong();
740          files.add(new Pair<>(fileInfo, size));
741          length += size;
742        }
743      }
744
745      @Override
746      public void write(DataOutput out) throws IOException {
747        out.writeInt(files.size());
748        for (final Pair<BytesWritable, Long> fileInfo: files) {
749          fileInfo.getFirst().write(out);
750          out.writeLong(fileInfo.getSecond());
751        }
752      }
753    }
754
755    private static class ExportSnapshotRecordReader
756        extends RecordReader<BytesWritable, NullWritable> {
757      private final List<Pair<BytesWritable, Long>> files;
758      private long totalSize = 0;
759      private long procSize = 0;
760      private int index = -1;
761
762      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
763        this.files = files;
764        for (Pair<BytesWritable, Long> fileInfo: files) {
765          totalSize += fileInfo.getSecond();
766        }
767      }
768
769      @Override
770      public void close() { }
771
772      @Override
773      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
774
775      @Override
776      public NullWritable getCurrentValue() { return NullWritable.get(); }
777
778      @Override
779      public float getProgress() { return (float)procSize / totalSize; }
780
781      @Override
782      public void initialize(InputSplit split, TaskAttemptContext tac) { }
783
784      @Override
785      public boolean nextKeyValue() {
786        if (index >= 0) {
787          procSize += files.get(index).getSecond();
788        }
789        return(++index < files.size());
790      }
791    }
792  }
793
794  // ==========================================================================
795  //  Tool
796  // ==========================================================================
797
798  /**
799   * Run Map-Reduce Job to perform the files copy.
800   */
801  private void runCopyJob(final Path inputRoot, final Path outputRoot,
802      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
803      final String filesUser, final String filesGroup, final int filesMode,
804      final int mappers, final int bandwidthMB)
805          throws IOException, InterruptedException, ClassNotFoundException {
806    Configuration conf = getConf();
807    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
808    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
809    if (mappers > 0) {
810      conf.setInt(CONF_NUM_SPLITS, mappers);
811      conf.setInt(MR_NUM_MAPS, mappers);
812    }
813    conf.setInt(CONF_FILES_MODE, filesMode);
814    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
815    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
816    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
817    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
818    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
819    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
820
821    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
822    Job job = new Job(conf);
823    job.setJobName(jobname);
824    job.setJarByClass(ExportSnapshot.class);
825    TableMapReduceUtil.addDependencyJars(job);
826    job.setMapperClass(ExportMapper.class);
827    job.setInputFormatClass(ExportSnapshotInputFormat.class);
828    job.setOutputFormatClass(NullOutputFormat.class);
829    job.setMapSpeculativeExecution(false);
830    job.setNumReduceTasks(0);
831
832    // Acquire the delegation Tokens
833    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
834    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
835      new Path[] { inputRoot }, srcConf);
836    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
837    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
838        new Path[] { outputRoot }, destConf);
839
840    // Run the MR Job
841    if (!job.waitForCompletion(true)) {
842      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
843    }
844  }
845
846  private void verifySnapshot(final Configuration baseConf,
847      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
848    // Update the conf with the current root dir, since may be a different cluster
849    Configuration conf = new Configuration(baseConf);
850    CommonFSUtils.setRootDir(conf, rootDir);
851    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
852    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
853    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
854  }
855
856  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
857      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
858    ExecutorService pool = Executors
859        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
860    List<Future<Void>> futures = new ArrayList<>();
861    for (Path dstPath : traversedPath) {
862      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
863      futures.add(future);
864    }
865    try {
866      for (Future<Void> future : futures) {
867        future.get();
868      }
869    } catch (InterruptedException | ExecutionException e) {
870      throw new IOException(e);
871    } finally {
872      pool.shutdownNow();
873    }
874  }
875
876  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
877      Configuration conf, List<Path> traversedPath) throws IOException {
878    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
879      try {
880        fs.setOwner(path, filesUser, filesGroup);
881      } catch (IOException e) {
882        throw new RuntimeException(
883            "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
884      }
885    }, conf);
886  }
887
888  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
889      final List<Path> traversedPath, final Configuration conf) throws IOException {
890    if (filesMode <= 0) {
891      return;
892    }
893    FsPermission perm = new FsPermission(filesMode);
894    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
895      try {
896        fs.setPermission(path, perm);
897      } catch (IOException e) {
898        throw new RuntimeException(
899            "set permission for file " + path + " to " + filesMode + " failed", e);
900      }
901    }, conf);
902  }
903
  // Tool settings. The defaults below are overridden from the command line in processOptions().

  // When true, doWork() verifies the exported snapshot after the copy; cleared by NO_TARGET_VERIFY.
  private boolean verifyTarget = true;
  // Forwarded to the copy job as CONF_CHECKSUM_VERIFY; cleared by NO_CHECKSUM_VERIFY.
  private boolean verifyChecksum = true;
  // Name of the snapshot to export; doWork() refuses to run without it.
  private String snapshotName = null;
  // Name of the snapshot at the destination; doWork() falls back to snapshotName when null.
  private String targetName = null;
  // When true, doWork() deletes an already existing destination snapshot (or tmp) directory.
  private boolean overwrite = false;
  // Group and user applied to the exported files; null means "leave unchanged".
  private String filesGroup = null;
  private String filesUser = null;
  // Destination root (COPY_TO); doWork() refuses to run without it.
  private Path outputRoot = null;
  // Source root (COPY_FROM); doWork() falls back to the configured root dir when null.
  private Path inputRoot = null;
  // Forwarded to the copy job as CONF_BANDWIDTH_MB.
  private int bandwidthMB = Integer.MAX_VALUE;
  // Permission applied to the exported files; only applied when > 0.
  private int filesMode = 0;
  // Number of mappers requested; only forwarded to the copy job when > 0.
  private int mappers = 0;
916
917  @Override
918  protected void processOptions(CommandLine cmd) {
919    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
920    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
921    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
922      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
923    }
924    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
925      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
926    }
927    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
928    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
929    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
930    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode);
931    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
932    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
933    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
934    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
935    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
936  }
937
938  /**
939   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
940   * @return 0 on success, and != 0 upon failure.
941   */
942  @Override
943  public int doWork() throws IOException {
944    Configuration conf = getConf();
945
946    // Check user options
947    if (snapshotName == null) {
948      System.err.println("Snapshot name not provided.");
949      LOG.error("Use -h or --help for usage instructions.");
950      return 0;
951    }
952
953    if (outputRoot == null) {
954      System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
955              + ") not provided.");
956      LOG.error("Use -h or --help for usage instructions.");
957      return 0;
958    }
959
960    if (targetName == null) {
961      targetName = snapshotName;
962    }
963    if (inputRoot == null) {
964      inputRoot = CommonFSUtils.getRootDir(conf);
965    } else {
966      CommonFSUtils.setRootDir(conf, inputRoot);
967    }
968
969    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
970    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
971    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
972    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
973    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
974    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
975    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) ||
976        conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
977    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
978    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot,
979        destConf);
980    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
981    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
982    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
983    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}",
984      outputFs, outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
985
986    // Find the necessary directory which need to change owner and group
987    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
988    if (outputFs.exists(needSetOwnerDir)) {
989      if (skipTmp) {
990        needSetOwnerDir = outputSnapshotDir;
991      } else {
992        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
993        if (outputFs.exists(needSetOwnerDir)) {
994          needSetOwnerDir = snapshotTmpDir;
995        }
996      }
997    }
998
999    // Check if the snapshot already exists
1000    if (outputFs.exists(outputSnapshotDir)) {
1001      if (overwrite) {
1002        if (!outputFs.delete(outputSnapshotDir, true)) {
1003          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1004          return 1;
1005        }
1006      } else {
1007        System.err.println("The snapshot '" + targetName +
1008          "' already exists in the destination: " + outputSnapshotDir);
1009        return 1;
1010      }
1011    }
1012
1013    if (!skipTmp) {
1014      // Check if the snapshot already in-progress
1015      if (outputFs.exists(snapshotTmpDir)) {
1016        if (overwrite) {
1017          if (!outputFs.delete(snapshotTmpDir, true)) {
1018            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
1019            return 1;
1020          }
1021        } else {
1022          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
1023          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
1024          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
1025          return 1;
1026        }
1027      }
1028    }
1029
1030    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
1031    // The snapshot references must be copied before the hfiles otherwise the cleaner
1032    // will remove them because they are unreferenced.
1033    List<Path> travesedPaths = new ArrayList<>();
1034    boolean copySucceeded = false;
1035    try {
1036      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1037      travesedPaths =
1038          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1039              conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1040      copySucceeded = true;
1041    } catch (IOException e) {
1042      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
1043        snapshotDir + " to=" + initialOutputSnapshotDir, e);
1044    } finally {
1045      if (copySucceeded) {
1046        if (filesUser != null || filesGroup != null) {
1047          LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1048              + filesUser)
1049              + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1050                  + filesGroup));
1051          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1052        }
1053        if (filesMode > 0) {
1054          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1055          setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
1056        }
1057      }
1058    }
1059
1060    // Write a new .snapshotinfo if the target name is different from the source name
1061    if (!targetName.equals(snapshotName)) {
1062      SnapshotDescription snapshotDesc =
1063        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1064          .toBuilder()
1065          .setName(targetName)
1066          .build();
1067      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs);
1068      if (filesUser != null || filesGroup != null) {
1069        outputFs.setOwner(new Path(initialOutputSnapshotDir,
1070          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup);
1071      }
1072      if (filesMode > 0) {
1073        outputFs.setPermission(new Path(initialOutputSnapshotDir,
1074          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode));
1075      }
1076    }
1077
1078    // Step 2 - Start MR Job to copy files
1079    // The snapshot references must be copied before the files otherwise the files gets removed
1080    // by the HFileArchiver, since they have no references.
1081    try {
1082      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1083                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1084
1085      LOG.info("Finalize the Snapshot Export");
1086      if (!skipTmp) {
1087        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1088        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1089          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1090            snapshotTmpDir + " to=" + outputSnapshotDir);
1091        }
1092      }
1093
1094      // Step 4 - Verify snapshot integrity
1095      if (verifyTarget) {
1096        LOG.info("Verify snapshot integrity");
1097        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1098      }
1099
1100      LOG.info("Export Completed: " + targetName);
1101      return 0;
1102    } catch (Exception e) {
1103      LOG.error("Snapshot export failed", e);
1104      if (!skipTmp) {
1105        outputFs.delete(snapshotTmpDir, true);
1106      }
1107      outputFs.delete(outputSnapshotDir, true);
1108      return 1;
1109    } finally {
1110      IOUtils.closeStream(inputFs);
1111      IOUtils.closeStream(outputFs);
1112    }
1113  }
1114
1115  @Override
1116  protected void printUsage() {
1117    super.printUsage();
1118    System.out.println("\n"
1119        + "Examples:\n"
1120        + "  hbase snapshot export \\\n"
1121        + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1122        + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n"
1123        + "\n"
1124        + "  hbase snapshot export \\\n"
1125        + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1126        + "    --copy-to hdfs://srv1:50070/hbase");
1127  }
1128
1129  @Override protected void addOptions() {
1130    addRequiredOption(Options.SNAPSHOT);
1131    addOption(Options.COPY_TO);
1132    addOption(Options.COPY_FROM);
1133    addOption(Options.TARGET_NAME);
1134    addOption(Options.NO_CHECKSUM_VERIFY);
1135    addOption(Options.NO_TARGET_VERIFY);
1136    addOption(Options.OVERWRITE);
1137    addOption(Options.CHUSER);
1138    addOption(Options.CHGROUP);
1139    addOption(Options.CHMOD);
1140    addOption(Options.MAPPERS);
1141    addOption(Options.BANDWIDTH);
1142  }
1143
1144  public static void main(String[] args) {
1145    new ExportSnapshot().doStaticMain(args);
1146  }
1147}