001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.BufferedInputStream;
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.FileNotFoundException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ExecutionException;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.Executors;
039import java.util.concurrent.Future;
040import java.util.function.BiConsumer;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataInputStream;
043import org.apache.hadoop.fs.FSDataOutputStream;
044import org.apache.hadoop.fs.FileChecksum;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.Path;
048import org.apache.hadoop.fs.permission.FsPermission;
049import org.apache.hadoop.hbase.HBaseConfiguration;
050import org.apache.hadoop.hbase.HConstants;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.io.FileLink;
054import org.apache.hadoop.hbase.io.HFileLink;
055import org.apache.hadoop.hbase.io.WALLink;
056import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
057import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
058import org.apache.hadoop.hbase.mob.MobUtils;
059import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
060import org.apache.hadoop.hbase.util.AbstractHBaseTool;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.hadoop.hbase.util.FSUtils;
064import org.apache.hadoop.hbase.util.HFileArchiveUtil;
065import org.apache.hadoop.hbase.util.Pair;
066import org.apache.hadoop.hbase.util.Strings;
067import org.apache.hadoop.io.BytesWritable;
068import org.apache.hadoop.io.NullWritable;
069import org.apache.hadoop.io.Writable;
070import org.apache.hadoop.mapreduce.InputFormat;
071import org.apache.hadoop.mapreduce.InputSplit;
072import org.apache.hadoop.mapreduce.Job;
073import org.apache.hadoop.mapreduce.JobContext;
074import org.apache.hadoop.mapreduce.Mapper;
075import org.apache.hadoop.mapreduce.RecordReader;
076import org.apache.hadoop.mapreduce.TaskAttemptContext;
077import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
078import org.apache.hadoop.mapreduce.security.TokenCache;
079import org.apache.hadoop.util.ReflectionUtils;
080import org.apache.hadoop.util.StringUtils;
081import org.apache.hadoop.util.Tool;
082import org.apache.yetus.audience.InterfaceAudience;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
087import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
088import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
089import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
090
091import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
095
096/**
097 * Export the specified snapshot to a given FileSystem. The .snapshot/name folder is copied to the
098 * destination cluster and then all the hfiles/wals are copied using a Map-Reduce Job in the
099 * .archive/ location. When everything is done, the second cluster can restore the snapshot.
100 */
101@InterfaceAudience.Public
102public class ExportSnapshot extends AbstractHBaseTool implements Tool {
  /** Tool name; also the prefix of the per-filesystem configuration override keys below. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);

  // Standard MapReduce property controlling the number of map tasks for the copy job.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Key for the number of splits used by the export input format.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  // Name and directory of the snapshot being exported, passed to the job via configuration.
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Optional owner/group/mode to apply to copied files (read in ExportMapper.setup; applied in
  // createOutputPath/preserveAttributes).
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether to compare source/destination checksums after each copy (default true).
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root directories of the destination and source clusters.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Copy buffer size; defaults to max(destination block size, BUFFER_SIZE) — see setup().
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Number of bytes copied between progress/counter updates (default REPORT_SIZE).
  private static final String CONF_REPORT_SIZE = "snapshot.export.report.size";
  // Key for the default map group; usage is outside this chunk.
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper bandwidth cap in MB/s (default 100; see copyFile()).
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
  // Optional plugin class names; see the CustomFileGrouper / FileLocationResolver interfaces.
  private static final String CONF_INPUT_FILE_GROUPER_CLASS =
    "snapshot.export.input.file.grouper.class";
  private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
    "snapshot.export.input.file.location.resolver.class";
  // When set, presumably skips the temporary-directory staging step — usage outside this chunk.
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
  // Thread-pool size for copying the snapshot manifest/reference files.
  private static final String CONF_COPY_MANIFEST_THREADS =
    "snapshot.export.copy.references.threads";
  private static final int DEFAULT_COPY_MANIFEST_THREADS =
    Runtime.getRuntime().availableProcessors();
  // Prefix for per-family storage-policy overrides (see --storage-policy and copyFile()).
  private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";
136
  /**
   * Failure-injection state used only by tests; see ExportMapper#injectTestFailure.
   */
  static class Testing {
    /** Config key enabling failure injection in the export mappers. */
    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
    /** Config key for the number of failures to inject. */
    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
    // Total failures to inject (read from CONF_TEST_FAILURE_COUNT in setup()).
    int failuresCountToInject = 0;
    // Failures injected so far; seeded from the task attempt id in setup().
    int injectedFailureCount = 0;
  }
143
  /** Command line options and defaults. */
  static final class Options {
    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
    static final Option TARGET_NAME =
      new Option(null, "target", true, "Target name for the snapshot.");
    static final Option COPY_TO =
      new Option(null, "copy-to", true, "Remote " + "destination hdfs://");
    static final Option COPY_FROM =
      new Option(null, "copy-from", true, "Input folder hdfs:// (default hbase.rootdir)");
    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
      "Do not verify checksum, use name+length only.");
    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
      "Do not verify the exported snapshot's expiration status and integrity.");
    static final Option NO_SOURCE_VERIFY = new Option(null, "no-source-verify", false,
      "Do not verify the source snapshot's expiration status and integrity.");
    static final Option OVERWRITE =
      new Option(null, "overwrite", false, "Rewrite the snapshot manifest if already exists.");
    static final Option CHUSER =
      new Option(null, "chuser", true, "Change the owner of the files to the specified one.");
    static final Option CHGROUP =
      new Option(null, "chgroup", true, "Change the group of the files to the specified one.");
    static final Option CHMOD =
      new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
    static final Option MAPPERS = new Option(null, "mappers", true,
      "Number of mappers to use during the copy (mapreduce.job.maps). "
        + "If you provide a --custom-file-grouper, "
        + "then --mappers is interpreted as the number of mappers per group.");
    static final Option BANDWIDTH =
      new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
    static final Option RESET_TTL =
      new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
    static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
      "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD");
    static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true,
      "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
        + "See JavaDoc on that class for more information.");
    static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true,
      "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
        + "See JavaDoc on that class for more information.");
  }
