001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.BufferedInputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.Comparator;
030import java.util.LinkedList;
031import java.util.List;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileChecksum;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.FileUtil;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.fs.permission.FsPermission;
042import org.apache.hadoop.hbase.HBaseConfiguration;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.io.FileLink;
047import org.apache.hadoop.hbase.io.HFileLink;
048import org.apache.hadoop.hbase.io.WALLink;
049import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
051import org.apache.hadoop.hbase.mob.MobUtils;
052import org.apache.hadoop.hbase.util.AbstractHBaseTool;
053import org.apache.hadoop.hbase.util.FSUtils;
054import org.apache.hadoop.hbase.util.HFileArchiveUtil;
055import org.apache.hadoop.hbase.util.Pair;
056import org.apache.hadoop.io.BytesWritable;
057import org.apache.hadoop.io.IOUtils;
058import org.apache.hadoop.io.NullWritable;
059import org.apache.hadoop.io.Writable;
060import org.apache.hadoop.mapreduce.InputFormat;
061import org.apache.hadoop.mapreduce.InputSplit;
062import org.apache.hadoop.mapreduce.Job;
063import org.apache.hadoop.mapreduce.JobContext;
064import org.apache.hadoop.mapreduce.Mapper;
065import org.apache.hadoop.mapreduce.RecordReader;
066import org.apache.hadoop.mapreduce.TaskAttemptContext;
067import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
068import org.apache.hadoop.mapreduce.security.TokenCache;
069import org.apache.hadoop.util.StringUtils;
070import org.apache.hadoop.util.Tool;
071import org.apache.yetus.audience.InterfaceAudience;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
075import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
079
080/**
081 * Export the specified snapshot to a given FileSystem.
082 *
083 * The .snapshot/name folder is copied to the destination cluster
084 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
085 * When everything is done, the second cluster can restore the snapshot.
086 */
087@InterfaceAudience.Public
088public class ExportSnapshot extends AbstractHBaseTool implements Tool {
089  public static final String NAME = "exportsnapshot";
090  /** Configuration prefix for overrides for the source filesystem */
091  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
092  /** Configuration prefix for overrides for the destination filesystem */
093  public static final String CONF_DEST_PREFIX = NAME + ".to.";
094
095  private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class);
096
097  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
098  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
099  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
100  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
101  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
102  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
103  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
104  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
105  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
106  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
107  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
108  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
109  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
110  private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
111  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
112
113  static class Testing {
114    static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
115    static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count";
116    int failuresCountToInject = 0;
117    int injectedFailureCount = 0;
118  }
119
120  // Command line options and defaults.
121  static final class Options {
122    static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore.");
123    static final Option TARGET_NAME = new Option(null, "target", true,
124        "Target name for the snapshot.");
125    static final Option COPY_TO = new Option(null, "copy-to", true, "Remote "
126        + "destination hdfs://");
127    static final Option COPY_FROM = new Option(null, "copy-from", true,
128        "Input folder hdfs:// (default hbase.rootdir)");
129    static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false,
130        "Do not verify checksum, use name+length only.");
131    static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false,
132        "Do not verify the integrity of the exported snapshot.");
133    static final Option OVERWRITE = new Option(null, "overwrite", false,
134        "Rewrite the snapshot manifest if already exists.");
135    static final Option CHUSER = new Option(null, "chuser", true,
136        "Change the owner of the files to the specified one.");
137    static final Option CHGROUP = new Option(null, "chgroup", true,
138        "Change the group of the files to the specified one.");
139    static final Option CHMOD = new Option(null, "chmod", true,
140        "Change the permission of the files to the specified one.");
141    static final Option MAPPERS = new Option(null, "mappers", true,
142        "Number of mappers to use during the copy (mapreduce.job.maps).");
143    static final Option BANDWIDTH = new Option(null, "bandwidth", true,
144        "Limit bandwidth to this value in MB/second.");
145  }
146
147  // Export Map-Reduce Counters, to keep track of the progress
148  public enum Counter {
149    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
150    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
151  }
152
153  private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
154                                                   NullWritable, NullWritable> {
155    private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
156    final static int REPORT_SIZE = 1 * 1024 * 1024;
157    final static int BUFFER_SIZE = 64 * 1024;
158
159    private boolean verifyChecksum;
160    private String filesGroup;
161    private String filesUser;
162    private short filesMode;
163    private int bufferSize;
164
165    private FileSystem outputFs;
166    private Path outputArchive;
167    private Path outputRoot;
168
169    private FileSystem inputFs;
170    private Path inputArchive;
171    private Path inputRoot;
172
173    private static Testing testing = new Testing();
174
175    @Override
176    public void setup(Context context) throws IOException {
177      Configuration conf = context.getConfiguration();
178
179      Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
180      Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
181
182      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
183
184      filesGroup = conf.get(CONF_FILES_GROUP);
185      filesUser = conf.get(CONF_FILES_USER);
186      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
187      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
188      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
189
190      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
191      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
192
193      try {
194        srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
195        inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
196      } catch (IOException e) {
197        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
198      }
199
200      try {
201        destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
202        outputFs = FileSystem.get(outputRoot.toUri(), destConf);
203      } catch (IOException e) {
204        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
205      }
206
207      // Use the default block size of the outputFs if bigger
208      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
209      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
210      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
211
212      for (Counter c : Counter.values()) {
213        context.getCounter(c).increment(0);
214      }
215      if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
216        testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);
217        // Get number of times we have already injected failure based on attempt number of this
218        // task.
219        testing.injectedFailureCount = context.getTaskAttemptID().getId();
220      }
221    }
222
223    @Override
224    protected void cleanup(Context context) {
225      IOUtils.closeStream(inputFs);
226      IOUtils.closeStream(outputFs);
227    }
228
229    @Override
230    public void map(BytesWritable key, NullWritable value, Context context)
231        throws InterruptedException, IOException {
232      SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
233      Path outputPath = getOutputPath(inputInfo);
234
235      copyFile(context, inputInfo, outputPath);
236    }
237
238    /**
239     * Returns the location where the inputPath will be copied.
240     */
241    private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
242      Path path = null;
243      switch (inputInfo.getType()) {
244        case HFILE:
245          Path inputPath = new Path(inputInfo.getHfile());
246          String family = inputPath.getParent().getName();
247          TableName table =HFileLink.getReferencedTableName(inputPath.getName());
248          String region = HFileLink.getReferencedRegionName(inputPath.getName());
249          String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
250          path = new Path(FSUtils.getTableDir(new Path("./"), table),
251              new Path(region, new Path(family, hfile)));
252          break;
253        case WAL:
254          LOG.warn("snapshot does not keeps WALs: " + inputInfo);
255          break;
256        default:
257          throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
258      }
259      return new Path(outputArchive, path);
260    }
261
262    @SuppressWarnings("checkstyle:linelength")
263    /**
264     * Used by TestExportSnapshot to test for retries when failures happen.
265     * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}.
266     */
267    private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
268        throws IOException {
269      if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return;
270      if (testing.injectedFailureCount >= testing.failuresCountToInject) return;
271      testing.injectedFailureCount++;
272      context.getCounter(Counter.COPY_FAILED).increment(1);
273      LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount);
274      throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s",
275          testing.injectedFailureCount, testing.failuresCountToInject, inputInfo));
276    }
277
278    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
279        final Path outputPath) throws IOException {
280      // Get the file information
281      FileStatus inputStat = getSourceFileStatus(context, inputInfo);
282
283      // Verify if the output file exists and is the same that we want to copy
284      if (outputFs.exists(outputPath)) {
285        FileStatus outputStat = outputFs.getFileStatus(outputPath);
286        if (outputStat != null && sameFile(inputStat, outputStat)) {
287          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
288          context.getCounter(Counter.FILES_SKIPPED).increment(1);
289          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
290          return;
291        }
292      }
293
294      InputStream in = openSourceFile(context, inputInfo);
295      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
296      if (Integer.MAX_VALUE != bandwidthMB) {
297        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
298      }
299
300      try {
301        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
302
303        // Ensure that the output folder is there and copy the file
304        createOutputPath(outputPath.getParent());
305        FSDataOutputStream out = outputFs.create(outputPath, true);
306        try {
307          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
308        } finally {
309          out.close();
310        }
311
312        // Try to Preserve attributes
313        if (!preserveAttributes(outputPath, inputStat)) {
314          LOG.warn("You may have to run manually chown on: " + outputPath);
315        }
316      } finally {
317        in.close();
318        injectTestFailure(context, inputInfo);
319      }
320    }
321
322    /**
323     * Create the output folder and optionally set ownership.
324     */
325    private void createOutputPath(final Path path) throws IOException {
326      if (filesUser == null && filesGroup == null) {
327        outputFs.mkdirs(path);
328      } else {
329        Path parent = path.getParent();
330        if (!outputFs.exists(parent) && !parent.isRoot()) {
331          createOutputPath(parent);
332        }
333        outputFs.mkdirs(path);
334        if (filesUser != null || filesGroup != null) {
335          // override the owner when non-null user/group is specified
336          outputFs.setOwner(path, filesUser, filesGroup);
337        }
338        if (filesMode > 0) {
339          outputFs.setPermission(path, new FsPermission(filesMode));
340        }
341      }
342    }
343
344    /**
345     * Try to Preserve the files attribute selected by the user copying them from the source file
346     * This is only required when you are exporting as a different user than "hbase" or on a system
347     * that doesn't have the "hbase" user.
348     *
349     * This is not considered a blocking failure since the user can force a chmod with the user
350     * that knows is available on the system.
351     */
352    private boolean preserveAttributes(final Path path, final FileStatus refStat) {
353      FileStatus stat;
354      try {
355        stat = outputFs.getFileStatus(path);
356      } catch (IOException e) {
357        LOG.warn("Unable to get the status for file=" + path);
358        return false;
359      }
360
361      try {
362        if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
363          outputFs.setPermission(path, new FsPermission(filesMode));
364        } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
365          outputFs.setPermission(path, refStat.getPermission());
366        }
367      } catch (IOException e) {
368        LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
369        return false;
370      }
371
372      boolean hasRefStat = (refStat != null);
373      String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
374      String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
375      if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
376        try {
377          if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
378            outputFs.setOwner(path, user, group);
379          }
380        } catch (IOException e) {
381          LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
382          LOG.warn("The user/group may not exist on the destination cluster: user=" +
383                   user + " group=" + group);
384          return false;
385        }
386      }
387
388      return true;
389    }
390
391    private boolean stringIsNotEmpty(final String str) {
392      return str != null && str.length() > 0;
393    }
394
395    private void copyData(final Context context,
396        final Path inputPath, final InputStream in,
397        final Path outputPath, final FSDataOutputStream out,
398        final long inputFileSize)
399        throws IOException {
400      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
401                                   " (%.1f%%)";
402
403      try {
404        byte[] buffer = new byte[bufferSize];
405        long totalBytesWritten = 0;
406        int reportBytes = 0;
407        int bytesRead;
408
409        long stime = System.currentTimeMillis();
410        while ((bytesRead = in.read(buffer)) > 0) {
411          out.write(buffer, 0, bytesRead);
412          totalBytesWritten += bytesRead;
413          reportBytes += bytesRead;
414
415          if (reportBytes >= REPORT_SIZE) {
416            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
417            context.setStatus(String.format(statusMessage,
418                              StringUtils.humanReadableInt(totalBytesWritten),
419                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
420                              " from " + inputPath + " to " + outputPath);
421            reportBytes = 0;
422          }
423        }
424        long etime = System.currentTimeMillis();
425
426        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
427        context.setStatus(String.format(statusMessage,
428                          StringUtils.humanReadableInt(totalBytesWritten),
429                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
430                          " from " + inputPath + " to " + outputPath);
431
432        // Verify that the written size match
433        if (totalBytesWritten != inputFileSize) {
434          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
435                       " expected=" + inputFileSize + " for file=" + inputPath;
436          throw new IOException(msg);
437        }
438
439        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
440        LOG.info("size=" + totalBytesWritten +
441            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
442            " time=" + StringUtils.formatTimeDiff(etime, stime) +
443            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
444        context.getCounter(Counter.FILES_COPIED).increment(1);
445      } catch (IOException e) {
446        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
447        context.getCounter(Counter.COPY_FAILED).increment(1);
448        throw e;
449      }
450    }
451
452    /**
453     * Try to open the "source" file.
454     * Throws an IOException if the communication with the inputFs fail or
455     * if the file is not found.
456     */
457    private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
458            throws IOException {
459      try {
460        Configuration conf = context.getConfiguration();
461        FileLink link = null;
462        switch (fileInfo.getType()) {
463          case HFILE:
464            Path inputPath = new Path(fileInfo.getHfile());
465            link = getFileLink(inputPath, conf);
466            break;
467          case WAL:
468            String serverName = fileInfo.getWalServer();
469            String logName = fileInfo.getWalName();
470            link = new WALLink(inputRoot, serverName, logName);
471            break;
472          default:
473            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
474        }
475        return link.open(inputFs);
476      } catch (IOException e) {
477        context.getCounter(Counter.MISSING_FILES).increment(1);
478        LOG.error("Unable to open source file=" + fileInfo.toString(), e);
479        throw e;
480      }
481    }
482
483    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
484        throws IOException {
485      try {
486        Configuration conf = context.getConfiguration();
487        FileLink link = null;
488        switch (fileInfo.getType()) {
489          case HFILE:
490            Path inputPath = new Path(fileInfo.getHfile());
491            link = getFileLink(inputPath, conf);
492            break;
493          case WAL:
494            link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
495            break;
496          default:
497            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
498        }
499        return link.getFileStatus(inputFs);
500      } catch (FileNotFoundException e) {
501        context.getCounter(Counter.MISSING_FILES).increment(1);
502        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
503        throw e;
504      } catch (IOException e) {
505        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
506        throw e;
507      }
508    }
509
510    private FileLink getFileLink(Path path, Configuration conf) throws IOException{
511      String regionName = HFileLink.getReferencedRegionName(path.getName());
512      TableName tableName = HFileLink.getReferencedTableName(path.getName());
513      if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
514        return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
515                HFileArchiveUtil.getArchivePath(conf), path);
516      }
517      return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
518    }
519
520    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
521      try {
522        return fs.getFileChecksum(path);
523      } catch (IOException e) {
524        LOG.warn("Unable to get checksum for file=" + path, e);
525        return null;
526      }
527    }
528
529    /**
530     * Check if the two files are equal by looking at the file length,
531     * and at the checksum (if user has specified the verifyChecksum flag).
532     */
533    private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
534      // Not matching length
535      if (inputStat.getLen() != outputStat.getLen()) return false;
536
537      // Mark files as equals, since user asked for no checksum verification
538      if (!verifyChecksum) return true;
539
540      // If checksums are not available, files are not the same.
541      FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
542      if (inChecksum == null) return false;
543
544      FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
545      if (outChecksum == null) return false;
546
547      return inChecksum.equals(outChecksum);
548    }
549  }
550
551  // ==========================================================================
552  //  Input Format
553  // ==========================================================================
554
555  /**
556   * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
557   * @return list of files referenced by the snapshot (pair of path and size)
558   */
559  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
560      final FileSystem fs, final Path snapshotDir) throws IOException {
561    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
562
563    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>();
564    final TableName table = TableName.valueOf(snapshotDesc.getTable());
565
566    // Get snapshot files
567    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
568    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
569      new SnapshotReferenceUtil.SnapshotVisitor() {
570        @Override
571        public void storeFile(final RegionInfo regionInfo, final String family,
572            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
573          // for storeFile.hasReference() case, copied as part of the manifest
574          if (!storeFile.hasReference()) {
575            String region = regionInfo.getEncodedName();
576            String hfile = storeFile.getName();
577            Path path = HFileLink.createPath(table, region, family, hfile);
578
579            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
580              .setType(SnapshotFileInfo.Type.HFILE)
581              .setHfile(path.toString())
582              .build();
583
584            long size;
585            if (storeFile.hasFileSize()) {
586              size = storeFile.getFileSize();
587            } else {
588              size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
589            }
590            files.add(new Pair<>(fileInfo, size));
591          }
592        }
593    });
594
595    return files;
596  }
597
598  /**
599   * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
600   * The groups created will have similar amounts of bytes.
601   * <p>
602   * The algorithm used is pretty straightforward; the file list is sorted by size,
603   * and then each group fetch the bigger file available, iterating through groups
604   * alternating the direction.
605   */
606  static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
607      final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
608    // Sort files by size, from small to big
609    Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
610      public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
611        long r = a.getSecond() - b.getSecond();
612        return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
613      }
614    });
615
616    // create balanced groups
617    List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
618    long[] sizeGroups = new long[ngroups];
619    int hi = files.size() - 1;
620    int lo = 0;
621
622    List<Pair<SnapshotFileInfo, Long>> group;
623    int dir = 1;
624    int g = 0;
625
626    while (hi >= lo) {
627      if (g == fileGroups.size()) {
628        group = new LinkedList<>();
629        fileGroups.add(group);
630      } else {
631        group = fileGroups.get(g);
632      }
633
634      Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
635
636      // add the hi one
637      sizeGroups[g] += fileInfo.getSecond();
638      group.add(fileInfo);
639
640      // change direction when at the end or the beginning
641      g += dir;
642      if (g == ngroups) {
643        dir = -1;
644        g = ngroups - 1;
645      } else if (g < 0) {
646        dir = 1;
647        g = 0;
648      }
649    }
650
651    if (LOG.isDebugEnabled()) {
652      for (int i = 0; i < sizeGroups.length; ++i) {
653        LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
654      }
655    }
656
657    return fileGroups;
658  }
659
660  private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
661    @Override
662    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
663        TaskAttemptContext tac) throws IOException, InterruptedException {
664      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
665    }
666
667    @Override
668    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
669      Configuration conf = context.getConfiguration();
670      Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
671      FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
672
673      List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
674      int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
675      if (mappers == 0 && snapshotFiles.size() > 0) {
676        mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
677        mappers = Math.min(mappers, snapshotFiles.size());
678        conf.setInt(CONF_NUM_SPLITS, mappers);
679        conf.setInt(MR_NUM_MAPS, mappers);
680      }
681
682      List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
683      List<InputSplit> splits = new ArrayList(groups.size());
684      for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
685        splits.add(new ExportSnapshotInputSplit(files));
686      }
687      return splits;
688    }
689
690    private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
691      private List<Pair<BytesWritable, Long>> files;
692      private long length;
693
694      public ExportSnapshotInputSplit() {
695        this.files = null;
696      }
697
698      public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
699        this.files = new ArrayList(snapshotFiles.size());
700        for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
701          this.files.add(new Pair<>(
702            new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
703          this.length += fileInfo.getSecond();
704        }
705      }
706
707      private List<Pair<BytesWritable, Long>> getSplitKeys() {
708        return files;
709      }
710
711      @Override
712      public long getLength() throws IOException, InterruptedException {
713        return length;
714      }
715
716      @Override
717      public String[] getLocations() throws IOException, InterruptedException {
718        return new String[] {};
719      }
720
721      @Override
722      public void readFields(DataInput in) throws IOException {
723        int count = in.readInt();
724        files = new ArrayList<>(count);
725        length = 0;
726        for (int i = 0; i < count; ++i) {
727          BytesWritable fileInfo = new BytesWritable();
728          fileInfo.readFields(in);
729          long size = in.readLong();
730          files.add(new Pair<>(fileInfo, size));
731          length += size;
732        }
733      }
734
735      @Override
736      public void write(DataOutput out) throws IOException {
737        out.writeInt(files.size());
738        for (final Pair<BytesWritable, Long> fileInfo: files) {
739          fileInfo.getFirst().write(out);
740          out.writeLong(fileInfo.getSecond());
741        }
742      }
743    }
744
745    private static class ExportSnapshotRecordReader
746        extends RecordReader<BytesWritable, NullWritable> {
747      private final List<Pair<BytesWritable, Long>> files;
748      private long totalSize = 0;
749      private long procSize = 0;
750      private int index = -1;
751
752      ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
753        this.files = files;
754        for (Pair<BytesWritable, Long> fileInfo: files) {
755          totalSize += fileInfo.getSecond();
756        }
757      }
758
759      @Override
760      public void close() { }
761
762      @Override
763      public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
764
765      @Override
766      public NullWritable getCurrentValue() { return NullWritable.get(); }
767
768      @Override
769      public float getProgress() { return (float)procSize / totalSize; }
770
771      @Override
772      public void initialize(InputSplit split, TaskAttemptContext tac) { }
773
774      @Override
775      public boolean nextKeyValue() {
776        if (index >= 0) {
777          procSize += files.get(index).getSecond();
778        }
779        return(++index < files.size());
780      }
781    }
782  }
783
784  // ==========================================================================
785  //  Tool
786  // ==========================================================================
787
788  /**
789   * Run Map-Reduce Job to perform the files copy.
790   */
791  private void runCopyJob(final Path inputRoot, final Path outputRoot,
792      final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
793      final String filesUser, final String filesGroup, final int filesMode,
794      final int mappers, final int bandwidthMB)
795          throws IOException, InterruptedException, ClassNotFoundException {
796    Configuration conf = getConf();
797    if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
798    if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
799    if (mappers > 0) {
800      conf.setInt(CONF_NUM_SPLITS, mappers);
801      conf.setInt(MR_NUM_MAPS, mappers);
802    }
803    conf.setInt(CONF_FILES_MODE, filesMode);
804    conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
805    conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
806    conf.set(CONF_INPUT_ROOT, inputRoot.toString());
807    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
808    conf.set(CONF_SNAPSHOT_NAME, snapshotName);
809    conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
810
811    String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
812    Job job = new Job(conf);
813    job.setJobName(jobname);
814    job.setJarByClass(ExportSnapshot.class);
815    TableMapReduceUtil.addDependencyJars(job);
816    job.setMapperClass(ExportMapper.class);
817    job.setInputFormatClass(ExportSnapshotInputFormat.class);
818    job.setOutputFormatClass(NullOutputFormat.class);
819    job.setMapSpeculativeExecution(false);
820    job.setNumReduceTasks(0);
821
822    // Acquire the delegation Tokens
823    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
824    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
825      new Path[] { inputRoot }, srcConf);
826    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
827    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
828        new Path[] { outputRoot }, destConf);
829
830    // Run the MR Job
831    if (!job.waitForCompletion(true)) {
832      throw new ExportSnapshotException(job.getStatus().getFailureInfo());
833    }
834  }
835
836  private void verifySnapshot(final Configuration baseConf,
837      final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
838    // Update the conf with the current root dir, since may be a different cluster
839    Configuration conf = new Configuration(baseConf);
840    FSUtils.setRootDir(conf, rootDir);
841    FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
842    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
843    SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
844  }
845
846  /**
847   * Set path ownership.
848   */
849  private void setOwner(final FileSystem fs, final Path path, final String user,
850      final String group, final boolean recursive) throws IOException {
851    if (user != null || group != null) {
852      if (recursive && fs.isDirectory(path)) {
853        for (FileStatus child : fs.listStatus(path)) {
854          setOwner(fs, child.getPath(), user, group, recursive);
855        }
856      }
857      fs.setOwner(path, user, group);
858    }
859  }
860
861  /**
862   * Set path permission.
863   */
864  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
865      final boolean recursive) throws IOException {
866    if (filesMode > 0) {
867      FsPermission perm = new FsPermission(filesMode);
868      if (recursive && fs.isDirectory(path)) {
869        for (FileStatus child : fs.listStatus(path)) {
870          setPermission(fs, child.getPath(), filesMode, recursive);
871        }
872      }
873      fs.setPermission(path, perm);
874    }
875  }
876
877  private boolean verifyTarget = true;
878  private boolean verifyChecksum = true;
879  private String snapshotName = null;
880  private String targetName = null;
881  private boolean overwrite = false;
882  private String filesGroup = null;
883  private String filesUser = null;
884  private Path outputRoot = null;
885  private Path inputRoot = null;
886  private int bandwidthMB = Integer.MAX_VALUE;
887  private int filesMode = 0;
888  private int mappers = 0;
889
890  @Override
891  protected void processOptions(CommandLine cmd) {
892    snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName);
893    targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName);
894    if (cmd.hasOption(Options.COPY_TO.getLongOpt())) {
895      outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt()));
896    }
897    if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) {
898      inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt()));
899    }
900    mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers);
901    filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser);
902    filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup);
903    filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode);
904    bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB);
905    overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt());
906    // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...).
907    verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt());
908    verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
909  }
910
911  /**
912   * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
913   * @return 0 on success, and != 0 upon failure.
914   */
915  @Override
916  public int doWork() throws IOException {
917    Configuration conf = getConf();
918
919    // Check user options
920    if (snapshotName == null) {
921      System.err.println("Snapshot name not provided.");
922      LOG.error("Use -h or --help for usage instructions.");
923      return 0;
924    }
925
926    if (outputRoot == null) {
927      System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt()
928              + ") not provided.");
929      LOG.error("Use -h or --help for usage instructions.");
930      return 0;
931    }
932
933    if (targetName == null) {
934      targetName = snapshotName;
935    }
936    if (inputRoot == null) {
937      inputRoot = FSUtils.getRootDir(conf);
938    } else {
939      FSUtils.setRootDir(conf, inputRoot);
940    }
941
942    Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
943    srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
944    FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
945    LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
946    Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
947    destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
948    FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
949    LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
950
951    boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
952
953    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
954    Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
955    Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
956    Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
957
958    // Find the necessary directory which need to change owner and group
959    Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
960    if (outputFs.exists(needSetOwnerDir)) {
961      if (skipTmp) {
962        needSetOwnerDir = outputSnapshotDir;
963      } else {
964        needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot);
965        if (outputFs.exists(needSetOwnerDir)) {
966          needSetOwnerDir = snapshotTmpDir;
967        }
968      }
969    }
970
971    // Check if the snapshot already exists
972    if (outputFs.exists(outputSnapshotDir)) {
973      if (overwrite) {
974        if (!outputFs.delete(outputSnapshotDir, true)) {
975          System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
976          return 1;
977        }
978      } else {
979        System.err.println("The snapshot '" + targetName +
980          "' already exists in the destination: " + outputSnapshotDir);
981        return 1;
982      }
983    }
984
985    if (!skipTmp) {
986      // Check if the snapshot already in-progress
987      if (outputFs.exists(snapshotTmpDir)) {
988        if (overwrite) {
989          if (!outputFs.delete(snapshotTmpDir, true)) {
990            System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
991            return 1;
992          }
993        } else {
994          System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
995          System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
996          System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
997          return 1;
998        }
999      }
1000    }
1001
1002    // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
1003    // The snapshot references must be copied before the hfiles otherwise the cleaner
1004    // will remove them because they are unreferenced.
1005    try {
1006      LOG.info("Copy Snapshot Manifest");
1007      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
1008    } catch (IOException e) {
1009      throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
1010        snapshotDir + " to=" + initialOutputSnapshotDir, e);
1011    } finally {
1012      if (filesUser != null || filesGroup != null) {
1013        LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1014            + filesUser)
1015            + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1016            + filesGroup));
1017        setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
1018      }
1019      if (filesMode > 0) {
1020        LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1021        setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
1022      }
1023    }
1024
1025    // Write a new .snapshotinfo if the target name is different from the source name
1026    if (!targetName.equals(snapshotName)) {
1027      SnapshotDescription snapshotDesc =
1028        SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1029          .toBuilder()
1030          .setName(targetName)
1031          .build();
1032      SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs);
1033      if (filesUser != null || filesGroup != null) {
1034        outputFs.setOwner(new Path(initialOutputSnapshotDir,
1035          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup);
1036      }
1037      if (filesMode > 0) {
1038        outputFs.setPermission(new Path(initialOutputSnapshotDir,
1039          SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode));
1040      }
1041    }
1042
1043    // Step 2 - Start MR Job to copy files
1044    // The snapshot references must be copied before the files otherwise the files gets removed
1045    // by the HFileArchiver, since they have no references.
1046    try {
1047      runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1048                 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1049
1050      LOG.info("Finalize the Snapshot Export");
1051      if (!skipTmp) {
1052        // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1053        if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1054          throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1055            snapshotTmpDir + " to=" + outputSnapshotDir);
1056        }
1057      }
1058
1059      // Step 4 - Verify snapshot integrity
1060      if (verifyTarget) {
1061        LOG.info("Verify snapshot integrity");
1062        verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1063      }
1064
1065      LOG.info("Export Completed: " + targetName);
1066      return 0;
1067    } catch (Exception e) {
1068      LOG.error("Snapshot export failed", e);
1069      if (!skipTmp) {
1070        outputFs.delete(snapshotTmpDir, true);
1071      }
1072      outputFs.delete(outputSnapshotDir, true);
1073      return 1;
1074    } finally {
1075      IOUtils.closeStream(inputFs);
1076      IOUtils.closeStream(outputFs);
1077    }
1078  }
1079
1080  @Override
1081  protected void printUsage() {
1082    super.printUsage();
1083    System.out.println("\n"
1084        + "Examples:\n"
1085        + "  hbase snapshot export \\\n"
1086        + "    --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n"
1087        + "    --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n"
1088        + "\n"
1089        + "  hbase snapshot export \\\n"
1090        + "    --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n"
1091        + "    --copy-to hdfs://srv1:50070/hbase");
1092  }
1093
  /**
   * Register the command line options understood by this tool. Only the snapshot name is
   * registered as required here; the destination (copy-to) is registered as a regular option
   * and its presence is checked in {@code doWork()}.
   */
  @Override protected void addOptions() {
    addRequiredOption(Options.SNAPSHOT);
    addOption(Options.COPY_TO);
    addOption(Options.COPY_FROM);
    addOption(Options.TARGET_NAME);
    addOption(Options.NO_CHECKSUM_VERIFY);
    addOption(Options.NO_TARGET_VERIFY);
    addOption(Options.OVERWRITE);
    addOption(Options.CHUSER);
    addOption(Options.CHGROUP);
    addOption(Options.CHMOD);
    addOption(Options.MAPPERS);
    addOption(Options.BANDWIDTH);
  }
1108
1109  public static void main(String[] args) {
1110    new ExportSnapshot().doStaticMain(args);
1111  }
1112}