001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.BufferedInputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.LinkedList;
031import java.util.List;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.function.BiConsumer;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FSDataInputStream;
039import org.apache.hadoop.fs.FSDataOutputStream;
040import org.apache.hadoop.fs.FileChecksum;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.fs.permission.FsPermission;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.io.FileLink;
050import org.apache.hadoop.hbase.io.HFileLink;
051import org.apache.hadoop.hbase.io.WALLink;
052import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
053import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
054import org.apache.hadoop.hbase.mob.MobUtils;
055import org.apache.hadoop.hbase.util.AbstractHBaseTool;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.FSUtils;
059import org.apache.hadoop.hbase.util.HFileArchiveUtil;
060import org.apache.hadoop.hbase.util.Pair;
061import org.apache.hadoop.io.BytesWritable;
062import org.apache.hadoop.io.IOUtils;
063import org.apache.hadoop.io.NullWritable;
064import org.apache.hadoop.io.Writable;
065import org.apache.hadoop.mapreduce.InputFormat;
066import org.apache.hadoop.mapreduce.InputSplit;
067import org.apache.hadoop.mapreduce.Job;
068import org.apache.hadoop.mapreduce.JobContext;
069import org.apache.hadoop.mapreduce.Mapper;
070import org.apache.hadoop.mapreduce.RecordReader;
071import org.apache.hadoop.mapreduce.TaskAttemptContext;
072import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
073import org.apache.hadoop.mapreduce.security.TokenCache;
074import org.apache.hadoop.util.StringUtils;
075import org.apache.hadoop.util.Tool;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
081import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
082
083import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
086
087/**
088 * Export the specified snapshot to a given FileSystem.
089 *
090 * The .snapshot/name folder is copied to the destination cluster
091 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
092 * When everything is done, the second cluster can restore the snapshot.
093 */
094@InterfaceAudience.Public
095public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name, used as the job-name default and as the prefix for the override conf keys. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Standard MapReduce key used to publish the computed number of map tasks.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Internal keys used to pass state from the driver to the InputFormat/Mapper.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Attributes (owner/group/mode) to force on the copied files; unset = keep source values.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Copy tuning: buffer size, status-report granularity, files-per-mapper, bandwidth cap.
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  // Thread pool size for copying the snapshot manifest/reference files.
  private static final String CONF_COPY_MANIFEST_THREADS =
      "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
      Runtime.getRuntime().availableProcessors();
124
  /** Failure-injection knobs, only active in tests (see TestExportSnapshot). */
  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;  // total number of copy failures to inject
    int injectedFailureCount = 0;   // failures injected so far (seeded from the task attempt id)
  }
131
132  // Command line options and defaults.
  /** Command line option definitions; descriptions are shown in the tool usage output. */
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME = new Option(null, "target", true,
        "Target name for the snapshot.");
    static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
        + "destination hdfs://");
    static final Option COPY_FROM = new Option(null, "copy-from", true,
        "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
        "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
        "Do not verify the integrity of the exported snapshot.");
    static final Option OVERWRITE = new Option(null, "overwrite", false,
        "Rewrite the snapshot manifest if already exists.");
    static final Option CHUSER = new Option(null, "chuser", true,
        "Change the owner of the files to the specified one.");
    static final Option CHGROUP = new Option(null, "chgroup", true,
        "Change the group of the files to the specified one.");
    static final Option CHMOD = new Option(null, "chmod", true,
        "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
        "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH = new Option(null, "bandwidth", true,
        "Limit bandwidth to this value in MB/second.");
  }
158
159  // Export Map-Reduce Counters, to keep track of the progress
  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    // File-level outcomes: source missing, copied, skipped (already identical), or failed.
    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
    // Byte-level progress: total expected, skipped (already present), and actually copied.
    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
  }