184
  /** Export Map-Reduce Counters, to keep track of the progress. */
  public enum Counter {
    /** Source files that could not be opened or whose status could not be read. */
    MISSING_FILES,
    /** Files successfully copied to the destination. */
    FILES_COPIED,
    /** Files skipped because an identical copy already exists at the destination. */
    FILES_SKIPPED,
    /** Files whose copy attempt failed (also incremented by test failure injection). */
    COPY_FAILED,
    /** Total source bytes expected to be copied. */
    BYTES_EXPECTED,
    /** Bytes not copied because their files were skipped. */
    BYTES_SKIPPED,
    /** Bytes actually written to the destination. */
    BYTES_COPIED
  }
195
  /**
   * Indicates the checksum comparison result. INCOMPATIBLE means the two checksums cannot be
   * meaningfully compared (one is null or the algorithms differ); see verifyChecksum.
   */
  public enum ChecksumComparison {
    TRUE, // checksum comparison is compatible and true.
    FALSE, // checksum comparison is compatible and false.
    INCOMPATIBLE, // checksum comparison is not compatible.
  }
204
  /**
   * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
   * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
   * must return a data structure that contains each input file exactly once. Files that appear in
   * separate entries in the top-level returned Collection are guaranteed to not be placed in the
   * same InputSplit. This can be used to segregate your input files by the rack or host on which
   * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
   * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
   * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
   * on the classpath. If this argument is not used, no particular grouping logic will be applied.
   */
  @InterfaceAudience.Public
  public interface CustomFileGrouper {
    /**
     * Partition the snapshot's input files (paired with their sizes) into groups; files placed in
     * different top-level collections will never share an InputSplit.
     */
    Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles);
  }
221
  /** Default grouper: places every input file into a single group (no segregation). */
  private static class NoopCustomFileGrouper implements CustomFileGrouper {
    @Override
    public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
      getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      return ImmutableList.of(snapshotFiles);
    }
  }
229
  /**
   * If desired, you may implement a FileLocationResolver in order to influence the _location_
   * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
   * method {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit
   * being constructed. Whatever is returned will ultimately be reported by that split's
   * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
   * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
   * --file-location-resolver argument with the fully qualified class name of an implementation of
   * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
   * be attached to the InputSplits.
   */
  @InterfaceAudience.Public
  public interface FileLocationResolver {
    /** Returns the location hints (e.g. hostnames/racks) to attach to a split for these files. */
    Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files);
  }
245
  /** Default resolver: attaches no location metadata to the InputSplits. */
  static class NoopFileLocationResolver implements FileLocationResolver {
    @Override
    public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
      return ImmutableSet.of();
    }
  }
252
253  private static class ExportMapper
254    extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
    // Default number of bytes copied between progress updates (1 MB); see CONF_REPORT_SIZE.
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    // Minimum copy buffer size (64 KB); the destination block size is used when larger.
    final static int BUFFER_SIZE = 64 * 1024;

    // Whether to compare source/destination checksums after each copy (CONF_CHECKSUM_VERIFY).
    private boolean verifyChecksum;
    // Optional group/user/mode to apply to copied files; null (or 0 for mode) means no override.
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    // Copy buffer size and progress-report interval, resolved in setup().
    private int bufferSize;
    private int reportSize;

    // Destination filesystem, its hfile archive directory and root (CONF_OUTPUT_ROOT).
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source filesystem, its hfile archive directory and root (CONF_INPUT_ROOT).
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;

    // Failure-injection state shared across map() calls; only active in tests.
    private static Testing testing = new Testing();
