/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied first
 * to the destination cluster, and then all the hfiles/wals referenced by the snapshot are copied
 * via a Map-Reduce job into the archive/ location. When everything is done, the second cluster
 * can restore the snapshot.
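 * <p>
 * Example invocation (mirroring the examples printed by printUsage):
 *
 * <pre>
 * hbase snapshot export \
 *   --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \
 *   --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16
 * </pre>
 *
 * Configuration keys prefixed with "exportsnapshot.from." or "exportsnapshot.to." are applied as
 * overrides to the source and destination filesystem configurations, respectively.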
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";

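  /*
   * Test-only failure injection: with test.snapshot.export.failure=true and, for example,
   * test.snapshot.export.failure.count=2, each map task keeps throwing an injected IOException
   * until two failures have been recorded (tracked via the task attempt number), exercising the
   * Map-Reduce retry path. See injectTestFailure in ExportMapper below.
   */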
  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps).");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot.");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SSD");
  }

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    static final int REPORT_SIZE = 1 * 1024 * 1024;
    static final int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
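     * <p>
     * A sketch of the mapping (hypothetical names, assuming the default namespace): an HFile link
     * named {@code mytable=abcdef1234567890-somehfile} under family {@code f} is copied to
     * {@code <outputArchive>/data/default/mytable/abcdef1234567890/f/somehfile}.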
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Check if the output file already exists and is identical to the file we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        String family = new Path(inputInfo.getHfile()).getParent().getName();
        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
          String key = context.getConfiguration().get(familyStoragePolicy);
          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
          outputFs.setStoragePolicy(outputPath.getParent(), key);
        }
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to run chown manually on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      } finally {
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source
     * file. This is only required when exporting as a user other than "hbase", or on a system
     * that doesn't have the "hbase" user. It is not considered a blocking failure, since the user
     * can always run chown manually afterwards with a user known to exist on the system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && !str.isEmpty();
    }

    private long copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context
              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
          + outputPath);

        return totalBytesWritten;
      } finally {
        out.close();
        in.close();
      }
    }

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
              .append(
                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }

    /**
     * Utility to compare checksums
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If either checksum is null, or the input and output algorithms differ, the two checksums
      // cannot be compared, so return INCOMPATIBLE. Otherwise return TRUE or FALSE depending on
      // whether the checksums match.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if the
     * verifyChecksum flag is enabled).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equal, since the user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  // Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    Set<String> addedFiles = new HashSet<>();
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
          if (!addedFiles.contains(fileToExport)) {
            files.add(snapshotFileAndSize);
            addedFiles.add(fileToExport);
          } else {
            LOG.debug("Skip the existing file: {}.", fileToExport);
          }
        }
      });

    return files;
  }

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }

  /**
   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
   * The groups created will have similar amounts of bytes.
   * <p>
   * The algorithm is pretty straightforward: the file list is sorted by size, and then each group
   * fetches the biggest file available, iterating through the groups while alternating direction.
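   * <p>
   * A small worked example: with sizes [1, 2, 3, 4, 5, 6] and ngroups=2 the files are handed out
   * largest-first in serpentine order (group 0, group 1, group 1, group 0, ...), which yields
   * group 0 = {6, 3, 2} (11 bytes) and group 1 = {5, 4, 1} (10 bytes).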
   */
  static List<List<Pair<SnapshotFileInfo, Long>>>
    getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        long r = a.getSecond() - b.getSecond();
        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    long[] sizeGroups = new long[ngroups];
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // add the hi one
      sizeGroups[g] += fileInfo.getSecond();
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    if (LOG.isDebugEnabled()) {
      for (int i = 0; i < sizeGroups.length; ++i) {
        LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i]));
      }
    }

    return fileGroups;
  }

  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && !snapshotFiles.isEmpty()) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }
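      // For example, 95 snapshot files with the default snapshot.export.default.map.group=10
      // yield 1 + 95 / 10 = 10 mappers; the count is also capped at one mapper per file.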

      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
      List<InputSplit> splits = new ArrayList<>(groups.size());
      for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
        splits.add(new ExportSnapshotInputSplit(files));
      }
      return splits;
    }

    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
      private List<Pair<BytesWritable, Long>> files;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
      }

      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return new String[] {};
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  // Tool
  // ==========================================================================

  /**
   * Run the Map-Reduce job to perform the file copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
    final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
    if (storagePolicy != null) {
      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
      }
    }

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = Job.getInstance(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since it may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

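  /**
   * Parse the --storage-policy argument into a per-family map. A sketch of the expected format:
   * the (hypothetical) input "f1=HOT&f2=ALL_SSD" yields {f1 -> HOT, f2 -> ALL_SSD}; entries that
   * are not of the form family=policy are silently skipped.
   */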
  private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
    Map<String, String> familyStoragePolicy = new HashMap<>();
    for (String familyConf : storagePolicy.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      // family is key, storage policy is value
      familyStoragePolicy.put(familySplit[0], familySplit[1]);
    }
    return familyStoragePolicy;
  }

  private static String generateFamilyStoragePolicyKey(String family) {
    return CONF_STORAGE_POLICY + "." + family;
  }

  private boolean verifyTarget = true;
  private boolean verifySource = true;
  private boolean verifyChecksum = true;
  private String snapshotName = null;
  private String targetName = null;
  private boolean overwrite = false;
  private String filesGroup = null;
  private String filesUser = null;
  private Path outputRoot = null;
  private Path inputRoot = null;
  private int bandwidthMB = Integer.MAX_VALUE;
  private int filesMode = 0;
  private int mappers = 0;
  private boolean resetTtl = false;
  private String storagePolicy = null;

  @Override
  protected void processOptions(CommandLine cmd) {
    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
    }
    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
    }
    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
    // verifyChecksum, verifyTarget and verifySource default to true unless disabled below.
    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
    }
  }

  /**
   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
   * @return 0 on success, and != 0 upon failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // throw CorruptedSnapshotException if we can't read the snapshot info.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory that will need its owner and group changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if the snapshot is already in progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, traversedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name is different from the source name or we want to
    // reset TTL for target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files, otherwise the files get removed
    // by the HFileArchiver because they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify the exported snapshot's expiration status and integrity.");
        SnapshotDescription targetSnapshotDesc =
          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return EXIT_SUCCESS;
    } catch (Exception e) {
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return EXIT_FAILURE;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
    addOption(Options.STORAGE_POLICY);
  }

  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}