164
  /**
   * Mapper that copies one snapshot-referenced file (HFile or WAL) per input record from the
   * source filesystem to the same relative location under the destination archive directory,
   * optionally throttling bandwidth and preserving file attributes. Emits no output records;
   * progress is tracked through {@link Counter}.
   */
  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
                                                   NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    // Defaults: report progress every 1MB copied, copy through a 64KB buffer.
    // Both can be overridden via configuration in setup().
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;  // compare checksums when deciding whether to skip a copy
    private String filesGroup;       // group to force on copied files (null = keep source's)
    private String filesUser;        // owner to force on copied files (null = keep source's)
    private short filesMode;         // permission to force on copied files (0 = keep source's)
    private int bufferSize;          // copy buffer size in bytes
    private int reportSize;          // bytes between task status updates

    private FileSystem outputFs;
    private Path outputArchive;      // <outputRoot>/<HFILE_ARCHIVE_DIRECTORY>
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;       // <inputRoot>/<HFILE_ARCHIVE_DIRECTORY>
    private Path inputRoot;

    // Shared failure-injection state; only consulted when Testing.CONF_TEST_FAILURE is set.
    private static Testing testing = new Testing();

    /**
     * Read the job configuration and open the source and destination filesystems.
     * @throws IOException if either filesystem cannot be instantiated
     */
    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      // Per-cluster overrides: keys under "exportsnapshot.from."/"exportsnapshot.to." are
      // re-applied without the prefix on the respective configuration.
      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      // NOTE(review): Configuration.getInt parses decimal, so "700" here is decimal 700,
      // not octal 0700 — confirm the driver writes this key in the base expected here.
      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        // Disable the FileSystem cache so the per-cluster overrides actually take effect.
        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      // Touch every counter so all of them show up in the job output, even at zero.
      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    /** Close the filesystems opened in {@link #setup(Context)}. */
    @Override
    protected void cleanup(Context context) {
      IOUtils.closeStream(inputFs);
      IOUtils.closeStream(outputFs);
    }

    /**
     * Copy the single file described by the serialized {@link SnapshotFileInfo} key
     * to its destination under the output archive directory.
     */
    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
        throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          // Decode the hfile-link name into table/region/family/hfile components and
          // rebuild the corresponding path relative to the archive directory.
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
              new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          // NOTE(review): path stays null here, so the Path constructor below will fail for
          // WAL entries — confirm WAL inputs can never reach this method.
          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen.
     * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
        throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
          testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    /**
     * Copy one file to {@code outputPath}, skipping it when an identical copy already exists
     * at the destination (see {@link #sameFile(FileStatus, FileStatus)}).
     */
    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
        final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      // Throttle unless the bandwidth was explicitly set to Integer.MAX_VALUE ("unlimited");
      // the default of 100 MB/s therefore always goes through the throttled stream.
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        FSDataOutputStream out = outputFs.create(outputPath, true);
        try {
          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
        } finally {
          out.close();
        }

        // Try to Preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to run manually chown on: " + outputPath);
        }
      } finally {
        in.close();
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        // Recurse so every newly created ancestor gets the requested owner/group/mode too.
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to Preserve the files attribute selected by the user copying them from the source file
     * This is only required when you are exporting as a different user than "hbase" or on a system
     * that doesn't have the "hbase" user.
     *
     * This is not considered a blocking failure since the user can force a chmod with the user
     * that knows is available on the system.
     *
     * @return true if all selected attributes were applied, false on any non-fatal failure
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        // Explicit --chmod wins; otherwise mirror the source file's permission.
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          // NOTE(review): if exactly one of user/group is null here (e.g. --chuser given
          // without --chgroup and no refStat), the equals() call below can NPE — verify.
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" +
                   user + " group=" + group);
          return false;
        }
      }

      return true;
    }

    // True when str is non-null and non-empty.
    private boolean stringIsNotEmpty(final String str) {
      return str != null && str.length() > 0;
    }

    /**
     * Stream {@code inputFileSize} bytes from {@code in} to {@code out}, updating the task
     * status and BYTES_COPIED counter every {@link #reportSize} bytes.
     * @throws IOException if the copy fails or fewer bytes than expected were transferred
     */
    private void copyData(final Context context,
        final Path inputPath, final InputStream in,
        final Path outputPath, final FSDataOutputStream out,
        final long inputFileSize)
        throws IOException {
      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
                                   " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;  // bytes copied since the last status report
        int bytesRead;

        long stime = EnvironmentEdgeManager.currentTime();
        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context.setStatus(String.format(statusMessage,
                              StringUtils.humanReadableInt(totalBytesWritten),
                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
                              " from " + inputPath + " to " + outputPath);
            reportBytes = 0;
          }
        }
        long etime = EnvironmentEdgeManager.currentTime();

        // Flush the remainder below one reportSize into the counter and the final status.
        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage,
                          StringUtils.humanReadableInt(totalBytesWritten),
                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
                          " from " + inputPath + " to " + outputPath);

        // Verify that the written size match
        if (totalBytesWritten != inputFileSize) {
          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
                       " expected=" + inputFileSize + " for file=" + inputPath;
          throw new IOException(msg);
        }

        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten +
            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
            " time=" + StringUtils.formatTimeDiff(etime, stime) +
            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      }
    }

    /**
     * Try to open the "source" file.
     * Throws an IOException if the communication with the inputFs fail or
     * if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
            throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    /**
     * Look up the FileStatus of the source file through its FileLink.
     * Only a FileNotFoundException bumps the MISSING_FILES counter; other IO failures
     * are logged and rethrown without counting the file as missing.
     */
    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
        throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    /**
     * Build the FileLink for an hfile-link path, using the MOB locations when the link
     * references the special MOB region.
     */
    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
                HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    // Best-effort checksum lookup: null when the filesystem cannot provide one.
    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Check if the two files are equal by looking at the file length,
     * and at the checksum (if user has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equals, since user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }
564
565  // ==========================================================================
566  //  Input Format
567  // ==========================================================================
568
569  /**
570   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
571   * @return list of files referenced by the snapshot (pair of path and size)
572   */
573  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
574      final FileSystem fs, final Path snapshotDir) throws IOException {
575    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
576
577    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
578    final TableName table = TableName.valueOf(snapshotDesc.getTable());
579
580    // Get snapshot files
581    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
582    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
583      new SnapshotReferenceUtil.SnapshotVisitor() {
584        @Override
585        public void storeFile(final RegionInfo regionInfo, final String family,
586            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
587          // for storeFile.hasReference() case, copied as part of the manifest
588          if (!storeFile.hasReference()) {
589            String region = regionInfo.getEncodedName();
590            String hfile = storeFile.getName();
591            Path path = HFileLink.createPath(table, region, family, hfile);
592
593            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
594              .setType(SnapshotFileInfo.Type.HFILE)
595              .setHfile(path.toString())
596              .build();
597
598            long size;
599            if (storeFile.hasFileSize()) {
600              size = storeFile.getFileSize();
601            } else {
602              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
603            }
604            files.add(new Pair<>(fileInfo, size));
605          }
606        }
607    });
608
609    return files;
610  }
611
612  /**
613   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
614   * The groups created will have similar amounts of bytes.
615   * <p>
616   * The algorithm used is pretty straightforward; the file list is sorted by size,
617   * and then each group fetch the bigger file available, iterating through groups
618   * alternating the direction.
619   */
620  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
621      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
622    // Sort files by size, from small to big
623    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
624      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
625        long r = a.getSecond() - b.getSecond();
626        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
627      }
628    });
629
630    // create balanced groups
631    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
632    long[] sizeGroups = new long[ngroups];
633    int hi = files.size() - 1;
634    int lo = 0;
635
636    List<Pair<SnapshotFileInfo, Long>> group;
637    int dir = 1;
638    int g = 0;
639
640    while (hi >= lo) {
641      if (g == fileGroups.size()) {
642        group = new LinkedList<>();
643        fileGroups.add(group);
644      } else {
645        group = fileGroups.get(g);
646      }
647
648      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
649
650      // add the hi one
651      sizeGroups[g] += fileInfo.getSecond();
652      group.add(fileInfo);
653
654      // change direction when at the end or the beginning
655      g += dir;
656      if (g == ngroups) {
657        dir = -1;
658        g = ngroups - 1;
659      } else if (g < 0) {
660        dir = 1;
661        g = 0;
662      }
663    }
664
665    if (LOG.isDebugEnabled()) {
666      for (int i = 0; i < sizeGroups.length; ++i) {
667        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
668      }
669    }
670
671    return fileGroups;
672  }
673
674  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
675    @Override
676    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
677        TaskAttemptContext tac) throws IOException, InterruptedException {
678      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
679    }
680
681    @Override
682    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
683      Configuration conf = context.getConfiguration();
684      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
685      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
686
687      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
688      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
689      if (mappers == 0 && snapshotFiles.size() > 0) {
690        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
691        mappers = Math.min(mappers, snapshotFiles.size());
692        conf.setInt(CONF_NUM_SPLITS, mappers);
693        conf.setInt(MR_NUM_MAPS, mappers);
694      }
695
696      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
697      List<InputSplit> splits = new ArrayList(groups.size());
698      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
699        splits.add(new ExportSnapshotInputSplit(files));
700      }
701      return splits;
702    }
703
704    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
705      private List<Pair<BytesWritable, Long>> files;
706      private long length;
707
708      public ExportSnapshotInputSplit() {
709        this.files = null;
710      }
711
712      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
713        this.files = new ArrayList(snapshotFiles.size());
714        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
715          this.files.add(new Pair<>(
716            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
717          this.length += fileInfo.getSecond();
718        }
719      }
720
721      private List<Pair<BytesWritable, Long>> getSplitKeys() {
722        return files;
723      }
724
725      @Override
726      public long getLength() throws IOException, InterruptedException {
727        return length;
728      }
729
730      @Override
731      public String[] getLocations() throws IOException, InterruptedException {
732        return new String[] {};
733      }
734
735      @Override
736      public void readFields(DataInput in) throws IOException {
737        int count = in.readInt();
738        files = new ArrayList<>(count);
739        length = 0;
740        for (int i = 0; i < count; ++i) {
741          BytesWritable fileInfo = new BytesWritable();
742          fileInfo.readFields(in);
743          long size = in.readLong();
744          files.add(new Pair<>(fileInfo, size));
745          length += size;
746        }
747      }
748
749      @Override
750      public void write(DataOutput out) throws IOException {
751        out.writeInt(files.size());
752        for (final Pair<BytesWritable, Long> fileInfo: files) {
753          fileInfo.getFirst().write(out);
754          out.writeLong(fileInfo.getSecond());
755        }
756      }
757    }
758
759    private static class ExportSnapshotRecordReader
760        extends RecordReader<BytesWritable, NullWritable> {
761      private final List<Pair<BytesWritable, Long>> files;
762      private long totalSize = 0;
763      private long procSize = 0;
764      private int index = -1;
765
766      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
767        this.files = files;
768        for (Pair<BytesWritable, Long> fileInfo: files) {
769          totalSize += fileInfo.getSecond();
770        }
771      }
772
773      @Override
774      public void close() { }
775
776      @Override
777      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
778
779      @Override
780      public NullWritable getCurrentValue() { return NullWritable.get(); }
781
782      @Override
783      public float getProgress() { return (float)procSize / totalSize; }
784
785      @Override
786      public void initialize(InputSplit split, TaskAttemptContext tac) { }
787
788      @Override
789      public boolean nextKeyValue() {
790        if (index >= 0) {
791          procSize += files.get(index).getSecond();
792        }
793        return(++index < files.size());
794      }
795    }
796  }
797
798  // ==========================================================================
799  //  Tool
800  // ==========================================================================
801
802  /**
803   * Run Map-Reduce Job to perform the files copy.
804   */
805  private void runCopyJob(final Path inputRoot, final Path outputRoot,
806      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
807      final String filesUser, final String filesGroup, final int filesMode,
808      final int mappers, final int bandwidthMB)
809          throws IOException, InterruptedException, ClassNotFoundException {
810    Configuration conf = getConf();
811    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
812    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
813    if (mappers > 0) {
814      conf.setInt(CONF_NUM_SPLITS, mappers);
815      conf.setInt(MR_NUM_MAPS, mappers);
816    }
817    conf.setInt(CONF_FILES_MODE, filesMode);
818    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
819    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
820    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
821    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
822    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
823    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
824
825    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
826    Job job = new Job(conf);
827    job.setJobName(jobname);
828    job.setJarByClass(ExportSnapshot.class);
829    TableMapReduceUtil.addDependencyJars(job);
830    job.setMapperClass(ExportMapper.class);
831    job.setInputFormatClass(ExportSnapshotInputFormat.class);
832    job.setOutputFormatClass(NullOutputFormat.class);
833    job.setMapSpeculativeExecution(false);
834    job.setNumReduceTasks(0);
835
836    // Acquire the delegation Tokens
837    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
838    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
839      new Path[] { inputRoot }, srcConf);
840    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
841    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
842        new Path[] { outputRoot }, destConf);
843
844    // Run the MR Job
845    if (!job.waitForCompletion(true)) {
846      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
847    }
848  }
849
850  private void verifySnapshot(final Configuration baseConf,
851      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
852    // Update the conf with the current root dir, since may be a different cluster
853    Configuration conf = new Configuration(baseConf);
854    CommonFSUtils.setRootDir(conf, rootDir);
855    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
856    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
857    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
858  }
859
860  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
861      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
862    ExecutorService pool = Executors
863        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
864    List<Future<Void>> futures = new ArrayList<>();
865    for (Path dstPath : traversedPath) {
866      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
867      futures.add(future);
868    }
869    try {
870      for (Future<Void> future : futures) {
871        future.get();
872      }
873    } catch (InterruptedException | ExecutionException e) {
874      throw new IOException(e);
875    } finally {
876      pool.shutdownNow();
877    }
878  }
879
880  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
881      Configuration conf, List<Path> traversedPath) throws IOException {
882    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
883      try {
884        fs.setOwner(path, filesUser, filesGroup);
885      } catch (IOException e) {
886        throw new RuntimeException(
887            "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
888      }
889    }, conf);
890  }
891
892  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
893      final List<Path> traversedPath, final Configuration conf) throws IOException {
894    if (filesMode <= 0) {
895      return;
896    }
897    FsPermission perm = new FsPermission(filesMode);
898    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
899      try {
900        fs.setPermission(path, perm);
901      } catch (IOException e) {
902        throw new RuntimeException(
903            "set permission for file " + path + " to " + filesMode + " failed", e);
904      }
905    }, conf);
906  }
907
  // Tool state populated from the command line in processOptions(); the
  // initializers below are the defaults used when an option is absent.
  private boolean verifyTarget = true;          // run verifySnapshot() on the destination after copy
  private boolean verifyChecksum = true;        // passed to the copy job as CONF_CHECKSUM_VERIFY
  private String snapshotName = null;           // snapshot to export (required)
  private String targetName = null;             // name at the destination (defaults to snapshotName)
  private boolean overwrite = false;            // delete a pre-existing destination snapshot
  private String filesGroup = null;             // group applied to exported files, if set
  private String filesUser = null;              // owner applied to exported files, if set
  private Path outputRoot = null;               // destination root dir (--copy-to, required)
  private Path inputRoot = null;                // source root dir (--copy-from; defaults to conf root)
  private int bandwidthMB = Integer.MAX_VALUE;  // per-mapper bandwidth cap (CONF_BANDWIDTH_MB)
  private int filesMode = 0;                    // octal permission for exported files; 0 = unchanged
  private int mappers = 0;                      // map task count; 0 = derived from the file count