275
276    @Override
277    public void setup(Context context) throws IOException {
278      Configuration conf = context.getConfiguration();
279
280      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
281      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
282
283      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
284
285      filesGroup = conf.get(CONF_FILES_GROUP);
286      filesUser = conf.get(CONF_FILES_USER);
287      filesMode = (short) conf.getInt(CONF_FILES_MODE, 0);
288      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
289      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
290
291      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
292      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
293
294      try {
295        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
296      } catch (IOException e) {
297        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
298      }
299
300      try {
301        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
302      } catch (IOException e) {
303        throw new IOException("Could not get the output FileSystem with root=" + outputRoot, e);
304      }
305
306      // Use the default block size of the outputFs if bigger
307      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
308      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
309      LOG.info("Using bufferSize=" + Strings.humanReadableInt(bufferSize));
310      reportSize = conf.getInt(CONF_REPORT_SIZE, REPORT_SIZE);
311
312      for (Counter c : Counter.values()) {
313        context.getCounter(c).increment(0);
314      }
315      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
316        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
317        // Get number of times we have already injected failure based on attempt number of this
318        // task.
319        testing.injectedFailureCount = context.getTaskAttemptID().getId();
320      }
321    }
322
323    @Override
324    public void map(BytesWritable key, NullWritable value, Context context)
325      throws InterruptedException, IOException {
326      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
327      Path outputPath = getOutputPath(inputInfo);
328
329      copyFile(context, inputInfo, outputPath);
330      // inject failure
331      injectTestFailure(context, inputInfo);
332    }
333
334    /**
335     * Returns the location where the inputPath will be copied.
336     */
337    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
338      Path path = null;
339      switch (inputInfo.getType()) {
340        case HFILE:
341          Path inputPath = new Path(inputInfo.getHfile());
342          String family = inputPath.getParent().getName();
343          TableName table = HFileLink.getReferencedTableName(inputPath.getName());
344          String region = HFileLink.getReferencedRegionName(inputPath.getName());
345          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
346          path = new Path(CommonFSUtils.getTableDir(new Path("./"), table),
347            new Path(region, new Path(family, hfile)));
348          break;
349        case WAL:
350          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
351          break;
352        default:
353          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
354      }
355      return new Path(outputArchive, path);
356    }
357
358    /**
359     * Used by TestExportSnapshot to test for retries when failures happen. Failure is injected in
360     * {@link #map(BytesWritable, NullWritable, org.apache.hadoop.mapreduce.Mapper.Context)}
361     */
362    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
363      throws IOException {
364      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
365        return;
366      }
367      if (testing.injectedFailureCount >= testing.failuresCountToInject) {
368        return;
369      }
370      testing.injectedFailureCount++;
371      context.getCounter(Counter.COPY_FAILED).increment(1);
372      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
373      throw new IOException(String.format(
374        context.getTaskAttemptID() + " TEST FAILURE (%d of max %d): Unable to copy input=%s",
375        testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
376    }
377
378    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
379      final Path outputPath) throws IOException {
380      // Get the file information
381      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
382
383      // Verify if the output file exists and is the same that we want to copy
384      if (outputFs.exists(outputPath)) {
385        FileStatus outputStat = outputFs.getFileStatus(outputPath);
386        if (outputStat != null && sameFile(inputStat, outputStat)) {
387          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
388          context.getCounter(Counter.FILES_SKIPPED).increment(1);
389          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
390          return;
391        }
392      }
393
394      InputStream in = openSourceFile(context, inputInfo);
395      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
396      if (Integer.MAX_VALUE != bandwidthMB) {
397        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
398      }
399
400      Path inputPath = inputStat.getPath();
401      try {
402        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
403
404        // Ensure that the output folder is there and copy the file
405        createOutputPath(outputPath.getParent());
406        String family = new Path(inputInfo.getHfile()).getParent().getName();
407        String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
408        if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
409          String key = context.getConfiguration().get(familyStoragePolicy);
410          LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
411          outputFs.setStoragePolicy(outputPath.getParent(), key);
412        }
413        FSDataOutputStream out = outputFs.create(outputPath, true);
414
415        long stime = EnvironmentEdgeManager.currentTime();
416        long totalBytesWritten =
417          copyData(context, inputPath, in, outputPath, out, inputStat.getLen());
418
419        // Verify the file length and checksum
420        verifyCopyResult(inputStat, outputFs.getFileStatus(outputPath));
421
422        long etime = EnvironmentEdgeManager.currentTime();
423        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
424        LOG.info("size=" + totalBytesWritten + " (" + Strings.humanReadableInt(totalBytesWritten)
425          + ")" + " time=" + StringUtils.formatTimeDiff(etime, stime) + String.format(" %.3fM/sec",
426            (totalBytesWritten / ((etime - stime) / 1000.0)) / 1048576.0));
427        context.getCounter(Counter.FILES_COPIED).increment(1);
428
429        // Try to Preserve attributes
430        if (!preserveAttributes(outputPath, inputStat)) {
431          LOG.warn("You may have to run manually chown on: " + outputPath);
432        }
433      } catch (IOException e) {
434        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
435        context.getCounter(Counter.COPY_FAILED).increment(1);
436        throw e;
437      }
438    }
439
440    /**
441     * Create the output folder and optionally set ownership.
442     */
443    private void createOutputPath(final Path path) throws IOException {
444      if (filesUser == null && filesGroup == null) {
445        outputFs.mkdirs(path);
446      } else {
447        Path parent = path.getParent();
448        if (!outputFs.exists(parent) && !parent.isRoot()) {
449          createOutputPath(parent);
450        }
451        outputFs.mkdirs(path);
452        if (filesUser != null || filesGroup != null) {
453          // override the owner when non-null user/group is specified
454          outputFs.setOwner(path, filesUser, filesGroup);
455        }
456        if (filesMode > 0) {
457          outputFs.setPermission(path, new FsPermission(filesMode));
458        }
459      }
460    }
461
462    /**
463     * Try to Preserve the files attribute selected by the user copying them from the source file
464     * This is only required when you are exporting as a different user than "hbase" or on a system
465     * that doesn't have the "hbase" user. This is not considered a blocking failure since the user
466     * can force a chmod with the user that knows is available on the system.
467     */
468    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
469      FileStatus stat;
470      try {
471        stat = outputFs.getFileStatus(path);
472      } catch (IOException e) {
473        LOG.warn("Unable to get the status for file=" + path);
474        return false;
475      }
476
477      try {
478        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
479          outputFs.setPermission(path, new FsPermission(filesMode));
480        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
481          outputFs.setPermission(path, refStat.getPermission());
482        }
483      } catch (IOException e) {
484        LOG.warn("Unable to set the permission for file=" + stat.getPath() + ": " + e.getMessage());
485        return false;
486      }
487
488      boolean hasRefStat = (refStat != null);
489      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
490      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
491      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
492        try {
493          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
494            outputFs.setOwner(path, user, group);
495          }
496        } catch (IOException e) {
497          LOG.warn(
498            "Unable to set the owner/group for file=" + stat.getPath() + ": " + e.getMessage());
499          LOG.warn("The user/group may not exist on the destination cluster: user=" + user
500            + " group=" + group);
501          return false;
502        }
503      }
504
505      return true;
506    }
507
508    private boolean stringIsNotEmpty(final String str) {
509      return str != null && !str.isEmpty();
510    }
511
512    private long copyData(final Context context, final Path inputPath, final InputStream in,
513      final Path outputPath, final FSDataOutputStream out, final long inputFileSize)
514      throws IOException {
515      final String statusMessage =
516        "copied %s/" + Strings.humanReadableInt(inputFileSize) + " (%.1f%%)";
517
518      try {
519        byte[] buffer = new byte[bufferSize];
520        long totalBytesWritten = 0;
521        int reportBytes = 0;
522        int bytesRead;
523
524        while ((bytesRead = in.read(buffer)) > 0) {
525          out.write(buffer, 0, bytesRead);
526          totalBytesWritten += bytesRead;
527          reportBytes += bytesRead;
528
529          if (reportBytes >= reportSize) {
530            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
531            context
532              .setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
533                (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath
534                + " to " + outputPath);
535            reportBytes = 0;
536          }
537        }
538
539        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
540        context.setStatus(String.format(statusMessage, Strings.humanReadableInt(totalBytesWritten),
541          (totalBytesWritten / (float) inputFileSize) * 100.0f) + " from " + inputPath + " to "
542          + outputPath);
543
544        return totalBytesWritten;
545      } finally {
546        out.close();
547        in.close();
548      }
549    }
550
551    /**
552     * Try to open the "source" file. Throws an IOException if the communication with the inputFs
553     * fail or if the file is not found.
554     */
555    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
556      throws IOException {
557      try {
558        Configuration conf = context.getConfiguration();
559        FileLink link = null;
560        switch (fileInfo.getType()) {
561          case HFILE:
562            Path inputPath = new Path(fileInfo.getHfile());
563            link = getFileLink(inputPath, conf);
564            break;
565          case WAL:
566            String serverName = fileInfo.getWalServer();
567            String logName = fileInfo.getWalName();
568            link = new WALLink(inputRoot, serverName, logName);
569            break;
570          default:
571            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
572        }
573        return link.open(inputFs);
574      } catch (IOException e) {
575        context.getCounter(Counter.MISSING_FILES).increment(1);
576        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
577        throw e;
578      }
579    }
580
581    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
582      throws IOException {
583      try {
584        Configuration conf = context.getConfiguration();
585        FileLink link = null;
586        switch (fileInfo.getType()) {
587          case HFILE:
588            Path inputPath = new Path(fileInfo.getHfile());
589            link = getFileLink(inputPath, conf);
590            break;
591          case WAL:
592            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
593            break;
594          default:
595            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
596        }
597        return link.getFileStatus(inputFs);
598      } catch (FileNotFoundException e) {
599        context.getCounter(Counter.MISSING_FILES).increment(1);
600        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
601        throw e;
602      } catch (IOException e) {
603        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
604        throw e;
605      }
606    }
607
608    private FileLink getFileLink(Path path, Configuration conf) throws IOException {
609      String regionName = HFileLink.getReferencedRegionName(path.getName());
610      TableName tableName = HFileLink.getReferencedTableName(path.getName());
611      if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
612        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
613          HFileArchiveUtil.getArchivePath(conf), path);
614      }
615      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
616    }
617
618    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
619      try {
620        return fs.getFileChecksum(path);
621      } catch (IOException e) {
622        LOG.warn("Unable to get checksum for file=" + path, e);
623        return null;
624      }
625    }
626
    /**
     * Utility to compare the file length and checksums for the paths specified. Throws an
     * IOException when the lengths differ, or when checksums can be compared but mismatch; the
     * error message includes hints when the mismatch is likely caused by environment differences
     * (filesystem type, block size, checksum algorithm) rather than corruption.
     */
    private void verifyCopyResult(final FileStatus inputStat, final FileStatus outputStat)
      throws IOException {
      long inputLen = inputStat.getLen();
      long outputLen = outputStat.getLen();
      Path inputPath = inputStat.getPath();
      Path outputPath = outputStat.getPath();

      if (inputLen != outputLen) {
        throw new IOException("Mismatch in length of input:" + inputPath + " (" + inputLen
          + ") and output:" + outputPath + " (" + outputLen + ")");
      }

      // If length==0, we will skip checksum
      if (inputLen != 0 && verifyChecksum) {
        // Either checksum may be null if the filesystem cannot provide one (see getFileChecksum).
        FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
        FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());

        // INCOMPATIBLE (null or different algorithms) is also treated as a failure here.
        ChecksumComparison checksumComparison = verifyChecksum(inChecksum, outChecksum);
        if (!checksumComparison.equals(ChecksumComparison.TRUE)) {
          StringBuilder errMessage = new StringBuilder("Checksum mismatch between ")
            .append(inputPath).append(" and ").append(outputPath).append(".");

          // Try to diagnose why the comparison failed and, when the likely cause is an
          // environment difference rather than corruption, add a hint about how to skip it.
          boolean addSkipHint = false;
          String inputScheme = inputFs.getScheme();
          String outputScheme = outputFs.getScheme();
          if (!inputScheme.equals(outputScheme)) {
            errMessage.append(" Input and output filesystems are of different types.\n")
              .append("Their checksum algorithms may be incompatible.");
            addSkipHint = true;
          } else if (inputStat.getBlockSize() != outputStat.getBlockSize()) {
            errMessage.append(" Input and output differ in block-size.");
            addSkipHint = true;
          } else if (
            inChecksum != null && outChecksum != null
              && !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
          ) {
            errMessage.append(" Input and output checksum algorithms are of different types.");
            addSkipHint = true;
          }
          if (addSkipHint) {
            errMessage
              .append(" You can choose file-level checksum validation via "
                + "-Ddfs.checksum.combine.mode=COMPOSITE_CRC when block-sizes"
                + " or filesystems are different.\n")
              .append(" Or you can skip checksum-checks altogether with -no-checksum-verify,")
              .append(
                " for the table backup scenario, you should use -i option to skip checksum-checks.\n")
              .append(" (NOTE: By skipping checksums, one runs the risk of "
                + "masking data-corruption during file-transfer.)\n");
          }
          throw new IOException(errMessage.toString());
        }
      }
    }
