001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.BufferedInputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.function.BiConsumer;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FSDataOutputStream;
041import org.apache.hadoop.fs.FileChecksum;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.FsPermission;
046import org.apache.hadoop.hbase.HBaseConfiguration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.io.FileLink;
051import org.apache.hadoop.hbase.io.HFileLink;
052import org.apache.hadoop.hbase.io.WALLink;
053import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
054import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
055import org.apache.hadoop.hbase.mob.MobUtils;
056import org.apache.hadoop.hbase.util.AbstractHBaseTool;
057import org.apache.hadoop.hbase.util.FSUtils;
058import org.apache.hadoop.hbase.util.HFileArchiveUtil;
059import org.apache.hadoop.hbase.util.Pair;
060import org.apache.hadoop.io.BytesWritable;
061import org.apache.hadoop.io.IOUtils;
062import org.apache.hadoop.io.NullWritable;
063import org.apache.hadoop.io.Writable;
064import org.apache.hadoop.mapreduce.InputFormat;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.JobContext;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.RecordReader;
070import org.apache.hadoop.mapreduce.TaskAttemptContext;
071import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
072import org.apache.hadoop.mapreduce.security.TokenCache;
073import org.apache.hadoop.util.StringUtils;
074import org.apache.hadoop.util.Tool;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
079import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
083
084/**
085 * Export the specified snapshot to a given FileSystem.
086 *
087 * The .snapshot/name folder is copied to the destination cluster
088 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
089 * When everything is done, the second cluster can restore the snapshot.
090 */
091@InterfaceAudience.Public
092public class ExportSnapshot extends AbstractHBaseTool implements Tool {
093  public static final String NAME = "exportsnapshot";
094  /** Configuration prefix for overrides for the source filesystem */
095  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
096  /** Configuration prefix for overrides for the destination filesystem */
097  public static final String CONF_DEST_PREFIX = NAME + ".to.";
098
099  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
100
101  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
102  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
103  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
104  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
105  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
106  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
107  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
108  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
109  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
110  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
111  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
112  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
113  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
114  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
115  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
116  private static final String CONF_COPY_MANIFEST_THREADS =
117      "snapshot.export.copy.references.threads";
118  private static final int DEFAULT_COPY_MANIFEST_THREADS =
119      Runtime.getRuntime().availableProcessors();
120
121  static class Testing {
122    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
123    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
124    int failuresCountToInject = 0;
125    int injectedFailureCount = 0;
126  }
127
128  // Command line options and defaults.
129  static final class Options {
130    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
131    static final Option TARGET_NAME = new Option(null, "target", true,
132        "Target name for the snapshot.");
133    static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
134        + "destination hdfs://");
135    static final Option COPY_FROM = new Option(null, "copy-from", true,
136        "Input folder hdfs:// (default hbase.rootdir)");
137    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
138        "Do not verify checksum, use name+length only.");
139    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
140        "Do not verify the integrity of the exported snapshot.");
141    static final Option OVERWRITE = new Option(null, "overwrite", false,
142        "Rewrite the snapshot manifest if already exists.");
143    static final Option CHUSER = new Option(null, "chuser", true,
144        "Change the owner of the files to the specified one.");
145    static final Option CHGROUP = new Option(null, "chgroup", true,
146        "Change the group of the files to the specified one.");
147    static final Option CHMOD = new Option(null, "chmod", true,
148        "Change the permission of the files to the specified one.");
149    static final Option MAPPERS = new Option(null, "mappers", true,
150        "Number of mappers to use during the copy (mapreduce.job.maps).");
151    static final Option BANDWIDTH = new Option(null, "bandwidth", true,
152        "Limit bandwidth to this value in MB/second.");
153  }
154
155  // Export Map-Reduce Counters, to keep track of the progress
156  public enum Counter {
157    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
158    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
159  }
160
161  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
162                                                   NullWritable, NullWritable> {
163    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
164    final static int REPORT_SIZE = 1 * 1024 * 1024;
165    final static int BUFFER_SIZE = 64 * 1024;
166
167    private boolean verifyChecksum;
168    private String filesGroup;
169    private String filesUser;
170    private short filesMode;
171    private int bufferSize;
172
173    private FileSystem outputFs;
174    private Path outputArchive;
175    private Path outputRoot;
176
177    private FileSystem inputFs;
178    private Path inputArchive;
179    private Path inputRoot;
180
181    private static Testing testing = new Testing();
182
183    @Override
184    public void setup(Context context) throws IOException {
185      Configuration conf = context.getConfiguration();
186
187      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
188      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
189
190      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
191
192      filesGroup = conf.get(CONF_FILES_GROUP);
193      filesUser = conf.get(CONF_FILES_USER);
194      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
195      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
196      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
197
198      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
199      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
200
201      try {
202        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
203        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
204      } catch (IOException e) {
205        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
206      }
207
208      try {
209        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
210        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
211      } catch (IOException e) {
212        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
213      }
214
215      // Use the default block size of the outputFs if bigger
216      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
217      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
218      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
219
220      for (Counter c : Counter.values()) {
221        context.getCounter(c).increment(0);
222      }
223      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
224        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
225        // Get number of times we have already injected failure based on attempt number of this
226        // task.
227        testing.injectedFailureCount = context.getTaskAttemptID().getId();
228      }
229    }
230
231    @Override
232    protected void cleanup(Context context) {
233      IOUtils.closeStream(inputFs);
234      IOUtils.closeStream(outputFs);
235    }
236
237    @Override
238    public void map(BytesWritable key, NullWritable value, Context context)
239        throws InterruptedException, IOException {
240      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
241      Path outputPath = getOutputPath(inputInfo);
242
243      copyFile(context, inputInfo, outputPath);
244    }
245
246    /**
247     * Returns the location where the inputPath will be copied.
248     */
249    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
250      Path path = null;
251      switch (inputInfo.getType()) {
252        case HFILE:
253          Path inputPath = new Path(inputInfo.getHfile());
254          String family = inputPath.getParent().getName();
255          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
256          String region = HFileLink.getReferencedRegionName(inputPath.getName());
257          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
258          path = new Path(FSUtils.getTableDir(new Path("./"), table),
259              new Path(region, new Path(family, hfile)));
260          break;
261        case WAL:
262          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
263          break;
264        default:
265          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
266      }
267      return new Path(outputArchive, path);
268    }
269
270    @SuppressWarnings("checkstyle:linelength")
271    /**
272     * Used by TestExportSnapshot to test for retries when failures happen.
273     * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
274     */
275    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
276        throws IOException {
277      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
278      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
279      testing.injectedFailureCount++;
280      context.getCounter(Counter.COPY_FAILED).increment(1);
281      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
282      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
283          testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
284    }
285
286    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
287        final Path outputPath) throws IOException {
288      // Get the file information
289      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
290
291      // Verify if the output file exists and is the same that we want to copy
292      if (outputFs.exists(outputPath)) {
293        FileStatus outputStat = outputFs.getFileStatus(outputPath);
294        if (outputStat != null && sameFile(inputStat, outputStat)) {
295          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
296          context.getCounter(Counter.FILES_SKIPPED).increment(1);
297          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
298          return;
299        }
300      }
301
302      InputStream in = openSourceFile(context, inputInfo);
303      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
304      if (Integer.MAX_VALUE != bandwidthMB) {
305        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
306      }
307
308      try {
309        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
310
311        // Ensure that the output folder is there and copy the file
312        createOutputPath(outputPath.getParent());
313        FSDataOutputStream out = outputFs.create(outputPath, true);
314        try {
315          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
316        } finally {
317          out.close();
318        }
319
320        // Try to Preserve attributes
321        if (!preserveAttributes(outputPath, inputStat)) {
322          LOG.warn("You may have to run manually chown on: " + outputPath);
323        }
324      } finally {
325        in.close();
326        injectTestFailure(context, inputInfo);
327      }
328    }
329
330    /**
331     * Create the output folder and optionally set ownership.
332     */
333    private void createOutputPath(final Path path) throws IOException {
334      if (filesUser == null && filesGroup == null) {
335        outputFs.mkdirs(path);
336      } else {
337        Path parent = path.getParent();
338        if (!outputFs.exists(parent) && !parent.isRoot()) {
339          createOutputPath(parent);
340        }
341        outputFs.mkdirs(path);
342        if (filesUser != null || filesGroup != null) {
343          // override the owner when non-null user/group is specified
344          outputFs.setOwner(path, filesUser, filesGroup);
345        }
346        if (filesMode > 0) {
347          outputFs.setPermission(path, new FsPermission(filesMode));
348        }
349      }
350    }
351
352    /**
353     * Try to Preserve the files attribute selected by the user copying them from the source file
354     * This is only required when you are exporting as a different user than "hbase" or on a system
355     * that doesn't have the "hbase" user.
356     *
357     * This is not considered a blocking failure since the user can force a chmod with the user
358     * that knows is available on the system.
359     */
360    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
361      FileStatus stat;
362      try {
363        stat = outputFs.getFileStatus(path);
364      } catch (IOException e) {
365        LOG.warn("Unable to get the status for file=" + path);
366        return false;
367      }
368
369      try {
370        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
371          outputFs.setPermission(path, new FsPermission(filesMode));
372        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
373          outputFs.setPermission(path, refStat.getPermission());
374        }
375      } catch (IOException e) {
376        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
377        return false;
378      }
379
380      boolean hasRefStat = (refStat != null);
381      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
382      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
383      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
384        try {
385          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
386            outputFs.setOwner(path, user, group);
387          }
388        } catch (IOException e) {
389          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
390          LOG.warn("The user/group may not exist on the destination cluster: user=" +
391                   user + " group=" + group);
392          return false;
393        }
394      }
395
396      return true;
397    }
398
399    private boolean stringIsNotEmpty(final String str) {
400      return str != null && str.length() > 0;
401    }
402
403    private void copyData(final Context context,
404        final Path inputPath, final InputStream in,
405        final Path outputPath, final FSDataOutputStream out,
406        final long inputFileSize)
407        throws IOException {
408      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
409                                   " (%.1f%%)";
410
411      try {
412        byte[] buffer = new byte[bufferSize];
413        long totalBytesWritten = 0;
414        int reportBytes = 0;
415        int bytesRead;
416
417        long stime = System.currentTimeMillis();
418        while ((bytesRead = in.read(buffer)) > 0) {
419          out.write(buffer, 0, bytesRead);
420          totalBytesWritten += bytesRead;
421          reportBytes += bytesRead;
422
423          if (reportBytes >= REPORT_SIZE) {
424            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
425            context.setStatus(String.format(statusMessage,
426                              StringUtils.humanReadableInt(totalBytesWritten),
427                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
428                              " from " + inputPath + " to " + outputPath);
429            reportBytes = 0;
430          }
431        }
432        long etime = System.currentTimeMillis();
433
434        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
435        context.setStatus(String.format(statusMessage,
436                          StringUtils.humanReadableInt(totalBytesWritten),
437                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
438                          " from " + inputPath + " to " + outputPath);
439
440        // Verify that the written size match
441        if (totalBytesWritten != inputFileSize) {
442          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
443                       " expected=" + inputFileSize + " for file=" + inputPath;
444          throw new IOException(msg);
445        }
446
447        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
448        LOG.info("size=" + totalBytesWritten +
449            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
450            " time=" + StringUtils.formatTimeDiff(etime, stime) +
451            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
452        context.getCounter(Counter.FILES_COPIED).increment(1);
453      } catch (IOException e) {
454        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
455        context.getCounter(Counter.COPY_FAILED).increment(1);
456        throw e;
457      }
458    }
459
460    /**
461     * Try to open the "source" file.
462     * Throws an IOException if the communication with the inputFs fail or
463     * if the file is not found.
464     */
465    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
466            throws IOException {
467      try {
468        Configuration conf = context.getConfiguration();
469        FileLink link = null;
470        switch (fileInfo.getType()) {
471          case HFILE:
472            Path inputPath = new Path(fileInfo.getHfile());
473            link = getFileLink(inputPath, conf);
474            break;
475          case WAL:
476            String serverName = fileInfo.getWalServer();
477            String logName = fileInfo.getWalName();
478            link = new WALLink(inputRoot, serverName, logName);
479            break;
480          default:
481            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
482        }
483        return link.open(inputFs);
484      } catch (IOException e) {
485        context.getCounter(Counter.MISSING_FILES).increment(1);
486        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
487        throw e;
488      }
489    }
490
491    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
492        throws IOException {
493      try {
494        Configuration conf = context.getConfiguration();
495        FileLink link = null;
496        switch (fileInfo.getType()) {
497          case HFILE:
498            Path inputPath = new Path(fileInfo.getHfile());
499            link = getFileLink(inputPath, conf);
500            break;
501          case WAL:
502            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
503            break;
504          default:
505            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
506        }
507        return link.getFileStatus(inputFs);
508      } catch (FileNotFoundException e) {
509        context.getCounter(Counter.MISSING_FILES).increment(1);
510        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
511        throw e;
512      } catch (IOException e) {
513        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
514        throw e;
515      }
516    }
517
518    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
519      String regionName = HFileLink.getReferencedRegionName(path.getName());
520      TableName tableName = HFileLink.getReferencedTableName(path.getName());
521      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
522        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
523                HFileArchiveUtil.getArchivePath(conf), path);
524      }
525      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
526    }
527
528    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
529      try {
530        return fs.getFileChecksum(path);
531      } catch (IOException e) {
532        LOG.warn("Unable to get checksum for file=" + path, e);
533        return null;
534      }
535    }
536
537    /**
538     * Check if the two files are equal by looking at the file length,
539     * and at the checksum (if user has specified the verifyChecksum flag).
540     */
541    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
542      // Not matching length
543      if (inputStat.getLen() != outputStat.getLen()) return false;
544
545      // Mark files as equals, since user asked for no checksum verification
546      if (!verifyChecksum) return true;
547
548      // If checksums are not available, files are not the same.
549      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
550      if (inChecksum == null) return false;
551
552      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
553      if (outChecksum == null) return false;
554
555      return inChecksum.equals(outChecksum);
556    }
557  }
558
559  // ==========================================================================
560  //  Input Format
561  // ==========================================================================
562
563  /**
564   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
565   * @return list of files referenced by the snapshot (pair of path and size)
566   */
567  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
568      final FileSystem fs, final Path snapshotDir) throws IOException {
569    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
570
571    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
572    final TableName table = TableName.valueOf(snapshotDesc.getTable());
573
574    // Get snapshot files
575    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
576    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
577      new SnapshotReferenceUtil.SnapshotVisitor() {
578        @Override
579        public void storeFile(final RegionInfo regionInfo, final String family,
580            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
581          // for storeFile.hasReference() case, copied as part of the manifest
582          if (!storeFile.hasReference()) {
583            String region = regionInfo.getEncodedName();
584            String hfile = storeFile.getName();
585            Path path = HFileLink.createPath(table, region, family, hfile);
586
587            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
588              .setType(SnapshotFileInfo.Type.HFILE)
589              .setHfile(path.toString())
590              .build();
591
592            long size;
593            if (storeFile.hasFileSize()) {
594              size = storeFile.getFileSize();
595            } else {
596              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
597            }
598            files.add(new Pair<>(fileInfo, size));
599          }
600        }
601    });
602
603    return files;
604  }
605
606  /**
607   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
608   * The groups created will have similar amounts of bytes.
609   * <p>
610   * The algorithm used is pretty straightforward; the file list is sorted by size,
611   * and then each group fetch the bigger file available, iterating through groups
612   * alternating the direction.
613   */
614  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
615      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
616    // Sort files by size, from small to big
617    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
618      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
619        long r = a.getSecond() - b.getSecond();
620        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
621      }
622    });
623
624    // create balanced groups
625    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
626    long[] sizeGroups = new long[ngroups];
627    int hi = files.size() - 1;
628    int lo = 0;
629
630    List<Pair<SnapshotFileInfo, Long>> group;
631    int dir = 1;
632    int g = 0;
633
634    while (hi >= lo) {
635      if (g == fileGroups.size()) {
636        group = new LinkedList<>();
637        fileGroups.add(group);
638      } else {
639        group = fileGroups.get(g);
640      }
641
642      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
643
644      // add the hi one
645      sizeGroups[g] += fileInfo.getSecond();
646      group.add(fileInfo);
647
648      // change direction when at the end or the beginning
649      g += dir;
650      if (g == ngroups) {
651        dir = -1;
652        g = ngroups - 1;
653      } else if (g < 0) {
654        dir = 1;
655        g = 0;
656      }
657    }
658
659    if (LOG.isDebugEnabled()) {
660      for (int i = 0; i < sizeGroups.length; ++i) {
661        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
662      }
663    }
664
665    return fileGroups;
666  }
667
668  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
669    @Override
670    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
671        TaskAttemptContext tac) throws IOException, InterruptedException {
672      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
673    }
674
675    @Override
676    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
677      Configuration conf = context.getConfiguration();
678      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
679      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
680
681      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
682      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
683      if (mappers == 0 && snapshotFiles.size() > 0) {
684        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
685        mappers = Math.min(mappers, snapshotFiles.size());
686        conf.setInt(CONF_NUM_SPLITS, mappers);
687        conf.setInt(MR_NUM_MAPS, mappers);
688      }
689
690      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
691      List<InputSplit> splits = new ArrayList(groups.size());
692      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
693        splits.add(new ExportSnapshotInputSplit(files));
694      }
695      return splits;
696    }
697
698    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
699      private List<Pair<BytesWritable, Long>> files;
700      private long length;
701
702      public ExportSnapshotInputSplit() {
703        this.files = null;
704      }
705
706      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
707        this.files = new ArrayList(snapshotFiles.size());
708        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
709          this.files.add(new Pair<>(
710            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
711          this.length += fileInfo.getSecond();
712        }
713      }
714
715      private List<Pair<BytesWritable, Long>> getSplitKeys() {
716        return files;
717      }
718
719      @Override
720      public long getLength() throws IOException, InterruptedException {
721        return length;
722      }
723
724      @Override
725      public String[] getLocations() throws IOException, InterruptedException {
726        return new String[] {};
727      }
728
729      @Override
730      public void readFields(DataInput in) throws IOException {
731        int count = in.readInt();
732        files = new ArrayList<>(count);
733        length = 0;
734        for (int i = 0; i < count; ++i) {
735          BytesWritable fileInfo = new BytesWritable();
736          fileInfo.readFields(in);
737          long size = in.readLong();
738          files.add(new Pair<>(fileInfo, size));
739          length += size;
740        }
741      }
742
743      @Override
744      public void write(DataOutput out) throws IOException {
745        out.writeInt(files.size());
746        for (final Pair<BytesWritable, Long> fileInfo: files) {
747          fileInfo.getFirst().write(out);
748          out.writeLong(fileInfo.getSecond());
749        }
750      }
751    }
752
753    private static class ExportSnapshotRecordReader
754        extends RecordReader<BytesWritable, NullWritable> {
755      private final List<Pair<BytesWritable, Long>> files;
756      private long totalSize = 0;
757      private long procSize = 0;
758      private int index = -1;
759
760      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
761        this.files = files;
762        for (Pair<BytesWritable, Long> fileInfo: files) {
763          totalSize += fileInfo.getSecond();
764        }
765      }
766
767      @Override
768      public void close() { }
769
770      @Override
771      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
772
773      @Override
774      public NullWritable getCurrentValue() { return NullWritable.get(); }
775
776      @Override
777      public float getProgress() { return (float)procSize / totalSize; }
778
779      @Override
780      public void initialize(InputSplit split, TaskAttemptContext tac) { }
781
782      @Override
783      public boolean nextKeyValue() {
784        if (index >= 0) {
785          procSize += files.get(index).getSecond();
786        }
787        return(++index < files.size());
788      }
789    }
790  }
791
792  // ==========================================================================
793  //  Tool
794  // ==========================================================================
795
796  /**
797   * Run Map-Reduce Job to perform the files copy.
798   */
799  private void runCopyJob(final Path inputRoot, final Path outputRoot,
800      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
801      final String filesUser, final String filesGroup, final int filesMode,
802      final int mappers, final int bandwidthMB)
803          throws IOException, InterruptedException, ClassNotFoundException {
804    Configuration conf = getConf();
805    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
806    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
807    if (mappers > 0) {
808      conf.setInt(CONF_NUM_SPLITS, mappers);
809      conf.setInt(MR_NUM_MAPS, mappers);
810    }
811    conf.setInt(CONF_FILES_MODE, filesMode);
812    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
813    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
814    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
815    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
816    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
817    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
818
819    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
820    Job job = new Job(conf);
821    job.setJobName(jobname);
822    job.setJarByClass(ExportSnapshot.class);
823    TableMapReduceUtil.addDependencyJars(job);
824    job.setMapperClass(ExportMapper.class);
825    job.setInputFormatClass(ExportSnapshotInputFormat.class);
826    job.setOutputFormatClass(NullOutputFormat.class);
827    job.setMapSpeculativeExecution(false);
828    job.setNumReduceTasks(0);
829
830    // Acquire the delegation Tokens
831    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
832    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
833      new Path[] { inputRoot }, srcConf);
834    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
835    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
836        new Path[] { outputRoot }, destConf);
837
838    // Run the MR Job
839    if (!job.waitForCompletion(true)) {
840      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
841    }
842  }
843
844  private void verifySnapshot(final Configuration baseConf,
845      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
846    // Update the conf with the current root dir, since may be a different cluster
847    Configuration conf = new Configuration(baseConf);
848    FSUtils.setRootDir(conf, rootDir);
849    FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
850    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
851    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
852  }
853
854  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
855      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
856    ExecutorService pool = Executors
857        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
858    List<Future<Void>> futures = new ArrayList<>();
859    for (Path dstPath : traversedPath) {
860      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
861      futures.add(future);
862    }
863    try {
864      for (Future<Void> future : futures) {
865        future.get();
866      }
867    } catch (InterruptedException | ExecutionException e) {
868      throw new IOException(e);
869    } finally {
870      pool.shutdownNow();
871    }
872  }
873
874  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
875      Configuration conf, List<Path> traversedPath) throws IOException {
876    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
877      try {
878        fs.setOwner(path, filesUser, filesGroup);
879      } catch (IOException e) {
880        throw new RuntimeException(
881            "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
882      }
883    }, conf);
884  }
885
886  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
887      final List<Path> traversedPath, final Configuration conf) throws IOException {
888    if (filesMode <= 0) {
889      return;
890    }
891    FsPermission perm = new FsPermission(filesMode);
892    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
893      try {
894        fs.setPermission(path, perm);
895      } catch (IOException e) {
896        throw new RuntimeException(
897            "set permission for file " + path + " to " + filesMode + " failed", e);
898      }
899    }, conf);
900  }
901
902  private boolean verifyTarget = true;
903  private boolean verifyChecksum = true;
904  private String snapshotName = null;
905  private String targetName = null;
906  private boolean overwrite = false;
907  private String filesGroup = null;
908  private String filesUser = null;
909  private Path outputRoot = null;
910  private Path inputRoot = null;
911  private int bandwidthMB = Integer.MAX_VALUE;
912  private int filesMode = 0;
913  private int mappers = 0;
914
915  @Override
916  protected void processOptions(CommandLine cmd) {
917    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
918    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
919    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
920      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
921    }
922    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
923      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
924    }
925    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
926    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
927    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
928    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode);
929    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
930    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
931    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
932    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
933    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
934  }
935
936  /**
937   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
938   * @return 0 on success, and != 0 upon failure.
939   */
940  @Override
941  public int doWork() throws IOException {
942    Configuration conf = getConf();
943
944    // Check user options
945    if (snapshotName == null) {
946      System.err.println("Snapshot name not provided.");
947      LOG.error("Use -h or --help for usage instructions.");
948      return 0;
949    }
950
951    if (outputRoot == null) {
952      System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
953              + ") not provided.");
954      LOG.error("Use -h or --help for usage instructions.");
955      return 0;
956    }
957
958    if (targetName == null) {
959      targetName = snapshotName;
960    }
961    if (inputRoot == null) {
962      inputRoot = FSUtils.getRootDir(conf);
963    } else {
964      FSUtils.setRootDir(conf, inputRoot);
965    }
966
967    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
968    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
969    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
970    LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
971    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
972    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
973    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
974    LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
975
976    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) ||
977        conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
978
979    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
980    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot,
981        destConf);
982    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
983    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
984
985    // Find the necessary directory which need to change owner and group
986    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
987    if (outputFs.exists(needSetOwnerDir)) {
988      if (skipTmp) {
989        needSetOwnerDir = outputSnapshotDir;
990      } else {
991        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
992        if (outputFs.exists(needSetOwnerDir)) {
993          needSetOwnerDir = snapshotTmpDir;
994        }
995      }
996    }
997
998    // Check if the snapshot already exists
999    if (outputFs.exists(outputSnapshotDir)) {
1000      if (overwrite) {
1001        if (!outputFs.delete(outputSnapshotDir, true)) {
1002          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1003          return 1;
1004        }
1005      } else {
1006        System.err.println("The snapshot '" + targetName +
1007          "' already exists in the destination: " + outputSnapshotDir);
1008        return 1;
1009      }
1010    }
1011
1012    if (!skipTmp) {
1013      // Check if the snapshot already in-progress
1014      if (outputFs.exists(snapshotTmpDir)) {
1015        if (overwrite) {
1016          if (!outputFs.delete(snapshotTmpDir, true)) {
1017            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
1018            return 1;
1019          }
1020        } else {
1021          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
1022          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
1023          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
1024          return 1;
1025        }
1026      }
1027    }
1028
1029    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
1030    // The snapshot references must be copied before the hfiles otherwise the cleaner
1031    // will remove them because they are unreferenced.
1032    List<Path> travesedPaths = new ArrayList<>();
1033    boolean copySucceeded = false;
1034    try {
1035      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1036      travesedPaths =
1037          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1038              conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1039      copySucceeded = true;
1040    } catch (IOException e) {
1041      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
1042        snapshotDir + " to=" + initialOutputSnapshotDir, e);
1043    } finally {
1044      if (copySucceeded) {
1045        if (filesUser != null || filesGroup != null) {
1046          LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1047              + filesUser)
1048              + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1049                  + filesGroup));
1050          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1051        }
1052        if (filesMode > 0) {
1053          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1054          setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
1055        }
1056      }
1057    }
1058
1059    // Write a new .snapshotinfo if the target name is different from the source name
1060    if (!targetName.equals(snapshotName)) {
1061      SnapshotDescription snapshotDesc =
1062        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1063          .toBuilder()
1064          .setName(targetName)
1065          .build();
1066      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs);
1067      if (filesUser != null || filesGroup != null) {
1068        outputFs.setOwner(new Path(initialOutputSnapshotDir,
1069          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup);
1070      }
1071      if (filesMode > 0) {
1072        outputFs.setPermission(new Path(initialOutputSnapshotDir,
1073          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode));
1074      }
1075    }
1076
1077    // Step 2 - Start MR Job to copy files
1078    // The snapshot references must be copied before the files otherwise the files gets removed
1079    // by the HFileArchiver, since they have no references.
1080    try {
1081      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1082                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1083
1084      LOG.info("Finalize the Snapshot Export");
1085      if (!skipTmp) {
1086        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1087        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1088          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1089            snapshotTmpDir + " to=" + outputSnapshotDir);
1090        }
1091      }
1092
1093      // Step 4 - Verify snapshot integrity
1094      if (verifyTarget) {
1095        LOG.info("Verify snapshot integrity");
1096        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1097      }
1098
1099      LOG.info("Export Completed: " + targetName);
1100      return 0;
1101    } catch (Exception e) {
1102      LOG.error("Snapshot export failed", e);
1103      if (!skipTmp) {
1104        outputFs.delete(snapshotTmpDir, true);
1105      }
1106      outputFs.delete(outputSnapshotDir, true);
1107      return 1;
1108    } finally {
1109      IOUtils.closeStream(inputFs);
1110      IOUtils.closeStream(outputFs);
1111    }
1112  }
1113
1114  @Override
1115  protected void printUsage() {
1116    super.printUsage();
1117    System.out.println("\n"
1118        + "Examples:\n"
1119        + "  hbase snapshot export \\\n"
1120        + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1121        + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n"
1122        + "\n"
1123        + "  hbase snapshot export \\\n"
1124        + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1125        + "    --copy-to hdfs://srv1:50070/hbase");
1126  }
1127
1128  @Override protected void addOptions() {
1129    addRequiredOption(Options.SNAPSHOT);
1130    addOption(Options.COPY_TO);
1131    addOption(Options.COPY_FROM);
1132    addOption(Options.TARGET_NAME);
1133    addOption(Options.NO_CHECKSUM_VERIFY);
1134    addOption(Options.NO_TARGET_VERIFY);
1135    addOption(Options.OVERWRITE);
1136    addOption(Options.CHUSER);
1137    addOption(Options.CHGROUP);
1138    addOption(Options.CHMOD);
1139    addOption(Options.MAPPERS);
1140    addOption(Options.BANDWIDTH);
1141  }
1142
1143  public static void main(String[] args) {
1144    new ExportSnapshot().doStaticMain(args);
1145  }
1146}