1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.LinkedList;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.classification.InterfaceStability;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.conf.Configured;
34  import org.apache.hadoop.fs.FSDataInputStream;
35  import org.apache.hadoop.fs.FSDataOutputStream;
36  import org.apache.hadoop.fs.FileChecksum;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.FileUtil;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.fs.permission.FsPermission;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.exceptions.ExportSnapshotException;
45  import org.apache.hadoop.hbase.io.HFileLink;
46  import org.apache.hadoop.hbase.io.HLogLink;
47  import org.apache.hadoop.hbase.mapreduce.JobUtil;
48  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
49  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.FSUtils;
52  import org.apache.hadoop.hbase.util.Pair;
53  import org.apache.hadoop.io.NullWritable;
54  import org.apache.hadoop.io.SequenceFile;
55  import org.apache.hadoop.io.Text;
56  import org.apache.hadoop.mapreduce.Job;
57  import org.apache.hadoop.mapreduce.Mapper;
58  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
59  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
60  import org.apache.hadoop.util.StringUtils;
61  import org.apache.hadoop.util.Tool;
62  import org.apache.hadoop.util.ToolRunner;
63  
64  /**
65   * Export the specified snapshot to a given FileSystem.
66   *
67   * The .snapshot/name folder is copied to the destination cluster
68   * and then all the hfiles/hlogs are copied using a Map-Reduce Job in the .archive/ location.
69   * When everything is done, the second cluster can restore the snapshot.
70   */
71  @InterfaceAudience.Public
72  @InterfaceStability.Evolving
73  public final class ExportSnapshot extends Configured implements Tool {
74    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
75  
76    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
77    private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
78    private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
79    private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
80    private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
81    private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
82  
83    private static final String INPUT_FOLDER_PREFIX = "export-files.";
84  
85    // Export Map-Reduce Counters, to keep track of the progress
86    public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED };
87  
88    private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
89      final static int REPORT_SIZE = 1 * 1024 * 1024;
90      final static int BUFFER_SIZE = 64 * 1024;
91  
92      private boolean verifyChecksum;
93      private String filesGroup;
94      private String filesUser;
95      private short filesMode;
96  
97      private FileSystem outputFs;
98      private Path outputArchive;
99      private Path outputRoot;
100 
101     private FileSystem inputFs;
102     private Path inputArchive;
103     private Path inputRoot;
104 
105     @Override
106     public void setup(Context context) {
107       Configuration conf = context.getConfiguration();
108       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
109 
110       filesGroup = conf.get(CONF_FILES_GROUP);
111       filesUser = conf.get(CONF_FILES_USER);
112       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
113       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
114       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
115 
116       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
117       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
118 
119       try {
120         inputFs = FileSystem.get(inputRoot.toUri(), conf);
121       } catch (IOException e) {
122         throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e);
123       }
124 
125       try {
126         outputFs = FileSystem.get(outputRoot.toUri(), conf);
127       } catch (IOException e) {
128         throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e);
129       }
130     }
131 
132     @Override
133     public void map(Text key, NullWritable value, Context context)
134         throws InterruptedException, IOException {
135       Path inputPath = new Path(key.toString());
136       Path outputPath = getOutputPath(inputPath);
137 
138       LOG.info("copy file input=" + inputPath + " output=" + outputPath);
139       if (copyFile(context, inputPath, outputPath)) {
140         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
141       }
142     }
143 
144     /**
145      * Returns the location where the inputPath will be copied.
146      *  - hfiles are encoded as hfile links hfile-region-table
147      *  - logs are encoded as serverName/logName
148      */
149     private Path getOutputPath(final Path inputPath) throws IOException {
150       Path path;
151       if (HFileLink.isHFileLink(inputPath) || StoreFileInfo.isReference(inputPath)) {
152         String family = inputPath.getParent().getName();
153         String table = HFileLink.getReferencedTableName(inputPath.getName());
154         String region = HFileLink.getReferencedRegionName(inputPath.getName());
155         String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
156         path = new Path(table, new Path(region, new Path(family, hfile)));
157       } else if (isHLogLinkPath(inputPath)) {
158         String logName = inputPath.getName();
159         path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
160       } else {
161         path = inputPath;
162       }
163       return new Path(outputArchive, path);
164     }
165 
166     private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
167         throws IOException {
168       FSDataInputStream in = openSourceFile(inputPath);
169       if (in == null) {
170         context.getCounter(Counter.MISSING_FILES).increment(1);
171         return false;
172       }
173 
174       try {
175         // Verify if the input file exists
176         FileStatus inputStat = getFileStatus(inputFs, inputPath);
177         if (inputStat == null) return false;
178 
179         // Verify if the output file exists and is the same that we want to copy
180         if (outputFs.exists(outputPath)) {
181           FileStatus outputStat = outputFs.getFileStatus(outputPath);
182           if (sameFile(inputStat, outputStat)) {
183             LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
184             return true;
185           }
186         }
187 
188         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
189 
190         // Ensure that the output folder is there and copy the file
191         outputFs.mkdirs(outputPath.getParent());
192         FSDataOutputStream out = outputFs.create(outputPath, true);
193         try {
194           if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
195             return false;
196         } finally {
197           out.close();
198         }
199 
200         // Preserve attributes
201         return preserveAttributes(outputPath, inputStat);
202       } finally {
203         in.close();
204       }
205     }
206 
207     /**
208      * Preserve the files attribute selected by the user copying them from the source file
209      */
210     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
211       FileStatus stat;
212       try {
213         stat = outputFs.getFileStatus(path);
214       } catch (IOException e) {
215         LOG.warn("Unable to get the status for file=" + path);
216         return false;
217       }
218 
219       try {
220         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
221           outputFs.setPermission(path, new FsPermission(filesMode));
222         } else if (!stat.getPermission().equals(refStat.getPermission())) {
223           outputFs.setPermission(path, refStat.getPermission());
224         }
225       } catch (IOException e) {
226         LOG.error("Unable to set the permission for file=" + path, e);
227         return false;
228       }
229 
230       try {
231         String user = (filesUser != null) ? filesUser : refStat.getOwner();
232         String group = (filesGroup != null) ? filesGroup : refStat.getGroup();
233         if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
234           outputFs.setOwner(path, user, group);
235         }
236       } catch (IOException e) {
237         LOG.error("Unable to set the owner/group for file=" + path, e);
238         return false;
239       }
240 
241       return true;
242     }
243 
244     private boolean copyData(final Context context,
245         final Path inputPath, final FSDataInputStream in,
246         final Path outputPath, final FSDataOutputStream out,
247         final long inputFileSize) {
248       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
249                                    " (%.3f%%) from " + inputPath + " to " + outputPath;
250 
251       try {
252         byte[] buffer = new byte[BUFFER_SIZE];
253         long totalBytesWritten = 0;
254         int reportBytes = 0;
255         int bytesRead;
256 
257         while ((bytesRead = in.read(buffer)) > 0) {
258           out.write(buffer, 0, bytesRead);
259           totalBytesWritten += bytesRead;
260           reportBytes += bytesRead;
261 
262           if (reportBytes >= REPORT_SIZE) {
263             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
264             context.setStatus(String.format(statusMessage,
265                               StringUtils.humanReadableInt(totalBytesWritten),
266                               reportBytes/(float)inputFileSize));
267             reportBytes = 0;
268           }
269         }
270 
271         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
272         context.setStatus(String.format(statusMessage,
273                           StringUtils.humanReadableInt(totalBytesWritten),
274                           reportBytes/(float)inputFileSize));
275 
276         // Verify that the written size match
277         if (totalBytesWritten != inputFileSize) {
278           LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
279                     " expected=" + inputFileSize + " for file=" + inputPath);
280           context.getCounter(Counter.COPY_FAILED).increment(1);
281           return false;
282         }
283 
284         return true;
285       } catch (IOException e) {
286         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
287         context.getCounter(Counter.COPY_FAILED).increment(1);
288         return false;
289       }
290     }
291 
292     private FSDataInputStream openSourceFile(final Path path) {
293       try {
294         if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
295           return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
296         } else if (isHLogLinkPath(path)) {
297           String serverName = path.getParent().getName();
298           String logName = path.getName();
299           return new HLogLink(inputRoot, serverName, logName).open(inputFs);
300         }
301         return inputFs.open(path);
302       } catch (IOException e) {
303         LOG.error("Unable to open source file=" + path, e);
304         return null;
305       }
306     }
307 
308     private FileStatus getFileStatus(final FileSystem fs, final Path path) {
309       try {
310         if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
311           HFileLink link = new HFileLink(inputRoot, inputArchive, path);
312           return link.getFileStatus(fs);
313         } else if (isHLogLinkPath(path)) {
314           String serverName = path.getParent().getName();
315           String logName = path.getName();
316           return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs);
317         }
318         return fs.getFileStatus(path);
319       } catch (IOException e) {
320         LOG.warn("Unable to get the status for file=" + path);
321         return null;
322       }
323     }
324 
325     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
326       try {
327         return fs.getFileChecksum(path);
328       } catch (IOException e) {
329         LOG.warn("Unable to get checksum for file=" + path, e);
330         return null;
331       }
332     }
333 
334     /**
335      * Check if the two files are equal by looking at the file length,
336      * and at the checksum (if user has specified the verifyChecksum flag).
337      */
338     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
339       // Not matching length
340       if (inputStat.getLen() != outputStat.getLen()) return false;
341 
342       // Mark files as equals, since user asked for no checksum verification
343       if (!verifyChecksum) return true;
344 
345       // If checksums are not available, files are not the same.
346       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
347       if (inChecksum == null) return false;
348 
349       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
350       if (outChecksum == null) return false;
351 
352       return inChecksum.equals(outChecksum);
353     }
354 
355     /**
356      * HLog files are encoded as serverName/logName
357      * and since all the other files should be in /hbase/table/..path..
358      * we can rely on the depth, for now.
359      */
360     private static boolean isHLogLinkPath(final Path path) {
361       return path.depth() == 2;
362     }
363   }
364 
365   /**
366    * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
367    * @return list of files referenced by the snapshot (pair of path and size)
368    */
369   private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
370       throws IOException {
371     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
372 
373     final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
374     final String table = snapshotDesc.getTable();
375     final Configuration conf = getConf();
376 
377     // Get snapshot files
378     SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
379       new SnapshotReferenceUtil.FileVisitor() {
380         public void storeFile (final String region, final String family, final String hfile)
381             throws IOException {
382           Path path = new Path(family, HFileLink.createHFileLinkName(table, region, hfile));
383           long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
384           files.add(new Pair<Path, Long>(path, size));
385         }
386 
387         public void recoveredEdits (final String region, final String logfile)
388             throws IOException {
389           // copied with the snapshot referenecs
390         }
391 
392         public void logFile (final String server, final String logfile)
393             throws IOException {
394           long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
395           files.add(new Pair<Path, Long>(new Path(server, logfile), size));
396         }
397     });
398 
399     return files;
400   }
401 
402   /**
403    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
404    * The groups created will have similar amounts of bytes.
405    * <p>
406    * The algorithm used is pretty straightforward; the file list is sorted by size,
407    * and then each group fetch the bigger file available, iterating through groups
408    * alternating the direction.
409    */
410   static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
411     // Sort files by size, from small to big
412     Collections.sort(files, new Comparator<Pair<Path, Long>>() {
413       public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
414         long r = a.getSecond() - b.getSecond();
415         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
416       }
417     });
418 
419     // create balanced groups
420     List<List<Path>> fileGroups = new LinkedList<List<Path>>();
421     long[] sizeGroups = new long[ngroups];
422     int hi = files.size() - 1;
423     int lo = 0;
424 
425     List<Path> group;
426     int dir = 1;
427     int g = 0;
428 
429     while (hi >= lo) {
430       if (g == fileGroups.size()) {
431         group = new LinkedList<Path>();
432         fileGroups.add(group);
433       } else {
434         group = fileGroups.get(g);
435       }
436 
437       Pair<Path, Long> fileInfo = files.get(hi--);
438 
439       // add the hi one
440       sizeGroups[g] += fileInfo.getSecond();
441       group.add(fileInfo.getFirst());
442 
443       // change direction when at the end or the beginning
444       g += dir;
445       if (g == ngroups) {
446         dir = -1;
447         g = ngroups - 1;
448       } else if (g < 0) {
449         dir = 1;
450         g = 0;
451       }
452     }
453 
454     if (LOG.isDebugEnabled()) {
455       for (int i = 0; i < sizeGroups.length; ++i) {
456         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
457       }
458     }
459 
460     return fileGroups;
461   }
462 
463   private static Path getInputFolderPath(Configuration conf)
464       throws IOException, InterruptedException {
465     Path stagingDir = JobUtil.getStagingDir(conf);
466     return new Path(stagingDir, INPUT_FOLDER_PREFIX +
467       String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
468   }
469 
470   /**
471    * Create the input files, with the path to copy, for the MR job.
472    * Each input files contains n files, and each input file has a similar amount data to copy.
473    * The number of input files created are based on the number of mappers provided as argument
474    * and the number of the files to copy.
475    */
476   private static Path[] createInputFiles(final Configuration conf,
477       final List<Pair<Path, Long>> snapshotFiles, int mappers)
478       throws IOException, InterruptedException {
479     Path inputFolderPath = getInputFolderPath(conf);
480     FileSystem fs = inputFolderPath.getFileSystem(conf);
481     LOG.debug("Input folder location: " + inputFolderPath);
482 
483     List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
484     Path[] inputFiles = new Path[splits.size()];
485 
486     Text key = new Text();
487     for (int i = 0; i < inputFiles.length; i++) {
488       List<Path> files = splits.get(i);
489       inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
490       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
491         Text.class, NullWritable.class);
492       LOG.debug("Input split: " + i);
493       try {
494         for (Path file: files) {
495           LOG.debug(file.toString());
496           key.set(file.toString());
497           writer.append(key, NullWritable.get());
498         }
499       } finally {
500         writer.close();
501       }
502     }
503 
504     return inputFiles;
505   }
506 
507   /**
508    * Run Map-Reduce Job to perform the files copy.
509    */
510   private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
511       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
512       final String filesUser, final String filesGroup, final int filesMode,
513       final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
514     Configuration conf = getConf();
515     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
516     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
517     conf.setInt(CONF_FILES_MODE, filesMode);
518     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
519     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
520     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
521     conf.setInt("mapreduce.job.maps", mappers);
522 
523     Job job = new Job(conf);
524     job.setJobName("ExportSnapshot");
525     job.setJarByClass(ExportSnapshot.class);
526     job.setMapperClass(ExportMapper.class);
527     job.setInputFormatClass(SequenceFileInputFormat.class);
528     job.setOutputFormatClass(NullOutputFormat.class);
529     job.setMapSpeculativeExecution(false);
530     job.setNumReduceTasks(0);
531     for (Path path: createInputFiles(conf, snapshotFiles, mappers)) {
532       LOG.debug("Add Input Path=" + path);
533       SequenceFileInputFormat.addInputPath(job, path);
534     }
535 
536     return job.waitForCompletion(true);
537   }
538 
539   /**
540    * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
541    * @return 0 on success, and != 0 upon failure.
542    */
543   @Override
544   public int run(String[] args) throws Exception {
545     boolean verifyChecksum = true;
546     String snapshotName = null;
547     String filesGroup = null;
548     String filesUser = null;
549     Path outputRoot = null;
550     int filesMode = 0;
551     int mappers = getConf().getInt("mapreduce.job.maps", 1);
552 
553     // Process command line args
554     for (int i = 0; i < args.length; i++) {
555       String cmd = args[i];
556       try {
557         if (cmd.equals("-snapshot")) {
558           snapshotName = args[++i];
559         } else if (cmd.equals("-copy-to")) {
560           outputRoot = new Path(args[++i]);
561         } else if (cmd.equals("-no-checksum-verify")) {
562           verifyChecksum = false;
563         } else if (cmd.equals("-mappers")) {
564           mappers = Integer.parseInt(args[++i]);
565         } else if (cmd.equals("-chuser")) {
566           filesUser = args[++i];
567         } else if (cmd.equals("-chgroup")) {
568           filesGroup = args[++i];
569         } else if (cmd.equals("-chmod")) {
570           filesMode = Integer.parseInt(args[++i], 8);
571         } else if (cmd.equals("-h") || cmd.equals("--help")) {
572           printUsageAndExit();
573         } else {
574           System.err.println("UNEXPECTED: " + cmd);
575           printUsageAndExit();
576         }
577       } catch (Exception e) {
578         printUsageAndExit();
579       }
580     }
581 
582     // Check user options
583     if (snapshotName == null) {
584       System.err.println("Snapshot name not provided.");
585       printUsageAndExit();
586     }
587 
588     if (outputRoot == null) {
589       System.err.println("Destination file-system not provided.");
590       printUsageAndExit();
591     }
592 
593     Configuration conf = getConf();
594     Path inputRoot = FSUtils.getRootDir(conf);
595     FileSystem inputFs = FileSystem.get(conf);
596     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
597 
598     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
599     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotName, outputRoot);
600     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, outputRoot);
601 
602     // Check if the snapshot already exists
603     if (outputFs.exists(outputSnapshotDir)) {
604       System.err.println("The snapshot '" + snapshotName +
605         "' already exists in the destination: " + outputSnapshotDir);
606       return 1;
607     }
608 
609     // Check if the snapshot already in-progress
610     if (outputFs.exists(snapshotTmpDir)) {
611       System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress");
612       System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
613       System.err.println("consider removing " + snapshotTmpDir + " before retrying export"); 
614       return 1;
615     }
616 
617     // Step 0 - Extract snapshot files to copy
618     final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
619 
620     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
621     // The snapshot references must be copied before the hfiles otherwise the cleaner
622     // will remove them because they are unreferenced.
623     try {
624       FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
625     } catch (IOException e) {
626       System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir +
627         " to=" + snapshotTmpDir);
628       e.printStackTrace(System.err);
629       return 1;
630     }
631 
632     // Step 2 - Start MR Job to copy files
633     // The snapshot references must be copied before the files otherwise the files gets removed
634     // by the HFileArchiver, since they have no references.
635     try {
636       if (files.size() == 0) {
637         LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
638       } else {
639         if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
640             filesUser, filesGroup, filesMode, mappers)) {
641           throw new ExportSnapshotException("Snapshot export failed!");
642         }
643       }
644 
645       // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
646       if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
647         System.err.println("Snapshot export failed!");
648         System.err.println("Unable to rename snapshot directory from=" +
649                            snapshotTmpDir + " to=" + outputSnapshotDir);
650         return 1;
651       }
652 
653       return 0;
654     } catch (Exception e) {
655       System.err.println("Snapshot export failed!");
656       e.printStackTrace(System.err);
657       outputFs.delete(outputSnapshotDir, true);
658       return 1;
659     }
660   }
661 
662   // ExportSnapshot
663   private void printUsageAndExit() {
664     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
665     System.err.println(" where [options] are:");
666     System.err.println("  -h|-help                Show this help and exit.");
667     System.err.println("  -snapshot NAME          Snapshot to restore.");
668     System.err.println("  -copy-to NAME           Remote destination hdfs://");
669     System.err.println("  -no-checksum-verify     Do not verify checksum.");
670     System.err.println("  -chuser USERNAME        Change the owner of the files to the specified one.");
671     System.err.println("  -chgroup GROUP          Change the group of the files to the specified one.");
672     System.err.println("  -chmod MODE             Change the permission of the files to the specified one.");
673     System.err.println("  -mappers                Number of mappers to use during the copy (mapreduce.job.maps).");
674     System.err.println();
675     System.err.println("Examples:");
676     System.err.println("  hbase " + getClass() + " \\");
677     System.err.println("    -snapshot MySnapshot -copy-to hdfs:///srv2:8082/hbase \\");
678     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
679     System.exit(1);
680   }
681 
682   /**
683    * The guts of the {@link #main} method.
684    * Call this method to avoid the {@link #main(String[])} System.exit.
685    * @param args
686    * @return errCode
687    * @throws Exception
688    */
689   static int innerMain(final Configuration conf, final String [] args) throws Exception {
690     return ToolRunner.run(conf, new ExportSnapshot(), args);
691   }
692 
693   public static void main(String[] args) throws Exception {
694      System.exit(innerMain(HBaseConfiguration.create(), args));
695   }
696 }