684
685    /**
686     * Utility to compare checksums
687     */
688    private ChecksumComparison verifyChecksum(final FileChecksum inChecksum,
689      final FileChecksum outChecksum) {
690      // If the input or output checksum is null, or the algorithms of input and output are not
691      // equal, that means there is no comparison
692      // and return not compatible. else if matched, return compatible with the matched result.
693      if (
694        inChecksum == null || outChecksum == null
695          || !inChecksum.getAlgorithmName().equals(outChecksum.getAlgorithmName())
696      ) {
697        return ChecksumComparison.INCOMPATIBLE;
698      } else if (inChecksum.equals(outChecksum)) {
699        return ChecksumComparison.TRUE;
700      }
701      return ChecksumComparison.FALSE;
702    }
703
704    /**
705     * Check if the two files are equal by looking at the file length, and at the checksum (if user
706     * has specified the verifyChecksum flag).
707     */
708    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
709      // Not matching length
710      if (inputStat.getLen() != outputStat.getLen()) return false;
711
712      // Mark files as equals, since user asked for no checksum verification
713      if (!verifyChecksum) return true;
714
715      // If checksums are not available, files are not the same.
716      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
717      if (inChecksum == null) return false;
718
719      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
720      if (outChecksum == null) return false;
721
722      return inChecksum.equals(outChecksum);
723    }
724  }
725
726  // ==========================================================================
727  // Input Format
728  // ==========================================================================
729
730  /**
731   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
732   * @return list of files referenced by the snapshot (pair of path and size)
733   */
734  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
735    final FileSystem fs, final Path snapshotDir) throws IOException {
736    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
737
738    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
739    final TableName table = TableName.valueOf(snapshotDesc.getTable());
740
741    // Get snapshot files
742    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
743    Set<String> addedFiles = new HashSet<>();
744    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
745      new SnapshotReferenceUtil.SnapshotVisitor() {
746        @Override
747        public void storeFile(final RegionInfo regionInfo, final String family,
748          final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
749          Pair<SnapshotFileInfo, Long> snapshotFileAndSize = null;
750          if (!storeFile.hasReference()) {
751            String region = regionInfo.getEncodedName();
752            String hfile = storeFile.getName();
753            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, region, family, hfile,
754              storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
755          } else {
756            Pair<String, String> referredToRegionAndFile =
757              StoreFileInfo.getReferredToRegionAndFile(storeFile.getName());
758            String referencedRegion = referredToRegionAndFile.getFirst();
759            String referencedHFile = referredToRegionAndFile.getSecond();
760            snapshotFileAndSize = getSnapshotFileAndSize(fs, conf, table, referencedRegion, family,
761              referencedHFile, storeFile.hasFileSize() ? storeFile.getFileSize() : -1);
762          }
763          String fileToExport = snapshotFileAndSize.getFirst().getHfile();
764          if (!addedFiles.contains(fileToExport)) {
765            files.add(snapshotFileAndSize);
766            addedFiles.add(fileToExport);
767          } else {
768            LOG.debug("Skip the existing file: {}.", fileToExport);
769          }
770        }
771      });
772
773    return files;
774  }
775
776  private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs,
777    Configuration conf, TableName table, String region, String family, String hfile, long size)
778    throws IOException {
779    Path path = HFileLink.createPath(table, region, family, hfile);
780    SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE)
781      .setHfile(path.toString()).build();
782    if (size == -1) {
783      size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
784    }
785    return new Pair<>(fileInfo, size);
786  }
787
788  /**
789   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
790   * The groups created will have similar amounts of bytes.
791   * <p>
792   * The algorithm used is pretty straightforward; the file list is sorted by size, and then each
793   * group fetch the bigger file available, iterating through groups alternating the direction.
794   */
795  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
796    final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) {
797    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles);
798    // Sort files by size, from small to big
799    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
800      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
801        long r = a.getSecond() - b.getSecond();
802        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
803      }
804    });
805
806    // create balanced groups
807    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
808    int hi = files.size() - 1;
809    int lo = 0;
810
811    List<Pair<SnapshotFileInfo, Long>> group;
812    int dir = 1;
813    int g = 0;
814
815    while (hi >= lo) {
816      if (g == fileGroups.size()) {
817        group = new LinkedList<>();
818        fileGroups.add(group);
819      } else {
820        group = fileGroups.get(g);
821      }
822
823      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
824
825      // add the hi one
826      group.add(fileInfo);
827
828      // change direction when at the end or the beginning
829      g += dir;
830      if (g == ngroups) {
831        dir = -1;
832        g = ngroups - 1;
833      } else if (g < 0) {
834        dir = 1;
835        g = 0;
836      }
837    }
838
839    return fileGroups;
840  }
841
  /**
   * InputFormat feeding the export mappers: each split carries a set of snapshot files
   * (serialized {@code SnapshotFileInfo} plus size) for one mapper to copy.
   */
  static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
      TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit) split).getSplitKeys());
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();
      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

      // Enumerate all files referenced by the snapshot, then distribute them across splits.
      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);

      Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups =
        groupFilesForSplits(conf, snapshotFiles);

      // Pluggable resolver that can attach locality hints to each split
      // (NoopFileLocationResolver by default).
      Class<? extends FileLocationResolver> fileLocationResolverClass =
        conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
          FileLocationResolver.class);
      FileLocationResolver fileLocationResolver =
        ReflectionUtils.newInstance(fileLocationResolverClass, conf);
      LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
        fileLocationResolverClass);

      List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
      for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
        splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
      }
      return splits;
    }

    /**
     * Partition the snapshot files into per-mapper groups. A pluggable {@code CustomFileGrouper}
     * may first bucket the files (buckets are never mixed inside one split); each bucket is then
     * size-balanced into roughly {@code mappers / #buckets} splits via
     * {@code getBalancedSplits}.
     */
    Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
      List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
      if (mappers == 0 && !snapshotFiles.isEmpty()) {
        // No explicit mapper count: derive one from the file count and record it in the conf
        // so later stages see the same value.
        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
        mappers = Math.min(mappers, snapshotFiles.size());
        conf.setInt(CONF_NUM_SPLITS, mappers);
        conf.setInt(MR_NUM_MAPS, mappers);
      }

      Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
        CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
      CustomFileGrouper customFileGrouper =
        ReflectionUtils.newInstance(inputFileGrouperClass, conf);
      Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
        customFileGrouper.getGroupedInputFiles(snapshotFiles);

      LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
        groups.size());
      // At least one split per group, even if that exceeds the requested mapper count.
      int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
      LOG.info(
        "Splitting each group into {} InputSplits, "
          + "to achieve closest possible amount of mappers to target of {}",
        mappersPerGroup, mappers);

      // Within each group, create splits of equal size. Groups are not mixed together.
      return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
        .flatMap(Collection::stream).toList();
    }

    /**
     * One unit of mapper work: (serialized SnapshotFileInfo, size) pairs, their total size, and
     * optional location hints. Serialized between client and tasks via {@code Writable}.
     */
    static class ExportSnapshotInputSplit extends InputSplit implements Writable {

      // (serialized SnapshotFileInfo bytes, file size) pairs carried by this split
      private List<Pair<BytesWritable, Long>> files;
      // locality hints returned by getLocations()
      private String[] locations;
      // total size in bytes of all files in this split
      private long length;

      // Nullary constructor required for Writable deserialization; readFields() fills the state.
      public ExportSnapshotInputSplit() {
        this.files = null;
        this.locations = null;
      }

      public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
        FileLocationResolver fileLocationResolver) {
        this.files = new ArrayList<>(snapshotFiles.size());
        for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
          // Store the protobuf bytes so the split itself stays Writable-serializable.
          this.files.add(
            new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
          this.length += fileInfo.getSecond();
        }
        this.locations =
          fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
        LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
          + "with location hints: {}", files, length, locations);
      }

      private List<Pair<BytesWritable, Long>> getSplitKeys() {
        return files;
      }

      @Override
      public long getLength() throws IOException, InterruptedException {
        return length;
      }

      @Override
      public String[] getLocations() throws IOException, InterruptedException {
        return locations;
      }

      // Deserialization mirror of write(): file count, then (fileInfo, size) pairs,
      // then location count and locations. length is recomputed from the sizes read.
      @Override
      public void readFields(DataInput in) throws IOException {
        int count = in.readInt();
        files = new ArrayList<>(count);
        length = 0;
        for (int i = 0; i < count; ++i) {
          BytesWritable fileInfo = new BytesWritable();
          fileInfo.readFields(in);
          long size = in.readLong();
          files.add(new Pair<>(fileInfo, size));
          length += size;
        }
        int locationCount = in.readInt();
        List<String> locations = new ArrayList<>(locationCount);
        for (int i = 0; i < locationCount; ++i) {
          locations.add(in.readUTF());
        }
        this.locations = locations.toArray(new String[0]);
      }

      // Serialization order must stay in sync with readFields() above.
      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(files.size());
        for (final Pair<BytesWritable, Long> fileInfo : files) {
          fileInfo.getFirst().write(out);
          out.writeLong(fileInfo.getSecond());
        }
        out.writeInt(locations.length);
        for (String location : locations) {
          out.writeUTF(location);
        }
      }
    }

    /**
     * RecordReader that emits each file of the split as a key (the serialized SnapshotFileInfo
     * bytes); values are unused. Progress is reported by bytes processed, not file count.
     */
    private static class ExportSnapshotRecordReader
      extends RecordReader<BytesWritable, NullWritable> {
      private final List<Pair<BytesWritable, Long>> files;
      private long totalSize = 0;
      private long procSize = 0;
      // index of the current file; -1 until the first nextKeyValue() call
      private int index = -1;

      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
        this.files = files;
        for (Pair<BytesWritable, Long> fileInfo : files) {
          totalSize += fileInfo.getSecond();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public BytesWritable getCurrentKey() {
        return files.get(index).getFirst();
      }

      @Override
      public NullWritable getCurrentValue() {
        return NullWritable.get();
      }

      @Override
      public float getProgress() {
        return (float) procSize / totalSize;
      }

      @Override
      public void initialize(InputSplit split, TaskAttemptContext tac) {
      }

      @Override
      public boolean nextKeyValue() {
        if (index >= 0) {
          // Account for the file just finished before advancing to the next one.
          procSize += files.get(index).getSecond();
        }
        return (++index < files.size());
      }
    }
  }
