View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.IOUtils;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.Writable;
65  import org.apache.hadoop.mapreduce.Job;
66  import org.apache.hadoop.mapreduce.JobContext;
67  import org.apache.hadoop.mapreduce.Mapper;
68  import org.apache.hadoop.mapreduce.InputFormat;
69  import org.apache.hadoop.mapreduce.InputSplit;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.TaskAttemptContext;
72  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73  import org.apache.hadoop.mapreduce.security.TokenCache;
74  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75  import org.apache.hadoop.util.StringUtils;
76  import org.apache.hadoop.util.Tool;
77  import org.apache.hadoop.util.ToolRunner;
78  
79  /**
80   * Export the specified snapshot to a given FileSystem.
81   *
82   * The .snapshot/name folder is copied to the destination cluster
83   * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
84   * When everything is done, the second cluster can restore the snapshot.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ExportSnapshot extends Configured implements Tool {
89    public static final String NAME = "exportsnapshot";
90  
91    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
92  
93    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
94    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
95    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
96    private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
97    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
98    private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
99    private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
100   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
101   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
102   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
103   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
104   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
105   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
106   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
107 
108   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
109   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
110 
111   private static final String INPUT_FOLDER_PREFIX = "export-files.";
112 
113   // Export Map-Reduce Counters, to keep track of the progress
114   public enum Counter {
115     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
116     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
117   }
118 
119   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
120                                                    NullWritable, NullWritable> {
121     final static int REPORT_SIZE = 1 * 1024 * 1024;
122     final static int BUFFER_SIZE = 64 * 1024;
123 
124     private boolean testFailures;
125     private Random random;
126 
127     private boolean verifyChecksum;
128     private String filesGroup;
129     private String filesUser;
130     private short filesMode;
131     private int bufferSize;
132 
133     private FileSystem outputFs;
134     private Path outputArchive;
135     private Path outputRoot;
136 
137     private FileSystem inputFs;
138     private Path inputArchive;
139     private Path inputRoot;
140 
141     @Override
142     public void setup(Context context) throws IOException {
143       Configuration conf = context.getConfiguration();
144       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
145 
146       filesGroup = conf.get(CONF_FILES_GROUP);
147       filesUser = conf.get(CONF_FILES_USER);
148       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
149       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
150       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
151 
152       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
153       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
154 
155       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
156 
157       try {
158         conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
159         inputFs = FileSystem.get(inputRoot.toUri(), conf);
160       } catch (IOException e) {
161         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
162       }
163 
164       try {
165         conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         outputFs = FileSystem.get(outputRoot.toUri(), conf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
169       }
170 
171       // Use the default block size of the outputFs if bigger
172       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
173       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
174       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
175 
176       for (Counter c : Counter.values()) {
177         context.getCounter(c).increment(0);
178       }
179     }
180 
181     @Override
182     protected void cleanup(Context context) {
183       IOUtils.closeStream(inputFs);
184       IOUtils.closeStream(outputFs);
185     }
186 
187     @Override
188     public void map(BytesWritable key, NullWritable value, Context context)
189         throws InterruptedException, IOException {
190       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
191       Path outputPath = getOutputPath(inputInfo);
192 
193       copyFile(context, inputInfo, outputPath);
194     }
195 
196     /**
197      * Returns the location where the inputPath will be copied.
198      */
199     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
200       Path path = null;
201       switch (inputInfo.getType()) {
202         case HFILE:
203           Path inputPath = new Path(inputInfo.getHfile());
204           String family = inputPath.getParent().getName();
205           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
206           String region = HFileLink.getReferencedRegionName(inputPath.getName());
207           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
208           path = new Path(FSUtils.getTableDir(new Path("./"), table),
209               new Path(region, new Path(family, hfile)));
210           break;
211         case WAL:
212           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
213           path = new Path(oldLogsDir, inputInfo.getWalName());
214           break;
215         default:
216           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
217       }
218       return new Path(outputArchive, path);
219     }
220 
221     /*
222      * Used by TestExportSnapshot to simulate a failure
223      */
224     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
225         throws IOException {
226       if (testFailures) {
227         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
228           if (random == null) {
229             random = new Random();
230           }
231 
232           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
233           // retry, but at least we reduce the number of test failures due to
234           // this test exception from the same map task.
235           if (random.nextFloat() < 0.03) {
236             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
237                                   + " time=" + System.currentTimeMillis());
238           }
239         } else {
240           context.getCounter(Counter.COPY_FAILED).increment(1);
241           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
242         }
243       }
244     }
245 
246     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
247         final Path outputPath) throws IOException {
248       injectTestFailure(context, inputInfo);
249 
250       // Get the file information
251       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
252 
253       // Verify if the output file exists and is the same that we want to copy
254       if (outputFs.exists(outputPath)) {
255         FileStatus outputStat = outputFs.getFileStatus(outputPath);
256         if (outputStat != null && sameFile(inputStat, outputStat)) {
257           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
258           context.getCounter(Counter.FILES_SKIPPED).increment(1);
259           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
260           return;
261         }
262       }
263 
264       InputStream in = openSourceFile(context, inputInfo);
265       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
266       if (Integer.MAX_VALUE != bandwidthMB) {
267         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
268       }
269 
270       try {
271         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
272 
273         // Ensure that the output folder is there and copy the file
274         createOutputPath(outputPath.getParent());
275         FSDataOutputStream out = outputFs.create(outputPath, true);
276         try {
277           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
278         } finally {
279           out.close();
280         }
281 
282         // Try to Preserve attributes
283         if (!preserveAttributes(outputPath, inputStat)) {
284           LOG.warn("You may have to run manually chown on: " + outputPath);
285         }
286       } finally {
287         in.close();
288       }
289     }
290 
291     /**
292      * Create the output folder and optionally set ownership.
293      */
294     private void createOutputPath(final Path path) throws IOException {
295       if (filesUser == null && filesGroup == null) {
296         outputFs.mkdirs(path);
297       } else {
298         Path parent = path.getParent();
299         if (!outputFs.exists(parent) && !parent.isRoot()) {
300           createOutputPath(parent);
301         }
302         outputFs.mkdirs(path);
303         if (filesUser != null || filesGroup != null) {
304           // override the owner when non-null user/group is specified
305           outputFs.setOwner(path, filesUser, filesGroup);
306         }
307         if (filesMode > 0) {
308           outputFs.setPermission(path, new FsPermission(filesMode));
309         }
310       }
311     }
312 
313     /**
314      * Try to Preserve the files attribute selected by the user copying them from the source file
315      * This is only required when you are exporting as a different user than "hbase" or on a system
316      * that doesn't have the "hbase" user.
317      *
318      * This is not considered a blocking failure since the user can force a chmod with the user
319      * that knows is available on the system.
320      */
321     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
322       FileStatus stat;
323       try {
324         stat = outputFs.getFileStatus(path);
325       } catch (IOException e) {
326         LOG.warn("Unable to get the status for file=" + path);
327         return false;
328       }
329 
330       try {
331         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
332           outputFs.setPermission(path, new FsPermission(filesMode));
333         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
334           outputFs.setPermission(path, refStat.getPermission());
335         }
336       } catch (IOException e) {
337         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
338         return false;
339       }
340 
341       boolean hasRefStat = (refStat != null);
342       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
343       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
344       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
345         try {
346           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
347             outputFs.setOwner(path, user, group);
348           }
349         } catch (IOException e) {
350           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
351           LOG.warn("The user/group may not exist on the destination cluster: user=" +
352                    user + " group=" + group);
353           return false;
354         }
355       }
356 
357       return true;
358     }
359 
360     private boolean stringIsNotEmpty(final String str) {
361       return str != null && str.length() > 0;
362     }
363 
364     private void copyData(final Context context,
365         final Path inputPath, final InputStream in,
366         final Path outputPath, final FSDataOutputStream out,
367         final long inputFileSize)
368         throws IOException {
369       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
370                                    " (%.1f%%)";
371 
372       try {
373         byte[] buffer = new byte[bufferSize];
374         long totalBytesWritten = 0;
375         int reportBytes = 0;
376         int bytesRead;
377 
378         long stime = System.currentTimeMillis();
379         while ((bytesRead = in.read(buffer)) > 0) {
380           out.write(buffer, 0, bytesRead);
381           totalBytesWritten += bytesRead;
382           reportBytes += bytesRead;
383 
384           if (reportBytes >= REPORT_SIZE) {
385             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
386             context.setStatus(String.format(statusMessage,
387                               StringUtils.humanReadableInt(totalBytesWritten),
388                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
389                               " from " + inputPath + " to " + outputPath);
390             reportBytes = 0;
391           }
392         }
393         long etime = System.currentTimeMillis();
394 
395         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
396         context.setStatus(String.format(statusMessage,
397                           StringUtils.humanReadableInt(totalBytesWritten),
398                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
399                           " from " + inputPath + " to " + outputPath);
400 
401         // Verify that the written size match
402         if (totalBytesWritten != inputFileSize) {
403           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
404                        " expected=" + inputFileSize + " for file=" + inputPath;
405           throw new IOException(msg);
406         }
407 
408         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
409         LOG.info("size=" + totalBytesWritten +
410             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
411             " time=" + StringUtils.formatTimeDiff(etime, stime) +
412             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
413         context.getCounter(Counter.FILES_COPIED).increment(1);
414       } catch (IOException e) {
415         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
416         context.getCounter(Counter.COPY_FAILED).increment(1);
417         throw e;
418       }
419     }
420 
421     /**
422      * Try to open the "source" file.
423      * Throws an IOException if the communication with the inputFs fail or
424      * if the file is not found.
425      */
426     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
427             throws IOException {
428       try {
429         Configuration conf = context.getConfiguration();
430         FileLink link = null;
431         switch (fileInfo.getType()) {
432           case HFILE:
433             Path inputPath = new Path(fileInfo.getHfile());
434             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
435             break;
436           case WAL:
437             String serverName = fileInfo.getWalServer();
438             String logName = fileInfo.getWalName();
439             link = new WALLink(inputRoot, serverName, logName);
440             break;
441           default:
442             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
443         }
444         return link.open(inputFs);
445       } catch (IOException e) {
446         context.getCounter(Counter.MISSING_FILES).increment(1);
447         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
448         throw e;
449       }
450     }
451 
452     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
453         throws IOException {
454       try {
455         Configuration conf = context.getConfiguration();
456         FileLink link = null;
457         switch (fileInfo.getType()) {
458           case HFILE:
459             Path inputPath = new Path(fileInfo.getHfile());
460             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
461             break;
462           case WAL:
463             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
464             break;
465           default:
466             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
467         }
468         return link.getFileStatus(inputFs);
469       } catch (FileNotFoundException e) {
470         context.getCounter(Counter.MISSING_FILES).increment(1);
471         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
472         throw e;
473       } catch (IOException e) {
474         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
475         throw e;
476       }
477     }
478 
479     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
480       try {
481         return fs.getFileChecksum(path);
482       } catch (IOException e) {
483         LOG.warn("Unable to get checksum for file=" + path, e);
484         return null;
485       }
486     }
487 
488     /**
489      * Check if the two files are equal by looking at the file length,
490      * and at the checksum (if user has specified the verifyChecksum flag).
491      */
492     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
493       // Not matching length
494       if (inputStat.getLen() != outputStat.getLen()) return false;
495 
496       // Mark files as equals, since user asked for no checksum verification
497       if (!verifyChecksum) return true;
498 
499       // If checksums are not available, files are not the same.
500       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
501       if (inChecksum == null) return false;
502 
503       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
504       if (outChecksum == null) return false;
505 
506       return inChecksum.equals(outChecksum);
507     }
508   }
509 
510   // ==========================================================================
511   //  Input Format
512   // ==========================================================================
513 
514   /**
515    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
516    * @return list of files referenced by the snapshot (pair of path and size)
517    */
518   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
519       final FileSystem fs, final Path snapshotDir) throws IOException {
520     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
521 
522     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
523     final TableName table = TableName.valueOf(snapshotDesc.getTable());
524 
525     // Get snapshot files
526     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
527     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
528       new SnapshotReferenceUtil.SnapshotVisitor() {
529         @Override
530         public void storeFile(final HRegionInfo regionInfo, final String family,
531             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
532           if (storeFile.hasReference()) {
533             // copied as part of the manifest
534           } else {
535             String region = regionInfo.getEncodedName();
536             String hfile = storeFile.getName();
537             Path path = HFileLink.createPath(table, region, family, hfile);
538 
539             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
540               .setType(SnapshotFileInfo.Type.HFILE)
541               .setHfile(path.toString())
542               .build();
543 
544             long size;
545             if (storeFile.hasFileSize()) {
546               size = storeFile.getFileSize();
547             } else {
548               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
549             }
550             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
551           }
552         }
553 
554         @Override
555         public void logFile (final String server, final String logfile)
556             throws IOException {
557           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
558             .setType(SnapshotFileInfo.Type.WAL)
559             .setWalServer(server)
560             .setWalName(logfile)
561             .build();
562 
563           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
564           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
565         }
566     });
567 
568     return files;
569   }
570 
571   /**
572    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
573    * The groups created will have similar amounts of bytes.
574    * <p>
575    * The algorithm used is pretty straightforward; the file list is sorted by size,
576    * and then each group fetch the bigger file available, iterating through groups
577    * alternating the direction.
578    */
579   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
580       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
581     // Sort files by size, from small to big
582     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
583       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
584         long r = a.getSecond() - b.getSecond();
585         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
586       }
587     });
588 
589     // create balanced groups
590     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
591       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
592     long[] sizeGroups = new long[ngroups];
593     int hi = files.size() - 1;
594     int lo = 0;
595 
596     List<Pair<SnapshotFileInfo, Long>> group;
597     int dir = 1;
598     int g = 0;
599 
600     while (hi >= lo) {
601       if (g == fileGroups.size()) {
602         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
603         fileGroups.add(group);
604       } else {
605         group = fileGroups.get(g);
606       }
607 
608       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
609 
610       // add the hi one
611       sizeGroups[g] += fileInfo.getSecond();
612       group.add(fileInfo);
613 
614       // change direction when at the end or the beginning
615       g += dir;
616       if (g == ngroups) {
617         dir = -1;
618         g = ngroups - 1;
619       } else if (g < 0) {
620         dir = 1;
621         g = 0;
622       }
623     }
624 
625     if (LOG.isDebugEnabled()) {
626       for (int i = 0; i < sizeGroups.length; ++i) {
627         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
628       }
629     }
630 
631     return fileGroups;
632   }
633 
634   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
635     @Override
636     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
637         TaskAttemptContext tac) throws IOException, InterruptedException {
638       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
639     }
640 
641     @Override
642     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
643       Configuration conf = context.getConfiguration();
644       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
645       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
646       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
647 
648       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
649       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
650       if (mappers == 0 && snapshotFiles.size() > 0) {
651         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
652         mappers = Math.min(mappers, snapshotFiles.size());
653         conf.setInt(CONF_NUM_SPLITS, mappers);
654         conf.setInt(MR_NUM_MAPS, mappers);
655       }
656 
657       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
658       List<InputSplit> splits = new ArrayList(groups.size());
659       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
660         splits.add(new ExportSnapshotInputSplit(files));
661       }
662       return splits;
663     }
664 
665     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
666       private List<Pair<BytesWritable, Long>> files;
667       private long length;
668 
669       public ExportSnapshotInputSplit() {
670         this.files = null;
671       }
672 
673       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
674         this.files = new ArrayList(snapshotFiles.size());
675         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
676           this.files.add(new Pair<BytesWritable, Long>(
677             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
678           this.length += fileInfo.getSecond();
679         }
680       }
681 
682       private List<Pair<BytesWritable, Long>> getSplitKeys() {
683         return files;
684       }
685 
686       @Override
687       public long getLength() throws IOException, InterruptedException {
688         return length;
689       }
690 
691       @Override
692       public String[] getLocations() throws IOException, InterruptedException {
693         return new String[] {};
694       }
695 
696       @Override
697       public void readFields(DataInput in) throws IOException {
698         int count = in.readInt();
699         files = new ArrayList<Pair<BytesWritable, Long>>(count);
700         length = 0;
701         for (int i = 0; i < count; ++i) {
702           BytesWritable fileInfo = new BytesWritable();
703           fileInfo.readFields(in);
704           long size = in.readLong();
705           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
706           length += size;
707         }
708       }
709 
710       @Override
711       public void write(DataOutput out) throws IOException {
712         out.writeInt(files.size());
713         for (final Pair<BytesWritable, Long> fileInfo: files) {
714           fileInfo.getFirst().write(out);
715           out.writeLong(fileInfo.getSecond());
716         }
717       }
718     }
719 
720     private static class ExportSnapshotRecordReader
721         extends RecordReader<BytesWritable, NullWritable> {
722       private final List<Pair<BytesWritable, Long>> files;
723       private long totalSize = 0;
724       private long procSize = 0;
725       private int index = -1;
726 
727       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
728         this.files = files;
729         for (Pair<BytesWritable, Long> fileInfo: files) {
730           totalSize += fileInfo.getSecond();
731         }
732       }
733 
734       @Override
735       public void close() { }
736 
737       @Override
738       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
739 
740       @Override
741       public NullWritable getCurrentValue() { return NullWritable.get(); }
742 
743       @Override
744       public float getProgress() { return (float)procSize / totalSize; }
745 
746       @Override
747       public void initialize(InputSplit split, TaskAttemptContext tac) { }
748 
749       @Override
750       public boolean nextKeyValue() {
751         if (index >= 0) {
752           procSize += files.get(index).getSecond();
753         }
754         return(++index < files.size());
755       }
756     }
757   }
758 
759   // ==========================================================================
760   //  Tool
761   // ==========================================================================
762 
763   /**
764    * Run Map-Reduce Job to perform the files copy.
765    */
766   private void runCopyJob(final Path inputRoot, final Path outputRoot,
767       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
768       final String filesUser, final String filesGroup, final int filesMode,
769       final int mappers, final int bandwidthMB)
770           throws IOException, InterruptedException, ClassNotFoundException {
771     Configuration conf = getConf();
772     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
773     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
774     if (mappers > 0) {
775       conf.setInt(CONF_NUM_SPLITS, mappers);
776       conf.setInt(MR_NUM_MAPS, mappers);
777     }
778     conf.setInt(CONF_FILES_MODE, filesMode);
779     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
780     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
781     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
782     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
783     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
784     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
785 
786     Job job = new Job(conf);
787     job.setJobName("ExportSnapshot-" + snapshotName);
788     job.setJarByClass(ExportSnapshot.class);
789     TableMapReduceUtil.addDependencyJars(job);
790     job.setMapperClass(ExportMapper.class);
791     job.setInputFormatClass(ExportSnapshotInputFormat.class);
792     job.setOutputFormatClass(NullOutputFormat.class);
793     job.setMapSpeculativeExecution(false);
794     job.setNumReduceTasks(0);
795 
796     // Acquire the delegation Tokens
797     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
798       new Path[] { inputRoot, outputRoot }, conf);
799 
800     // Run the MR Job
801     if (!job.waitForCompletion(true)) {
802       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
803       // when it will be available on all the supported versions.
804       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
805     }
806   }
807 
808   private void verifySnapshot(final Configuration baseConf,
809       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
810     // Update the conf with the current root dir, since may be a different cluster
811     Configuration conf = new Configuration(baseConf);
812     FSUtils.setRootDir(conf, rootDir);
813     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
814     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
815     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
816   }
817 
818   /**
819    * Set path ownership.
820    */
821   private void setOwner(final FileSystem fs, final Path path, final String user,
822       final String group, final boolean recursive) throws IOException {
823     if (user != null || group != null) {
824       if (recursive && fs.isDirectory(path)) {
825         for (FileStatus child : fs.listStatus(path)) {
826           setOwner(fs, child.getPath(), user, group, recursive);
827         }
828       }
829       fs.setOwner(path, user, group);
830     }
831   }
832 
833   /**
834    * Set path permission.
835    */
836   private void setPermission(final FileSystem fs, final Path path, final short filesMode,
837       final boolean recursive) throws IOException {
838     if (filesMode > 0) {
839       FsPermission perm = new FsPermission(filesMode);
840       if (recursive && fs.isDirectory(path)) {
841         for (FileStatus child : fs.listStatus(path)) {
842           setPermission(fs, child.getPath(), filesMode, recursive);
843         }
844       }
845       fs.setPermission(path, perm);
846     }
847   }
848 
849   /**
850    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
851    * @return 0 on success, and != 0 upon failure.
852    */
853   @Override
854   public int run(String[] args) throws IOException {
855     boolean verifyTarget = true;
856     boolean verifyChecksum = true;
857     String snapshotName = null;
858     String targetName = null;
859     boolean overwrite = false;
860     String filesGroup = null;
861     String filesUser = null;
862     Path outputRoot = null;
863     int bandwidthMB = Integer.MAX_VALUE;
864     int filesMode = 0;
865     int mappers = 0;
866 
867     Configuration conf = getConf();
868     Path inputRoot = FSUtils.getRootDir(conf);
869 
870     // Process command line args
871     for (int i = 0; i < args.length; i++) {
872       String cmd = args[i];
873       if (cmd.equals("-snapshot")) {
874         snapshotName = args[++i];
875       } else if (cmd.equals("-target")) {
876         targetName = args[++i];
877       } else if (cmd.equals("-copy-to")) {
878         outputRoot = new Path(args[++i]);
879       } else if (cmd.equals("-copy-from")) {
880         inputRoot = new Path(args[++i]);
881         FSUtils.setRootDir(conf, inputRoot);
882       } else if (cmd.equals("-no-checksum-verify")) {
883         verifyChecksum = false;
884       } else if (cmd.equals("-no-target-verify")) {
885         verifyTarget = false;
886       } else if (cmd.equals("-mappers")) {
887         mappers = Integer.parseInt(args[++i]);
888       } else if (cmd.equals("-chuser")) {
889         filesUser = args[++i];
890       } else if (cmd.equals("-chgroup")) {
891         filesGroup = args[++i];
892       } else if (cmd.equals("-bandwidth")) {
893         bandwidthMB = Integer.parseInt(args[++i]);
894       } else if (cmd.equals("-chmod")) {
895         filesMode = Integer.parseInt(args[++i], 8);
896       } else if (cmd.equals("-overwrite")) {
897         overwrite = true;
898       } else if (cmd.equals("-h") || cmd.equals("--help")) {
899         printUsageAndExit();
900       } else {
901         System.err.println("UNEXPECTED: " + cmd);
902         printUsageAndExit();
903       }
904     }
905 
906     // Check user options
907     if (snapshotName == null) {
908       System.err.println("Snapshot name not provided.");
909       printUsageAndExit();
910     }
911 
912     if (outputRoot == null) {
913       System.err.println("Destination file-system not provided.");
914       printUsageAndExit();
915     }
916 
917     if (targetName == null) {
918       targetName = snapshotName;
919     }
920 
921     conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
922     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
923     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
924     conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
925     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
926     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
927 
928     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
929 
930     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
931     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
932     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
933     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
934 
935     // Check if the snapshot already exists
936     if (outputFs.exists(outputSnapshotDir)) {
937       if (overwrite) {
938         if (!outputFs.delete(outputSnapshotDir, true)) {
939           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
940           return 1;
941         }
942       } else {
943         System.err.println("The snapshot '" + targetName +
944           "' already exists in the destination: " + outputSnapshotDir);
945         return 1;
946       }
947     }
948 
949     if (!skipTmp) {
950       // Check if the snapshot already in-progress
951       if (outputFs.exists(snapshotTmpDir)) {
952         if (overwrite) {
953           if (!outputFs.delete(snapshotTmpDir, true)) {
954             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
955             return 1;
956           }
957         } else {
958           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
959           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
960           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
961           return 1;
962         }
963       }
964     }
965 
966     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
967     // The snapshot references must be copied before the hfiles otherwise the cleaner
968     // will remove them because they are unreferenced.
969     try {
970       LOG.info("Copy Snapshot Manifest");
971       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
972       if (filesUser != null || filesGroup != null) {
973         setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
974       }
975       if (filesMode > 0) {
976         setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
977       }
978     } catch (IOException e) {
979       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
980         snapshotDir + " to=" + initialOutputSnapshotDir, e);
981     }
982 
983     // Write a new .snapshotinfo if the target name is different from the source name
984     if (!targetName.equals(snapshotName)) {
985       SnapshotDescription snapshotDesc =
986         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
987           .toBuilder()
988           .setName(targetName)
989           .build();
990       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
991     }
992 
993     // Step 2 - Start MR Job to copy files
994     // The snapshot references must be copied before the files otherwise the files gets removed
995     // by the HFileArchiver, since they have no references.
996     try {
997       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
998                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
999 
1000       LOG.info("Finalize the Snapshot Export");
1001       if (!skipTmp) {
1002         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1003         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1004           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1005             snapshotTmpDir + " to=" + outputSnapshotDir);
1006         }
1007       }
1008 
1009       // Step 4 - Verify snapshot integrity
1010       if (verifyTarget) {
1011         LOG.info("Verify snapshot integrity");
1012         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
1013       }
1014 
1015       LOG.info("Export Completed: " + targetName);
1016       return 0;
1017     } catch (Exception e) {
1018       LOG.error("Snapshot export failed", e);
1019       if (!skipTmp) {
1020         outputFs.delete(snapshotTmpDir, true);
1021       }
1022       outputFs.delete(outputSnapshotDir, true);
1023       return 1;
1024     } finally {
1025       IOUtils.closeStream(inputFs);
1026       IOUtils.closeStream(outputFs);
1027     }
1028   }
1029 
1030   // ExportSnapshot
1031   private void printUsageAndExit() {
1032     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1033     System.err.println(" where [options] are:");
1034     System.err.println("  -h|-help                Show this help and exit.");
1035     System.err.println("  -snapshot NAME          Snapshot to restore.");
1036     System.err.println("  -copy-to NAME           Remote destination hdfs://");
1037     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
1038     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
1039     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
1040         "exported snapshot.");
1041     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
1042     System.err.println("  -chuser USERNAME        Change the owner of the files " +
1043         "to the specified one.");
1044     System.err.println("  -chgroup GROUP          Change the group of the files to " +
1045         "the specified one.");
1046     System.err.println("  -chmod MODE             Change the permission of the files " +
1047         "to the specified one.");
1048     System.err.println("  -mappers                Number of mappers to use during the " +
1049         "copy (mapreduce.job.maps).");
1050     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1051     System.err.println();
1052     System.err.println("Examples:");
1053     System.err.println("  hbase " + getClass().getName() + " \\");
1054     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1055     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1056     System.err.println();
1057     System.err.println("  hbase " + getClass().getName() + " \\");
1058     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1059     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1060     System.exit(1);
1061   }
1062 
1063   /**
1064    * The guts of the {@link #main} method.
1065    * Call this method to avoid the {@link #main(String[])} System.exit.
1066    * @param args
1067    * @return errCode
1068    * @throws Exception
1069    */
1070   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1071     return ToolRunner.run(conf, new ExportSnapshot(), args);
1072   }
1073 
1074   public static void main(String[] args) throws Exception {
1075     System.exit(innerMain(HBaseConfiguration.create(), args));
1076   }
1077 }