920
921  @Override
922  protected void processOptions(CommandLine cmd) {
923    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
924    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
925    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
926      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
927    }
928    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
929      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
930    }
931    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
932    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
933    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
934    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
935    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
936    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
937    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
938    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
939    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
940  }
941
942  /**
943   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
944   * @return 0 on success, and != 0 upon failure.
945   */
946  @Override
947  public int doWork() throws IOException {
948    Configuration conf = getConf();
949
950    // Check user options
951    if (snapshotName == null) {
952      System.err.println("Snapshot name not provided.");
953      LOG.error("Use -h or --help for usage instructions.");
954      return 0;
955    }
956
957    if (outputRoot == null) {
958      System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
959              + ") not provided.");
960      LOG.error("Use -h or --help for usage instructions.");
961      return 0;
962    }
963
964    if (targetName == null) {
965      targetName = snapshotName;
966    }
967    if (inputRoot == null) {
968      inputRoot = CommonFSUtils.getRootDir(conf);
969    } else {
970      CommonFSUtils.setRootDir(conf, inputRoot);
971    }
972
973    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
974    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
975    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
976    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
977    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
978    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
979    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false) ||
980        conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
981    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
982    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot,
983        destConf);
984    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
985    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
986    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
987    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}",
988      outputFs, outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
989
990    // Find the necessary directory which need to change owner and group
991    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
992    if (outputFs.exists(needSetOwnerDir)) {
993      if (skipTmp) {
994        needSetOwnerDir = outputSnapshotDir;
995      } else {
996        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
997        if (outputFs.exists(needSetOwnerDir)) {
998          needSetOwnerDir = snapshotTmpDir;
999        }
1000      }
1001    }
1002
1003    // Check if the snapshot already exists
1004    if (outputFs.exists(outputSnapshotDir)) {
1005      if (overwrite) {
1006        if (!outputFs.delete(outputSnapshotDir, true)) {
1007          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1008          return 1;
1009        }
1010      } else {
1011        System.err.println("The snapshot '" + targetName +
1012          "' already exists in the destination: " + outputSnapshotDir);
1013        return 1;
1014      }
1015    }
1016
1017    if (!skipTmp) {
1018      // Check if the snapshot already in-progress
1019      if (outputFs.exists(snapshotTmpDir)) {
1020        if (overwrite) {
1021          if (!outputFs.delete(snapshotTmpDir, true)) {
1022            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
1023            return 1;
1024          }
1025        } else {
1026          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
1027          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
1028          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
1029          return 1;
1030        }
1031      }
1032    }
1033
1034    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
1035    // The snapshot references must be copied before the hfiles otherwise the cleaner
1036    // will remove them because they are unreferenced.
1037    List<Path> travesedPaths = new ArrayList<>();
1038    boolean copySucceeded = false;
1039    try {
1040      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1041      travesedPaths =
1042          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1043              conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1044      copySucceeded = true;
1045    } catch (IOException e) {
1046      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
1047        snapshotDir + " to=" + initialOutputSnapshotDir, e);
1048    } finally {
1049      if (copySucceeded) {
1050        if (filesUser != null || filesGroup != null) {
1051          LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1052              + filesUser)
1053              + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1054                  + filesGroup));
1055          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1056        }
1057        if (filesMode > 0) {
1058          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1059          setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
1060        }
1061      }
1062    }
1063
1064    // Write a new .snapshotinfo if the target name is different from the source name
1065    if (!targetName.equals(snapshotName)) {
1066      SnapshotDescription snapshotDesc =
1067        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1068          .toBuilder()
1069          .setName(targetName)
1070          .build();
1071      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs);
1072      if (filesUser != null || filesGroup != null) {
1073        outputFs.setOwner(new Path(initialOutputSnapshotDir,
1074          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup);
1075      }
1076      if (filesMode > 0) {
1077        outputFs.setPermission(new Path(initialOutputSnapshotDir,
1078          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode));
1079      }
1080    }
1081
1082    // Step 2 - Start MR Job to copy files
1083    // The snapshot references must be copied before the files otherwise the files gets removed
1084    // by the HFileArchiver, since they have no references.
1085    try {
1086      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1087                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1088
1089      LOG.info("Finalize the Snapshot Export");
1090      if (!skipTmp) {
1091        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1092        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1093          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1094            snapshotTmpDir + " to=" + outputSnapshotDir);
1095        }
1096      }
1097
1098      // Step 4 - Verify snapshot integrity
1099      if (verifyTarget) {
1100        LOG.info("Verify snapshot integrity");
1101        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1102      }
1103
1104      LOG.info("Export Completed: " + targetName);
1105      return 0;
1106    } catch (Exception e) {
1107      LOG.error("Snapshot export failed", e);
1108      if (!skipTmp) {
1109        outputFs.delete(snapshotTmpDir, true);
1110      }
1111      outputFs.delete(outputSnapshotDir, true);
1112      return 1;
1113    } finally {
1114      IOUtils.closeStream(inputFs);
1115      IOUtils.closeStream(outputFs);
1116    }
1117  }
1118
1119  @Override
1120  protected void printUsage() {
1121    super.printUsage();
1122    System.out.println("\n"
1123        + "Examples:\n"
1124        + "  hbase snapshot export \\\n"
1125        + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1126        + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n"
1127        + "\n"
1128        + "  hbase snapshot export \\\n"
1129        + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1130        + "    --copy-to hdfs://srv1:50070/hbase");
1131  }
1132
  // Register the command-line options accepted by this tool; only --snapshot is
  // required. The order here determines the order in the generated help output.
  @Override protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
  }
1147
  /**
   * Command-line entry point; delegates argument parsing and execution (and the
   * process exit) to doStaticMain.
   */
  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
1151}