1024
1025  // ==========================================================================
1026  // Tool
1027  // ==========================================================================
1028
1029  /**
1030   * Run Map-Reduce Job to perform the files copy.
1031   */
1032  private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
1033    final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
1034    final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
1035    final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
1036    throws IOException, InterruptedException, ClassNotFoundException {
1037    Configuration conf = getConf();
1038    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
1039    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
1040    if (mappers > 0) {
1041      conf.setInt(CONF_NUM_SPLITS, mappers);
1042      conf.setInt(MR_NUM_MAPS, mappers);
1043    }
1044    conf.setInt(CONF_FILES_MODE, filesMode);
1045    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
1046    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
1047    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
1048    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
1049    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
1050    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
1051    if (storagePolicy != null) {
1052      for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
1053        conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
1054      }
1055    }
1056    if (customFileGrouper != null) {
1057      conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
1058    }
1059    if (fileLocationResolver != null) {
1060      conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
1061    }
1062
1063    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
1064    Job job = new Job(conf);
1065    job.setJobName(jobname);
1066    job.setJarByClass(ExportSnapshot.class);
1067    TableMapReduceUtil.addDependencyJars(job);
1068    job.setMapperClass(ExportMapper.class);
1069    job.setInputFormatClass(ExportSnapshotInputFormat.class);
1070    job.setOutputFormatClass(NullOutputFormat.class);
1071    job.setMapSpeculativeExecution(false);
1072    job.setNumReduceTasks(0);
1073
1074    // Acquire the delegation Tokens
1075    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1076    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { inputRoot }, srcConf);
1077    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1078    TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { outputRoot }, destConf);
1079
1080    // Run the MR Job
1081    if (!job.waitForCompletion(true)) {
1082      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
1083    }
1084  }
1085
1086  private void verifySnapshot(final SnapshotDescription snapshotDesc, final Configuration baseConf,
1087    final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
1088    // Update the conf with the current root dir, since may be a different cluster
1089    Configuration conf = new Configuration(baseConf);
1090    CommonFSUtils.setRootDir(conf, rootDir);
1091    CommonFSUtils.setFsDefault(conf, CommonFSUtils.getRootDir(conf));
1092    boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(),
1093      snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime());
1094    if (isExpired) {
1095      throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc));
1096    }
1097    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
1098  }
1099
1100  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
1101    BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
1102    ExecutorService pool = Executors
1103      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1104    List<Future<Void>> futures = new ArrayList<>();
1105    for (Path dstPath : traversedPath) {
1106      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
1107      futures.add(future);
1108    }
1109    try {
1110      for (Future<Void> future : futures) {
1111        future.get();
1112      }
1113    } catch (InterruptedException | ExecutionException e) {
1114      throw new IOException(e);
1115    } finally {
1116      pool.shutdownNow();
1117    }
1118  }
1119
1120  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
1121    Configuration conf, List<Path> traversedPath) throws IOException {
1122    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
1123      try {
1124        fs.setOwner(path, filesUser, filesGroup);
1125      } catch (IOException e) {
1126        throw new RuntimeException(
1127          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed", e);
1128      }
1129    }, conf);
1130  }
1131
1132  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
1133    final List<Path> traversedPath, final Configuration conf) throws IOException {
1134    if (filesMode <= 0) {
1135      return;
1136    }
1137    FsPermission perm = new FsPermission(filesMode);
1138    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
1139      try {
1140        fs.setPermission(path, perm);
1141      } catch (IOException e) {
1142        throw new RuntimeException(
1143          "set permission for file " + path + " to " + filesMode + " failed", e);
1144      }
1145    }, conf);
1146  }
1147
1148  private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
1149    Map<String, String> familyStoragePolicy = new HashMap<>();
1150    for (String familyConf : storagePolicy.split("&")) {
1151      String[] familySplit = familyConf.split("=");
1152      if (familySplit.length != 2) {
1153        continue;
1154      }
1155      // family is key, storage policy is value
1156      familyStoragePolicy.put(familySplit[0], familySplit[1]);
1157    }
1158    return familyStoragePolicy;
1159  }
1160
1161  private static String generateFamilyStoragePolicyKey(String family) {
1162    return CONF_STORAGE_POLICY + "." + family;
1163  }
1164
  // Verify the exported snapshot's integrity at the destination after the copy (Step 4).
  private boolean verifyTarget = true;
  // Verify the source snapshot's expiration status and integrity before copying.
  private boolean verifySource = true;
  // Compare file checksums (not only lengths) when validating copied files.
  private boolean verifyChecksum = true;
  // Name of the snapshot to export (required).
  private String snapshotName = null;
  // Name to give the snapshot at the destination; defaults to snapshotName.
  private String targetName = null;
  // Overwrite a pre-existing snapshot (or in-progress tmp dir) at the destination.
  private boolean overwrite = false;
  // Group to assign to the exported files (null = leave unchanged).
  private String filesGroup = null;
  // User to assign to the exported files (null = leave unchanged).
  private String filesUser = null;
  // Destination root directory (required).
  private Path outputRoot = null;
  // Source root directory; defaults to the cluster root dir from the configuration.
  private Path inputRoot = null;
  // Bandwidth limit in MB handed to the copy job (MAX_VALUE = effectively unlimited).
  private int bandwidthMB = Integer.MAX_VALUE;
  // Permission bits (parsed as octal) to apply to exported files; 0 = leave unchanged.
  private int filesMode = 0;
  // Number of mappers for the copy job; 0 = derive from the file count.
  private int mappers = 0;
  // Reset the target snapshot's TTL to the default instead of copying the source TTL.
  private boolean resetTtl = false;
  // Optional "family1=policy1&family2=policy2" storage-policy mapping.
  private String storagePolicy = null;
  // Optional CustomFileGrouper implementation class name for input-split grouping.
  private String customFileGrouper = null;
  // Optional FileLocationResolver implementation class name for split locality hints.
  private String fileLocationResolver = null;
