/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.snapshot;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
 * destination cluster, and then all the hfiles/wals are copied to the .archive/ location using a
 * Map-Reduce job. When everything is done, the second cluster can restore the snapshot.
 */
@InterfaceAudience.Public
public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";
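  // For example (the keys below are illustrative, not an exhaustive list), passing
  //   -Dexportsnapshot.from.hbase.zookeeper.quorum=src-zk
  //   -Dexportsnapshot.to.fs.s3a.endpoint=https://backup.example.com
  // lets HBaseConfiguration.createClusterConf() strip the prefix and apply the remainder to the
  // source or destination configuration only, so the two clusters can use different settings.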

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
    "snapshot.export.input.file.grouper.class";
  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
    "snapshot.export.input.file.location.resolver.class";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";

  static class Testing {
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    int failuresCountToInject = 0;
    int injectedFailureCount = 0;
  }

  // Command line options and defaults.
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to export.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if it already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps). "
        + "If you provide a --custom-file-grouper, "
        + "then --mappers is interpreted as the number of mappers per group.");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot.");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SSD");
    static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true,
      "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
        + "See JavaDoc on that class for more information.");
    static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true,
      "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
        + "See JavaDoc on that class for more information.");
  }
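  // Example invocation (cluster address and snapshot name are illustrative):
  //   hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
  //     --snapshot MySnapshot --copy-to hdfs://backup-cluster:8020/hbase \
  //     --mappers 16 --bandwidth 100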

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES,
    FILES_COPIED,
    FILES_SKIPPED,
    COPY_FAILED,
    BYTES_EXPECTED,
    BYTES_SKIPPED,
    BYTES_COPIED
  }

  /**
   * Indicates the checksum comparison result.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }

  /**
   * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
   * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
   * must return a data structure that contains each input file exactly once. Files that appear in
   * separate entries in the top-level returned Collection are guaranteed to not be placed in the
   * same InputSplit. This can be used to segregate your input files by the rack or host on which
   * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
   * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
   * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
   * on the classpath. If this argument is not used, no particular grouping logic will be applied.
   */
  @InterfaceAudience.Public
  public interface CustomFileGrouper {
    Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles);
  }
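  /*
   * A minimal sketch of a CustomFileGrouper (illustrative only; the 1 GB size-bucket policy below
   * is a hypothetical grouping criterion, not something ExportSnapshot ships with). Because the
   * two buckets are separate entries in the returned collection, large and small files are never
   * mixed in the same InputSplit:
   *
   *   public static class SizeBucketFileGrouper implements CustomFileGrouper {
   *     private static final long LARGE_FILE_THRESHOLD = 1024L * 1024 * 1024; // assumed cutoff
   *
   *     @Override
   *     public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
   *       getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
   *       List<Pair<SnapshotFileInfo, Long>> large = new ArrayList<>();
   *       List<Pair<SnapshotFileInfo, Long>> small = new ArrayList<>();
   *       for (Pair<SnapshotFileInfo, Long> file : snapshotFiles) {
   *         // Each input file lands in exactly one bucket, as the contract requires.
   *         (file.getSecond() >= LARGE_FILE_THRESHOLD ? large : small).add(file);
   *       }
   *       return ImmutableList.of(large, small);
   *     }
   *   }
   *
   * Wired in with: --custom-file-grouper com.example.SizeBucketFileGrouper
   */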

  private static class NoopCustomFileGrouper implements CustomFileGrouper {
    @Override
    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      return ImmutableList.of(snapshotFiles);
    }
  }

  /**
   * If desired, you may implement a FileLocationResolver in order to influence the _location_
   * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
   * {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit
   * being constructed. Whatever is returned will ultimately be reported by that split's
   * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
   * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
   * --file-location-resolver argument with the fully qualified class name of an implementation of
   * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
   * be attached to the InputSplits.
   */
  @InterfaceAudience.Public
  public interface FileLocationResolver {
    Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files);
  }
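  /*
   * A minimal sketch of a FileLocationResolver (illustrative; the hard-coded hostnames stand in
   * for a real topology lookup). Whatever is returned here is surfaced through
   * InputSplit#getLocations() and used by YARN purely as a scheduling hint:
   *
   *   public static class StaticHostFileLocationResolver implements FileLocationResolver {
   *     @Override
   *     public Set<String> getLocationsForInputFiles(
   *       final Collection<Pair<SnapshotFileInfo, Long>> files) {
   *       // A real implementation would resolve the hosts/racks that serve these files.
   *       return ImmutableSet.of("datanode-1.example.com", "datanode-2.example.com");
   *     }
   *   }
   *
   * Wired in with: --file-location-resolver com.example.StaticHostFileLocationResolver
   */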

  static class NoopFileLocationResolver implements FileLocationResolver {
    @Override
    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
      return ImmutableSet.of();
    }
  }

  private static class ExportMapper
    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    final static int BUFFER_SIZE = 64 * 1024;

    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;
    private int reportSize;

    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    private static Testing testing = new Testing();

    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();

      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);

      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);

      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
        // Get number of times we have already injected failure based on attempt number of this
        // task.
        testing.injectedFailureCount = context.getTaskAttemptID().getId();
      }
    }

    @Override
    public void map(BytesWritable key, NullWritable value, Context context)
      throws InterruptedException, IOException {
      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
      Path outputPath = getOutputPath(inputInfo);

      copyFile(context, inputInfo, outputPath);
    }

    /**
     * Returns the location where the inputPath will be copied.
     */
    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
      Path path = null;
      switch (inputInfo.getType()) {
        case HFILE:
          Path inputPath = new Path(inputInfo.getHfile());
          String family = inputPath.getParent().getName();
          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
          String region = HFileLink.getReferencedRegionName(inputPath.getName());
          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
            new Path(region, new Path(family, hfile)));
          break;
        case WAL:
          LOG.warn("snapshot does not keep WALs: " + inputInfo);
          break;
        default:
          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
      }
      return new Path(outputArchive, path);
    }

    @SuppressWarnings("checkstyle:linelength")
    /**
     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
     * {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
     */
    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
      throws IOException {
      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
      testing.injectedFailureCount++;
      context.getCounter(Counter.COPY_FAILED).increment(1);
      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
    }

    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
      final Path outputPath) throws IOException {
      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
      }

      Path inputPath = inputStat.getPath();
      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        createOutputPath(outputPath.getParent());
        String family = new Path(inputInfo.getHfile()).getParent().getName();
        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
          String key = context.getConfiguration().get(familyStoragePolicy);
          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
          outputFs.setStoragePolicy(outputPath.getParent(), key);
        }
        FSDataOutputStream out = outputFs.create(outputPath, true);

        long stime = EnvironmentEdgeManager.currentTime();
        long totalBytesWritten =
          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());

        // Verify the file length and checksum
        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));

        long etime = EnvironmentEdgeManager.currentTime();
        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);

        // Try to preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to manually run chown on: " + outputPath);
        }
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      } finally {
        injectTestFailure(context, inputInfo);
      }
    }

    /**
     * Create the output folder and optionally set ownership.
     */
    private void createOutputPath(final Path path) throws IOException {
      if (filesUser == null && filesGroup == null) {
        outputFs.mkdirs(path);
      } else {
        Path parent = path.getParent();
        if (!outputFs.exists(parent) && !parent.isRoot()) {
          createOutputPath(parent);
        }
        outputFs.mkdirs(path);
        if (filesUser != null || filesGroup != null) {
          // override the owner when non-null user/group is specified
          outputFs.setOwner(path, filesUser, filesGroup);
        }
        if (filesMode > 0) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        }
      }
    }

    /**
     * Try to preserve the file attributes selected by the user, copying them from the source file.
     * This is only required when you are exporting as a different user than "hbase" or on a system
     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
     * can force a chmod with a user that is known to be available on the system.
     */
    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
      FileStatus stat;
      try {
        stat = outputFs.getFileStatus(path);
      } catch (IOException e) {
        LOG.warn("Unable to get the status for file=" + path);
        return false;
      }

      try {
        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
          outputFs.setPermission(path, new FsPermission(filesMode));
        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
          outputFs.setPermission(path, refStat.getPermission());
        }
      } catch (IOException e) {
        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
        return false;
      }

      boolean hasRefStat = (refStat != null);
      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
        try {
          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
            outputFs.setOwner(path, user, group);
          }
        } catch (IOException e) {
          LOG.warn(
            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
            + " group=" + group);
          return false;
        }
      }

      return true;
    }

    private boolean stringIsNotEmpty(final String str) {
      return str != null && !str.isEmpty();
    }

    private long copyData(final Context context, final Path inputPath, final InputStream in,
      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
      throws IOException {
      final String statusMessage =
        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;
        int bytesRead;

        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= reportSize) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context
              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
                + " to " + outputPath);
            reportBytes = 0;
          }
        }

        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
          + outputPath);

        return totalBytesWritten;
      } finally {
        out.close();
        in.close();
      }
    }

    /**
     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
     * fails or if the file is not found.
     */
    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            String serverName = fileInfo.getWalServer();
            String logName = fileInfo.getWalName();
            link = new WALLink(inputRoot, serverName, logName);
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.open(inputFs);
      } catch (IOException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
      throws IOException {
      try {
        Configuration conf = context.getConfiguration();
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = getFileLink(inputPath, conf);
            break;
          case WAL:
            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }

    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
      String regionName = HFileLink.getReferencedRegionName(path.getName());
      TableName tableName = HFileLink.getReferencedTableName(path.getName());
      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
          HFileArchiveUtil.getArchivePath(conf), path);
      }
      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
    }

    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }

    /**
     * Utility to compare the file length and checksums for the paths specified.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with --no-checksum-verify;")
              .append(
                " for the table backup scenario, you should use the -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }

    /**
     * Utility to compare checksums.
     */
    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
      final FileChecksum outChecksum) {
      // If either checksum is null, or the input and output checksum algorithms differ, the two
      // cannot be compared, so return INCOMPATIBLE. Otherwise return TRUE if the checksums match
      // and FALSE if they do not.
      if (
        inChecksum == null || outChecksum == null
          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
      ) {
        return ChecksumComparison.INCOMPATIBLE;
      } else if (inChecksum.equals(outChecksum)) {
        return ChecksumComparison.TRUE;
      }
      return ChecksumComparison.FALSE;
    }

    /**
     * Check if the two files are equal by looking at the file length, and at the checksum (if the
     * user has specified the verifyChecksum flag).
     */
    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
      // Not matching length
      if (inputStat.getLen() != outputStat.getLen()) return false;

      // Mark files as equals, since user asked for no checksum verification
      if (!verifyChecksum) return true;

      // If checksums are not available, files are not the same.
      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
      if (inChecksum == null) return false;

      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
      if (outChecksum == null) return false;

      return inChecksum.equals(outChecksum);
    }
  }

  // ==========================================================================
  // Input Format
  // ==========================================================================

  /**
   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
    final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    Set<String> addedFiles = new HashSet<>();
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final RegionInfo regionInfo, final String family,
          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
          if (!storeFile.hasReference()) {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          } else {
            Pair<String, String> referredToRegionAndFile =
              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
            String referencedRegion = referredToRegionAndFile.getFirst();
            String referencedHFile = referredToRegionAndFile.getSecond();
            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
          }
          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
          if (!addedFiles.contains(fileToExport)) {
            files.add(snapshotFileAndSize);
            addedFiles.add(fileToExport);
          } else {
            LOG.debug("Skip the existing file: {}.", fileToExport);
          }
        }
      });

    return files;
  }

  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
    Configuration conf, TableName table, String region, String family, String hfile, long size)
    throws IOException {
    Path path = HFileLink.createPath(table, region, family, hfile);
    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
      .setHfile(path.toString()).build();
    if (size == -1) {
      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
    }
    return new Pair<>(fileInfo, size);
  }

  /**
   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
   * The groups created will have similar amounts of bytes.
   * <p>
   * The algorithm used is pretty straightforward: the file list is sorted by size, and then each
   * group fetches the biggest file still available, iterating through the groups and alternating
   * direction at each end.
   */
  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
    final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) {
    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles);
    // Sort files by size, from small to big
    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
        long r = a.getSecond() - b.getSecond();
        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
      }
    });

    // create balanced groups
    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
    int hi = files.size() - 1;
    int lo = 0;

    List<Pair<SnapshotFileInfo, Long>> group;
    int dir = 1;
    int g = 0;

    while (hi >= lo) {
      if (g == fileGroups.size()) {
        group = new LinkedList<>();
        fileGroups.add(group);
      } else {
        group = fileGroups.get(g);
      }

      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

      // add the hi one
      group.add(fileInfo);

      // change direction when at the end or the beginning
      g += dir;
      if (g == ngroups) {
        dir = -1;
        g = ngroups - 1;
      } else if (g < 0) {
        dir = 1;
        g = 0;
      }
    }

    return fileGroups;
  }
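  // Worked example (sizes are made up): files of sizes [1, 2, 3, 4, 5, 6, 7] with ngroups=3 are
  // consumed largest-first while the group index sweeps 0, 1, 2, 2, 1, 0, 0, yielding groups
  // {7, 2, 1}, {6, 3} and {5, 4}, with byte totals of 10, 9 and 9.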

  static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);

      Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups =
        groupFilesForSplits(conf, snapshotFiles);

      Class<? extends FileLocationResolver> fileLocationResolverClass =
        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
          FileLocationResolver.class);
      FileLocationResolver fileLocationResolver =
        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
      LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
        fileLocationResolverClass);

      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
      }
      return splits;
    }

    Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && !snapshotFiles.isEmpty()) {
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
      CustomFileGrouper customFileGrouper =
        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
        customFileGrouper.getGroupedInputFiles(snapshotFiles);

      LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
        groups.size());
      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
      LOG.info(
        "Splitting each group into {} InputSplits, "
          + "to achieve closest possible amount of mappers to target of {}",
        mappersPerGroup, mappers);

      // Within each group, create splits of equal size. Groups are not mixed together.
      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
        .flatMap(Collection::stream).toList();
    }

    static class ExportSnapshotInputSplit extends InputSplit implements Writable {

      private List<Pair<BytesWritable, Long>> files;
      private String[] locations;
      private long length;

      public ExportSnapshotInputSplit() {
        this.files = null;
        this.locations = null;
      }

      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
        FileLocationResolver fileLocationResolver) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
        this.locations =
          fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
        LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
          + "with location hints: {}", files, length, locations);
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return locations;
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
        int locationCount = in.readInt();
        List<String> locations = new ArrayList<>(locationCount);
        for (int i = 0; i < locationCount; ++i) {
          locations.add(in.readUTF());
        }
        this.locations = locations.toArray(new String[0]);
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
        out.writeInt(locations.length);
        for (String location : locations) {
          out.writeUTF(location);
        }
      }
    }

    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }

  // ==========================================================================
  // Tool
  // ==========================================================================

  /**
   * Run the Map-Reduce job to perform the file copy.
   */
  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
    final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = getConf();
    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
    if (mappers > 0) {
      conf.setInt(CONF_NUM_SPLITS, mappers);
      conf.setInt(MR_NUM_MAPS, mappers);
    }
    conf.setInt(CONF_FILES_MODE, filesMode);
    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
    if (storagePolicy != null) {
      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
      }
    }
    if (customFileGrouper != null) {
      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
    }
    if (fileLocationResolver != null) {
      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
    }

    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
    Job job = Job.getInstance(conf);
    job.setJobName(jobname);
    job.setJarByClass(ExportSnapshot.class);
    TableMapReduceUtil.addDependencyJars(job);
    job.setMapperClass(ExportMapper.class);
    job.setInputFormatClass(ExportSnapshotInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapSpeculativeExecution(false);
    job.setNumReduceTasks(0);

    // Acquire the delegation Tokens
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);

    // Run the MR Job
    if (!job.waitForCompletion(true)) {
      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
    }
  }

  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
    // Update the conf with the current root dir, since it may be a different cluster
    Configuration conf = new Configuration(baseConf);
    CommonFSUtils.setRootDir(conf, rootDir);
    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
    if (isExpired) {
      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
    }
    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
  }

  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
    ExecutorService pool = Executors
      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
    List<Future<Void>> futures = new ArrayList<>();
    for (Path dstPath : traversedPath) {
      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
      futures.add(future);
    }
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e);
    } finally {
      pool.shutdownNow();
    }
  }

  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
    Configuration conf, List<Path> traversedPath) throws IOException {
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setOwner(path, filesUser, filesGroup);
      } catch (IOException e) {
        throw new RuntimeException(
          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
      }
    }, conf);
  }

  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
    final List<Path> traversedPath, final Configuration conf) throws IOException {
    if (filesMode <= 0) {
      return;
    }
    FsPermission perm = new FsPermission(filesMode);
    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
      try {
        fs.setPermission(path, perm);
      } catch (IOException e) {
        throw new RuntimeException(
          "set permission for file " + path + " to " + filesMode + " failed", e);
      }
    }, conf);
  }

  private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
    Map<String, String> familyStoragePolicy = new HashMap<>();
    for (String familyConf : storagePolicy.split("&")) {
      String[] familySplit = familyConf.split("=");
      if (familySplit.length != 2) {
        continue;
      }
      // family is key, storage policy is value
      familyStoragePolicy.put(familySplit[0], familySplit[1]);
    }
    return familyStoragePolicy;
  }
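  // For example, --storage-policy "cf1=HOT&cf2=ALL_SSD" parses to {cf1 -> HOT, cf2 -> ALL_SSD},
  // and runCopyJob() then writes each entry into the job conf under
  // snapshot.export.storage.policy.family.<family>, which the ExportMapper applies to the
  // corresponding output directory.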
1156
1157  private static String generateFamilyStoragePolicyKey(String family) {
1158    return CONF_STORAGE_POLICY + "." + family;
1159  }
1160
1161  private boolean verifyTarget = true;
1162  private boolean verifySource = true;
1163  private boolean verifyChecksum = true;
1164  private String snapshotName = null;
1165  private String targetName = null;
1166  private boolean overwrite = false;
1167  private String filesGroup = null;
1168  private String filesUser = null;
1169  private Path outputRoot = null;
1170  private Path inputRoot = null;
1171  private int bandwidthMB = Integer.MAX_VALUE;
1172  private int filesMode = 0;
1173  private int mappers = 0;
1174  private boolean resetTtl = false;
1175  private String storagePolicy = null;
1176  private String customFileGrouper = null;
1177  private String fileLocationResolver = null;
1178
1179  @Override
1180  protected void processOptions(CommandLine cmd) {
1181    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1182    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1183    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1184      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1185    }
1186    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1187      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1188    }
1189    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1190    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1191    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1192    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1193    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1194    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1195    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1196    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1197    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1198    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1199    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1200    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
1201      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
1202    }
1203    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
1204      customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
1205    }
1206    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
1207      fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
1208    }
1209  }
1210
  /**
   * Execute the snapshot export by copying the snapshot metadata, hfiles and WALs.
   * @return 0 on success, non-zero on failure.
   */
  @Override
  public int doWork() throws IOException {
    Configuration conf = getConf();

    // Check user options
    if (snapshotName == null) {
      System.err.println("Snapshot name not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (outputRoot == null) {
      System.err
        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
      LOG.error("Use -h or --help for usage instructions.");
      return EXIT_FAILURE;
    }

    if (targetName == null) {
      targetName = snapshotName;
    }
    if (inputRoot == null) {
      inputRoot = CommonFSUtils.getRootDir(conf);
    } else {
      CommonFSUtils.setRootDir(conf, inputRoot);
    }

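    // Build per-cluster configurations: properties prefixed with CONF_SOURCE_PREFIX or
    // CONF_DEST_PREFIX override the defaults for the source and destination clusters respectively.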
    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
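    // Skip the intermediate tmp directory when explicitly requested, or when a custom snapshot
    // working directory is configured (the working directory may be on a different filesystem,
    // in which case the final rename into place would not work).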
    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
    Path snapshotTmpDir =
      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
    Path outputSnapshotDir =
      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);

    // Throws a CorruptedSnapshotException if the snapshot info cannot be read.
    SnapshotDescription sourceSnapshotDesc =
      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);

    // Verify snapshot source before copying files
    if (verifySource) {
      LOG.info("Verify the source snapshot's expiration status and integrity.");
      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
    }

    // Find the directory whose owner and group need to be changed
    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
    if (outputFs.exists(needSetOwnerDir)) {
      if (skipTmp) {
        needSetOwnerDir = outputSnapshotDir;
      } else {
        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
        if (outputFs.exists(needSetOwnerDir)) {
          needSetOwnerDir = snapshotTmpDir;
        }
      }
    }

    // Check if the snapshot already exists
    if (outputFs.exists(outputSnapshotDir)) {
      if (overwrite) {
        if (!outputFs.delete(outputSnapshotDir, true)) {
          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
          return EXIT_FAILURE;
        }
      } else {
        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
          + outputSnapshotDir);
        return EXIT_FAILURE;
      }
    }

    if (!skipTmp) {
      // Check if an export of the snapshot is already in progress
      if (outputFs.exists(snapshotTmpDir)) {
        if (overwrite) {
          if (!outputFs.delete(snapshotTmpDir, true)) {
            System.err
              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
            return EXIT_FAILURE;
          }
        } else {
          System.err
            .println("A snapshot with the same name '" + targetName + "' may be in progress");
          System.err
            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
          System.err
            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
          return EXIT_FAILURE;
        }
      }
    }

    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
    // The snapshot references must be copied before the hfiles, otherwise the cleaner
    // will remove them because they are unreferenced.
    List<Path> traversedPaths = new ArrayList<>();
    boolean copySucceeded = false;
    try {
      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
      traversedPaths =
        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
      copySucceeded = true;
    } catch (IOException e) {
      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
        + " to=" + initialOutputSnapshotDir, e);
    } finally {
      if (copySucceeded) {
        if (filesUser != null || filesGroup != null) {
          LOG.warn(
            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
              + (filesGroup == null
                ? ""
                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
          setOwnerParallel(outputFs, filesUser, filesGroup, conf, traversedPaths);
        }
        if (filesMode > 0) {
          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
          setPermissionParallel(outputFs, (short) filesMode, traversedPaths, conf);
        }
      }
    }

    // Write a new .snapshotinfo if the target name differs from the source name or we want to
    // reset the TTL for the target snapshot.
    if (!targetName.equals(snapshotName) || resetTtl) {
      SnapshotDescription.Builder snapshotDescBuilder =
        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
      if (!targetName.equals(snapshotName)) {
        snapshotDescBuilder.setName(targetName);
      }
      if (resetTtl) {
        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
      }
      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
        initialOutputSnapshotDir, outputFs);
      if (filesUser != null || filesGroup != null) {
        outputFs.setOwner(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
          filesGroup);
      }
      if (filesMode > 0) {
        outputFs.setPermission(
          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
          new FsPermission((short) filesMode));
      }
    }

    // Step 2 - Start MR Job to copy files
    // The snapshot references must be copied before the files, otherwise the files get removed
    // by the HFileArchiver, since they have no references.
    try {
      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
        fileLocationResolver);

      LOG.info("Finalize the Snapshot Export");
      if (!skipTmp) {
        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> to fs2:/.snapshot/<snapshot>
        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
          throw new ExportSnapshotException("Unable to rename snapshot directory from="
            + snapshotTmpDir + " to=" + outputSnapshotDir);
        }
      }

      // Step 4 - Verify snapshot integrity
      if (verifyTarget) {
        LOG.info("Verify the exported snapshot's expiration status and integrity.");
        SnapshotDescription targetSnapshotDesc =
          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
      }

      LOG.info("Export Completed: " + targetName);
      return EXIT_SUCCESS;
    } catch (Exception e) {
      LOG.error("Snapshot export failed", e);
      if (!skipTmp) {
        outputFs.delete(snapshotTmpDir, true);
      }
      outputFs.delete(outputSnapshotDir, true);
      return EXIT_FAILURE;
    }
  }

  @Override
  protected void printUsage() {
    super.printUsage();
    System.out.println("\nExamples:\n" + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n\n"
      + "  hbase snapshot export \\\n"
      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
      + "    --copy-to hdfs://srv1:50070/hbase");
  }

  @Override
  protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    addOption(Options.RESET_TTL);
    // Register the storage policy option, which processOptions(...) reads but was missing here
    addOption(Options.STORAGE_POLICY);
    addOption(Options.CUSTOM_FILE_GROUPER);
    addOption(Options.FILE_LOCATION_RESOLVER);
  }

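  /**
   * Tool entry point; see {@link #printUsage()} for example invocations such as
   * {@code hbase snapshot export --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase}.
   */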
  public static void main(String[] args) {
    new ExportSnapshot().doStaticMain(args);
  }
}