1182
1183  @Override
1184  protected void processOptions(CommandLine cmd) {
1185    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
1186    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
1187    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
1188      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
1189    }
1190    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
1191      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
1192    }
1193    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
1194    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
1195    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
1196    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode, 8);
1197    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
1198    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
1199    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
1200    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
1201    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
1202    verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
1203    resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1204    if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
1205      storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
1206    }
1207    if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
1208      customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
1209    }
1210    if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
1211      fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
1212    }
1213  }
1214
1215  /**
1216   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
1217   * @return 0 on success, and != 0 upon failure.
1218   */
1219  @Override
1220  public int doWork() throws IOException {
1221    Configuration conf = getConf();
1222
1223    // Check user options
1224    if (snapshotName == null) {
1225      System.err.println("Snapshot name not provided.");
1226      LOG.error("Use -h or --help for usage instructions.");
1227      return EXIT_FAILURE;
1228    }
1229
1230    if (outputRoot == null) {
1231      System.err
1232        .println("Destination file-system (--" + Options.COPY_TO.getLongOpt() + ") not provided.");
1233      LOG.error("Use -h or --help for usage instructions.");
1234      return EXIT_FAILURE;
1235    }
1236
1237    if (targetName == null) {
1238      targetName = snapshotName;
1239    }
1240    if (inputRoot == null) {
1241      inputRoot = CommonFSUtils.getRootDir(conf);
1242    } else {
1243      CommonFSUtils.setRootDir(conf, inputRoot);
1244    }
1245
1246    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
1247    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
1248    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
1249    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
1250    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false)
1251      || conf.get(SnapshotDescriptionUtils.SNAPSHOT_WORKING_DIR) != null;
1252    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
1253    Path snapshotTmpDir =
1254      SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot, destConf);
1255    Path outputSnapshotDir =
1256      SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
1257    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
1258    LOG.debug("inputFs={}, inputRoot={}", inputFs.getUri().toString(), inputRoot);
1259    LOG.debug("outputFs={}, outputRoot={}, skipTmp={}, initialOutputSnapshotDir={}", outputFs,
1260      outputRoot.toString(), skipTmp, initialOutputSnapshotDir);
1261
1262    // throw CorruptedSnapshotException if we can't read the snapshot info.
1263    SnapshotDescription sourceSnapshotDesc =
1264      SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir);
1265
1266    // Verify snapshot source before copying files
1267    if (verifySource) {
1268      LOG.info("Verify the source snapshot's expiration status and integrity.");
1269      verifySnapshot(sourceSnapshotDesc, srcConf, inputFs, inputRoot, snapshotDir);
1270    }
1271
1272    // Find the necessary directory which need to change owner and group
1273    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
1274    if (outputFs.exists(needSetOwnerDir)) {
1275      if (skipTmp) {
1276        needSetOwnerDir = outputSnapshotDir;
1277      } else {
1278        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot, destConf);
1279        if (outputFs.exists(needSetOwnerDir)) {
1280          needSetOwnerDir = snapshotTmpDir;
1281        }
1282      }
1283    }
1284
1285    // Check if the snapshot already exists
1286    if (outputFs.exists(outputSnapshotDir)) {
1287      if (overwrite) {
1288        if (!outputFs.delete(outputSnapshotDir, true)) {
1289          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
1290          return EXIT_FAILURE;
1291        }
1292      } else {
1293        System.err.println("The snapshot '" + targetName + "' already exists in the destination: "
1294          + outputSnapshotDir);
1295        return EXIT_FAILURE;
1296      }
1297    }
1298
1299    if (!skipTmp) {
1300      // Check if the snapshot already in-progress
1301      if (outputFs.exists(snapshotTmpDir)) {
1302        if (overwrite) {
1303          if (!outputFs.delete(snapshotTmpDir, true)) {
1304            System.err
1305              .println("Unable to remove existing snapshot tmp directory: " + snapshotTmpDir);
1306            return EXIT_FAILURE;
1307          }
1308        } else {
1309          System.err
1310            .println("A snapshot with the same name '" + targetName + "' may be in-progress");
1311          System.err
1312            .println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
1313          System.err
1314            .println("consider removing " + snapshotTmpDir + " by using the -overwrite option");
1315          return EXIT_FAILURE;
1316        }
1317      }
1318    }
1319
1320    // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
1321    // The snapshot references must be copied before the hfiles otherwise the cleaner
1322    // will remove them because they are unreferenced.
1323    List<Path> travesedPaths = new ArrayList<>();
1324    boolean copySucceeded = false;
1325    try {
1326      LOG.info("Copy Snapshot Manifest from " + snapshotDir + " to " + initialOutputSnapshotDir);
1327      travesedPaths =
1328        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
1329          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
1330      copySucceeded = true;
1331    } catch (IOException e) {
1332      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + snapshotDir
1333        + " to=" + initialOutputSnapshotDir, e);
1334    } finally {
1335      if (copySucceeded) {
1336        if (filesUser != null || filesGroup != null) {
1337          LOG.warn(
1338            (filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " + filesUser)
1339              + (filesGroup == null
1340                ? ""
1341                : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
1342          setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
1343        }
1344        if (filesMode > 0) {
1345          LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1346          setPermissionParallel(outputFs, (short) filesMode, travesedPaths, conf);
1347        }
1348      }
1349    }
1350
1351    // Write a new .snapshotinfo if the target name is different from the source name or we want to
1352    // reset TTL for target snapshot.
1353    if (!targetName.equals(snapshotName) || resetTtl) {
1354      SnapshotDescription.Builder snapshotDescBuilder =
1355        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir).toBuilder();
1356      if (!targetName.equals(snapshotName)) {
1357        snapshotDescBuilder.setName(targetName);
1358      }
1359      if (resetTtl) {
1360        snapshotDescBuilder.setTtl(HConstants.DEFAULT_SNAPSHOT_TTL);
1361      }
1362      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDescBuilder.build(),
1363        initialOutputSnapshotDir, outputFs);
1364      if (filesUser != null || filesGroup != null) {
1365        outputFs.setOwner(
1366          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser,
1367          filesGroup);
1368      }
1369      if (filesMode > 0) {
1370        outputFs.setPermission(
1371          new Path(initialOutputSnapshotDir, SnapshotDescriptionUtils.SNAPSHOTINFO_FILE),
1372          new FsPermission((short) filesMode));
1373      }
1374    }
1375
1376    // Step 2 - Start MR Job to copy files
1377    // The snapshot references must be copied before the files otherwise the files gets removed
1378    // by the HFileArchiver, since they have no references.
1379    try {
1380      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1381        filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
1382        fileLocationResolver);
1383
1384      LOG.info("Finalize the Snapshot Export");
1385      if (!skipTmp) {
1386        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1387        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1388          throw new ExportSnapshotException("Unable to rename snapshot directory from="
1389            + snapshotTmpDir + " to=" + outputSnapshotDir);
1390        }
1391      }
1392
1393      // Step 4 - Verify snapshot integrity
1394      if (verifyTarget) {
1395        LOG.info("Verify the exported snapshot's expiration status and integrity.");
1396        SnapshotDescription targetSnapshotDesc =
1397          SnapshotDescriptionUtils.readSnapshotInfo(outputFs, outputSnapshotDir);
1398        verifySnapshot(targetSnapshotDesc, destConf, outputFs, outputRoot, outputSnapshotDir);
1399      }
1400
1401      LOG.info("Export Completed: " + targetName);
1402      return EXIT_SUCCESS;
1403    } catch (Exception e) {
1404      LOG.error("Snapshot export failed", e);
1405      if (!skipTmp) {
1406        outputFs.delete(snapshotTmpDir, true);
1407      }
1408      outputFs.delete(outputSnapshotDir, true);
1409      return EXIT_FAILURE;
1410    }
1411  }
1412
1413  @Override
1414  protected void printUsage() {
1415    super.printUsage();
1416    System.out.println("\n" + "Examples:\n" + "  hbase snapshot export \\\n"
1417      + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1418      + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" + "\n"
1419      + "  hbase snapshot export \\\n"
1420      + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1421      + "    --copy-to hdfs://srv1:50070/hbase");
1422  }
1423
  @Override
  protected void addOptions() {
    // Register the CLI options for the export tool. The only mandatory option is
    // the snapshot name; everything else tunes the copy (destination, ownership,
    // permissions, parallelism, bandwidth) or relaxes verification.
    addRequiredOption(Options.SNAPSHOT);
    // Source/destination roots; --copy-to is effectively required for a real
    // export, --copy-from overrides the local cluster root as the source.
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    // Rename the snapshot at the destination.
    addOption(Options.TARGET_NAME);
    // Verification toggles: checksum comparison during copy, integrity check of
    // the exported snapshot, and integrity check of the source before copying.
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.NO_SOURCE_VERIFY);
    // Replace an existing snapshot of the same name at the destination.
    addOption(Options.OVERWRITE);
    // Ownership/permission applied to the exported files (chown/chgrp/chmod).
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    // MR job tuning: number of map tasks and per-mapper bandwidth cap (MB/s).
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
    // Reset the snapshot TTL to the default at the destination.
    addOption(Options.RESET_TTL);
    // Pluggable hooks: custom grouping of files into mapper inputs and custom
    // resolution of preferred file locations.
    addOption(Options.CUSTOM_FILE_GROUPER);
    addOption(Options.FILE_LOCATION_RESOLVER);
  }
1443
1444  public static void main(String[] args) {
1445    new ExportSnapshot().doStaticMain(args);
1446  }
1447}