View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.IOUtils;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.Writable;
65  import org.apache.hadoop.mapreduce.Job;
66  import org.apache.hadoop.mapreduce.JobContext;
67  import org.apache.hadoop.mapreduce.Mapper;
68  import org.apache.hadoop.mapreduce.InputFormat;
69  import org.apache.hadoop.mapreduce.InputSplit;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.TaskAttemptContext;
72  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73  import org.apache.hadoop.mapreduce.security.TokenCache;
74  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75  import org.apache.hadoop.util.StringUtils;
76  import org.apache.hadoop.util.Tool;
77  import org.apache.hadoop.util.ToolRunner;
78  
79  /**
80   * Export the specified snapshot to a given FileSystem.
81   *
82   * The .snapshot/name folder is copied to the destination cluster
83   * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
84   * When everything is done, the second cluster can restore the snapshot.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ExportSnapshot extends Configured implements Tool {
89    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
90  
  // Hadoop Map-Reduce key for the number of map tasks; written together with CONF_NUM_SPLITS.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Number of input splits; when 0 (the default) the input format derives it from the file count.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  // Name of the snapshot being exported.
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  // Directory of the snapshot whose referenced files are listed by the input format.
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Owner to set on the copied files (optional).
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  // Group to set on the copied files (optional).
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  // Permission bits to set on the copied files; read as an int with default 0.
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether an existing destination file is compared by checksum too (default true).
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root path of the destination used by the mappers.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  // Root path of the source used by the mappers.
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Size of the copy buffer; defaults to a value computed in ExportMapper.setup().
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Number of files per mapper used when the split count is not given (default 10).
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper read bandwidth limit in MB (default 100); Integer.MAX_VALUE disables throttling.
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  // NOTE(review): not referenced in this part of the file; presumably skips the temporary
  // export directory -- confirm against the code that reads it.
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

  // Test hook: when true the mapper fails the copy on purpose (see injectTestFailure()).
  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
  // Test hook: when true the injected failure is random instead of systematic.
  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

  // NOTE(review): not referenced in this part of the file -- confirm its use before relying on it.
  private static final String INPUT_FOLDER_PREFIX = "export-files.";
110 
111   // Export Map-Reduce Counters, to keep track of the progress
112   public enum Counter {
113     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
114     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
115   }
116 
117   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
118                                                    NullWritable, NullWritable> {
    // Number of copied bytes after which copyData() updates the counter and the task status.
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    // Lower bound for the default copy buffer size computed in setup().
    final static int BUFFER_SIZE = 64 * 1024;

    // Set from CONF_TEST_FAILURE; enables the failures raised by injectTestFailure().
    private boolean testFailures;
    // Created lazily by injectTestFailure() when the retry test mode is active.
    private Random random;

    // Copy options read from the job configuration in setup().
    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;

    // Destination file system and the paths derived from the output root.
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source file system and the paths derived from the input root.
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;
138 
139     @Override
140     public void setup(Context context) throws IOException {
141       Configuration conf = context.getConfiguration();
142       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
143 
144       filesGroup = conf.get(CONF_FILES_GROUP);
145       filesUser = conf.get(CONF_FILES_USER);
146       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
147       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
148       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
149 
150       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
151       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
152 
153       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
154 
155       try {
156         conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
157         inputFs = FileSystem.get(inputRoot.toUri(), conf);
158       } catch (IOException e) {
159         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
160       }
161 
162       try {
163         conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
164         outputFs = FileSystem.get(outputRoot.toUri(), conf);
165       } catch (IOException e) {
166         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
167       }
168 
169       // Use the default block size of the outputFs if bigger
170       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
171       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
172       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
173 
174       for (Counter c : Counter.values()) {
175         context.getCounter(c).increment(0);
176       }
177     }
178 
179     @Override
180     protected void cleanup(Context context) {
181       IOUtils.closeStream(inputFs);
182       IOUtils.closeStream(outputFs);
183     }
184 
185     @Override
186     public void map(BytesWritable key, NullWritable value, Context context)
187         throws InterruptedException, IOException {
188       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
189       Path outputPath = getOutputPath(inputInfo);
190 
191       copyFile(context, inputInfo, outputPath);
192     }
193 
194     /**
195      * Returns the location where the inputPath will be copied.
196      */
197     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
198       Path path = null;
199       switch (inputInfo.getType()) {
200         case HFILE:
201           Path inputPath = new Path(inputInfo.getHfile());
202           String family = inputPath.getParent().getName();
203           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
204           String region = HFileLink.getReferencedRegionName(inputPath.getName());
205           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
206           path = new Path(FSUtils.getTableDir(new Path("./"), table),
207               new Path(region, new Path(family, hfile)));
208           break;
209         case WAL:
210           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
211           path = new Path(oldLogsDir, inputInfo.getWalName());
212           break;
213         default:
214           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
215       }
216       return new Path(outputArchive, path);
217     }
218 
219     /*
220      * Used by TestExportSnapshot to simulate a failure
221      */
222     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
223         throws IOException {
224       if (testFailures) {
225         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
226           if (random == null) {
227             random = new Random();
228           }
229 
230           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
231           // retry, but at least we reduce the number of test failures due to
232           // this test exception from the same map task.
233           if (random.nextFloat() < 0.03) {
234             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
235                                   + " time=" + System.currentTimeMillis());
236           }
237         } else {
238           context.getCounter(Counter.COPY_FAILED).increment(1);
239           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
240         }
241       }
242     }
243 
244     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
245         final Path outputPath) throws IOException {
246       injectTestFailure(context, inputInfo);
247 
248       // Get the file information
249       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
250 
251       // Verify if the output file exists and is the same that we want to copy
252       if (outputFs.exists(outputPath)) {
253         FileStatus outputStat = outputFs.getFileStatus(outputPath);
254         if (outputStat != null && sameFile(inputStat, outputStat)) {
255           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
256           context.getCounter(Counter.FILES_SKIPPED).increment(1);
257           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
258           return;
259         }
260       }
261 
262       InputStream in = openSourceFile(context, inputInfo);
263       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
264       if (Integer.MAX_VALUE != bandwidthMB) {
265         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
266       }
267 
268       try {
269         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
270 
271         // Ensure that the output folder is there and copy the file
272         outputFs.mkdirs(outputPath.getParent());
273         FSDataOutputStream out = outputFs.create(outputPath, true);
274         try {
275           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
276         } finally {
277           out.close();
278         }
279 
280         // Try to Preserve attributes
281         if (!preserveAttributes(outputPath, inputStat)) {
282           LOG.warn("You may have to run manually chown on: " + outputPath);
283         }
284       } finally {
285         in.close();
286       }
287     }
288 
289     /**
290      * Try to Preserve the files attribute selected by the user copying them from the source file
291      * This is only required when you are exporting as a different user than "hbase" or on a system
292      * that doesn't have the "hbase" user.
293      *
294      * This is not considered a blocking failure since the user can force a chmod with the user
295      * that knows is available on the system.
296      */
297     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
298       FileStatus stat;
299       try {
300         stat = outputFs.getFileStatus(path);
301       } catch (IOException e) {
302         LOG.warn("Unable to get the status for file=" + path);
303         return false;
304       }
305 
306       try {
307         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
308           outputFs.setPermission(path, new FsPermission(filesMode));
309         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
310           outputFs.setPermission(path, refStat.getPermission());
311         }
312       } catch (IOException e) {
313         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
314         return false;
315       }
316 
317       boolean hasRefStat = (refStat != null);
318       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
319       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
320       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
321         try {
322           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
323             outputFs.setOwner(path, user, group);
324           }
325         } catch (IOException e) {
326           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
327           LOG.warn("The user/group may not exist on the destination cluster: user=" +
328                    user + " group=" + group);
329           return false;
330         }
331       }
332 
333       return true;
334     }
335 
336     private boolean stringIsNotEmpty(final String str) {
337       return str != null && str.length() > 0;
338     }
339 
340     private void copyData(final Context context,
341         final Path inputPath, final InputStream in,
342         final Path outputPath, final FSDataOutputStream out,
343         final long inputFileSize)
344         throws IOException {
345       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
346                                    " (%.1f%%)";
347 
348       try {
349         byte[] buffer = new byte[bufferSize];
350         long totalBytesWritten = 0;
351         int reportBytes = 0;
352         int bytesRead;
353 
354         long stime = System.currentTimeMillis();
355         while ((bytesRead = in.read(buffer)) > 0) {
356           out.write(buffer, 0, bytesRead);
357           totalBytesWritten += bytesRead;
358           reportBytes += bytesRead;
359 
360           if (reportBytes >= REPORT_SIZE) {
361             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
362             context.setStatus(String.format(statusMessage,
363                               StringUtils.humanReadableInt(totalBytesWritten),
364                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
365                               " from " + inputPath + " to " + outputPath);
366             reportBytes = 0;
367           }
368         }
369         long etime = System.currentTimeMillis();
370 
371         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
372         context.setStatus(String.format(statusMessage,
373                           StringUtils.humanReadableInt(totalBytesWritten),
374                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
375                           " from " + inputPath + " to " + outputPath);
376 
377         // Verify that the written size match
378         if (totalBytesWritten != inputFileSize) {
379           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
380                        " expected=" + inputFileSize + " for file=" + inputPath;
381           throw new IOException(msg);
382         }
383 
384         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
385         LOG.info("size=" + totalBytesWritten +
386             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
387             " time=" + StringUtils.formatTimeDiff(etime, stime) +
388             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
389         context.getCounter(Counter.FILES_COPIED).increment(1);
390       } catch (IOException e) {
391         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
392         context.getCounter(Counter.COPY_FAILED).increment(1);
393         throw e;
394       }
395     }
396 
397     /**
398      * Try to open the "source" file.
399      * Throws an IOException if the communication with the inputFs fail or
400      * if the file is not found.
401      */
402     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
403             throws IOException {
404       try {
405         Configuration conf = context.getConfiguration();
406         FileLink link = null;
407         switch (fileInfo.getType()) {
408           case HFILE:
409             Path inputPath = new Path(fileInfo.getHfile());
410             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
411             break;
412           case WAL:
413             String serverName = fileInfo.getWalServer();
414             String logName = fileInfo.getWalName();
415             link = new WALLink(inputRoot, serverName, logName);
416             break;
417           default:
418             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
419         }
420         return link.open(inputFs);
421       } catch (IOException e) {
422         context.getCounter(Counter.MISSING_FILES).increment(1);
423         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
424         throw e;
425       }
426     }
427 
428     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
429         throws IOException {
430       try {
431         Configuration conf = context.getConfiguration();
432         FileLink link = null;
433         switch (fileInfo.getType()) {
434           case HFILE:
435             Path inputPath = new Path(fileInfo.getHfile());
436             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
437             break;
438           case WAL:
439             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
440             break;
441           default:
442             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
443         }
444         return link.getFileStatus(inputFs);
445       } catch (FileNotFoundException e) {
446         context.getCounter(Counter.MISSING_FILES).increment(1);
447         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
448         throw e;
449       } catch (IOException e) {
450         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
451         throw e;
452       }
453     }
454 
455     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
456       try {
457         return fs.getFileChecksum(path);
458       } catch (IOException e) {
459         LOG.warn("Unable to get checksum for file=" + path, e);
460         return null;
461       }
462     }
463 
464     /**
465      * Check if the two files are equal by looking at the file length,
466      * and at the checksum (if user has specified the verifyChecksum flag).
467      */
468     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
469       // Not matching length
470       if (inputStat.getLen() != outputStat.getLen()) return false;
471 
472       // Mark files as equals, since user asked for no checksum verification
473       if (!verifyChecksum) return true;
474 
475       // If checksums are not available, files are not the same.
476       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
477       if (inChecksum == null) return false;
478 
479       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
480       if (outChecksum == null) return false;
481 
482       return inChecksum.equals(outChecksum);
483     }
484   }
485 
486   // ==========================================================================
487   //  Input Format
488   // ==========================================================================
489 
490   /**
491    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
492    * @return list of files referenced by the snapshot (pair of path and size)
493    */
494   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
495       final FileSystem fs, final Path snapshotDir) throws IOException {
496     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
497 
498     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
499     final TableName table = TableName.valueOf(snapshotDesc.getTable());
500 
501     // Get snapshot files
502     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
503     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
504       new SnapshotReferenceUtil.SnapshotVisitor() {
505         @Override
506         public void storeFile(final HRegionInfo regionInfo, final String family,
507             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
508           if (storeFile.hasReference()) {
509             // copied as part of the manifest
510           } else {
511             String region = regionInfo.getEncodedName();
512             String hfile = storeFile.getName();
513             Path path = HFileLink.createPath(table, region, family, hfile);
514 
515             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
516               .setType(SnapshotFileInfo.Type.HFILE)
517               .setHfile(path.toString())
518               .build();
519 
520             long size;
521             if (storeFile.hasFileSize()) {
522               size = storeFile.getFileSize();
523             } else {
524               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
525             }
526             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
527           }
528         }
529 
530         @Override
531         public void logFile (final String server, final String logfile)
532             throws IOException {
533           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
534             .setType(SnapshotFileInfo.Type.WAL)
535             .setWalServer(server)
536             .setWalName(logfile)
537             .build();
538 
539           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
540           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
541         }
542     });
543 
544     return files;
545   }
546 
547   /**
548    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
549    * The groups created will have similar amounts of bytes.
550    * <p>
551    * The algorithm used is pretty straightforward; the file list is sorted by size,
552    * and then each group fetch the bigger file available, iterating through groups
553    * alternating the direction.
554    */
555   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
556       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
557     // Sort files by size, from small to big
558     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
559       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
560         long r = a.getSecond() - b.getSecond();
561         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
562       }
563     });
564 
565     // create balanced groups
566     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
567       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
568     long[] sizeGroups = new long[ngroups];
569     int hi = files.size() - 1;
570     int lo = 0;
571 
572     List<Pair<SnapshotFileInfo, Long>> group;
573     int dir = 1;
574     int g = 0;
575 
576     while (hi >= lo) {
577       if (g == fileGroups.size()) {
578         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
579         fileGroups.add(group);
580       } else {
581         group = fileGroups.get(g);
582       }
583 
584       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
585 
586       // add the hi one
587       sizeGroups[g] += fileInfo.getSecond();
588       group.add(fileInfo);
589 
590       // change direction when at the end or the beginning
591       g += dir;
592       if (g == ngroups) {
593         dir = -1;
594         g = ngroups - 1;
595       } else if (g < 0) {
596         dir = 1;
597         g = 0;
598       }
599     }
600 
601     if (LOG.isDebugEnabled()) {
602       for (int i = 0; i < sizeGroups.length; ++i) {
603         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
604       }
605     }
606 
607     return fileGroups;
608   }
609 
610   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
611     @Override
612     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
613         TaskAttemptContext tac) throws IOException, InterruptedException {
614       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
615     }
616 
617     @Override
618     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
619       Configuration conf = context.getConfiguration();
620       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
621       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
622       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
623 
624       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
625       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
626       if (mappers == 0 && snapshotFiles.size() > 0) {
627         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
628         mappers = Math.min(mappers, snapshotFiles.size());
629         conf.setInt(CONF_NUM_SPLITS, mappers);
630         conf.setInt(MR_NUM_MAPS, mappers);
631       }
632 
633       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
634       List<InputSplit> splits = new ArrayList(groups.size());
635       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
636         splits.add(new ExportSnapshotInputSplit(files));
637       }
638       return splits;
639     }
640 
641     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
642       private List<Pair<BytesWritable, Long>> files;
643       private long length;
644 
645       public ExportSnapshotInputSplit() {
646         this.files = null;
647       }
648 
649       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
650         this.files = new ArrayList(snapshotFiles.size());
651         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
652           this.files.add(new Pair<BytesWritable, Long>(
653             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
654           this.length += fileInfo.getSecond();
655         }
656       }
657 
658       private List<Pair<BytesWritable, Long>> getSplitKeys() {
659         return files;
660       }
661 
662       @Override
663       public long getLength() throws IOException, InterruptedException {
664         return length;
665       }
666 
667       @Override
668       public String[] getLocations() throws IOException, InterruptedException {
669         return new String[] {};
670       }
671 
672       @Override
673       public void readFields(DataInput in) throws IOException {
674         int count = in.readInt();
675         files = new ArrayList<Pair<BytesWritable, Long>>(count);
676         length = 0;
677         for (int i = 0; i < count; ++i) {
678           BytesWritable fileInfo = new BytesWritable();
679           fileInfo.readFields(in);
680           long size = in.readLong();
681           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
682           length += size;
683         }
684       }
685 
686       @Override
687       public void write(DataOutput out) throws IOException {
688         out.writeInt(files.size());
689         for (final Pair<BytesWritable, Long> fileInfo: files) {
690           fileInfo.getFirst().write(out);
691           out.writeLong(fileInfo.getSecond());
692         }
693       }
694     }
695 
696     private static class ExportSnapshotRecordReader
697         extends RecordReader<BytesWritable, NullWritable> {
698       private final List<Pair<BytesWritable, Long>> files;
699       private long totalSize = 0;
700       private long procSize = 0;
701       private int index = -1;
702 
703       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
704         this.files = files;
705         for (Pair<BytesWritable, Long> fileInfo: files) {
706           totalSize += fileInfo.getSecond();
707         }
708       }
709 
710       @Override
711       public void close() { }
712 
713       @Override
714       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
715 
716       @Override
717       public NullWritable getCurrentValue() { return NullWritable.get(); }
718 
719       @Override
720       public float getProgress() { return (float)procSize / totalSize; }
721 
722       @Override
723       public void initialize(InputSplit split, TaskAttemptContext tac) { }
724 
725       @Override
726       public boolean nextKeyValue() {
727         if (index >= 0) {
728           procSize += files.get(index).getSecond();
729         }
730         return(++index < files.size());
731       }
732     }
733   }
734 
735   // ==========================================================================
736   //  Tool
737   // ==========================================================================
738 
739   /**
740    * Run Map-Reduce Job to perform the files copy.
741    */
742   private void runCopyJob(final Path inputRoot, final Path outputRoot,
743       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
744       final String filesUser, final String filesGroup, final int filesMode,
745       final int mappers, final int bandwidthMB)
746           throws IOException, InterruptedException, ClassNotFoundException {
747     Configuration conf = getConf();
748     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
749     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
750     if (mappers > 0) {
751       conf.setInt(CONF_NUM_SPLITS, mappers);
752       conf.setInt(MR_NUM_MAPS, mappers);
753     }
754     conf.setInt(CONF_FILES_MODE, filesMode);
755     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
756     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
757     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
758     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
759     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
760     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
761 
762     Job job = new Job(conf);
763     job.setJobName("ExportSnapshot-" + snapshotName);
764     job.setJarByClass(ExportSnapshot.class);
765     TableMapReduceUtil.addDependencyJars(job);
766     job.setMapperClass(ExportMapper.class);
767     job.setInputFormatClass(ExportSnapshotInputFormat.class);
768     job.setOutputFormatClass(NullOutputFormat.class);
769     job.setMapSpeculativeExecution(false);
770     job.setNumReduceTasks(0);
771 
772     // Acquire the delegation Tokens
773     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
774       new Path[] { inputRoot, outputRoot }, conf);
775 
776     // Run the MR Job
777     if (!job.waitForCompletion(true)) {
778       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
779       // when it will be available on all the supported versions.
780       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
781     }
782   }
783 
784   private void verifySnapshot(final Configuration baseConf,
785       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
786     // Update the conf with the current root dir, since may be a different cluster
787     Configuration conf = new Configuration(baseConf);
788     FSUtils.setRootDir(conf, rootDir);
789     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
790     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
791     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
792   }
793 
794   /**
795    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
796    * @return 0 on success, and != 0 upon failure.
797    */
798   @Override
799   public int run(String[] args) throws IOException {
800     boolean verifyTarget = true;
801     boolean verifyChecksum = true;
802     String snapshotName = null;
803     String targetName = null;
804     boolean overwrite = false;
805     String filesGroup = null;
806     String filesUser = null;
807     Path outputRoot = null;
808     int bandwidthMB = Integer.MAX_VALUE;
809     int filesMode = 0;
810     int mappers = 0;
811 
812     Configuration conf = getConf();
813     Path inputRoot = FSUtils.getRootDir(conf);
814 
815     // Process command line args
816     for (int i = 0; i < args.length; i++) {
817       String cmd = args[i];
818       if (cmd.equals("-snapshot")) {
819         snapshotName = args[++i];
820       } else if (cmd.equals("-target")) {
821         targetName = args[++i];
822       } else if (cmd.equals("-copy-to")) {
823         outputRoot = new Path(args[++i]);
824       } else if (cmd.equals("-copy-from")) {
825         inputRoot = new Path(args[++i]);
826         FSUtils.setRootDir(conf, inputRoot);
827       } else if (cmd.equals("-no-checksum-verify")) {
828         verifyChecksum = false;
829       } else if (cmd.equals("-no-target-verify")) {
830         verifyTarget = false;
831       } else if (cmd.equals("-mappers")) {
832         mappers = Integer.parseInt(args[++i]);
833       } else if (cmd.equals("-chuser")) {
834         filesUser = args[++i];
835       } else if (cmd.equals("-chgroup")) {
836         filesGroup = args[++i];
837       } else if (cmd.equals("-bandwidth")) {
838         bandwidthMB = Integer.parseInt(args[++i]);
839       } else if (cmd.equals("-chmod")) {
840         filesMode = Integer.parseInt(args[++i], 8);
841       } else if (cmd.equals("-overwrite")) {
842         overwrite = true;
843       } else if (cmd.equals("-h") || cmd.equals("--help")) {
844         printUsageAndExit();
845       } else {
846         System.err.println("UNEXPECTED: " + cmd);
847         printUsageAndExit();
848       }
849     }
850 
851     // Check user options
852     if (snapshotName == null) {
853       System.err.println("Snapshot name not provided.");
854       printUsageAndExit();
855     }
856 
857     if (outputRoot == null) {
858       System.err.println("Destination file-system not provided.");
859       printUsageAndExit();
860     }
861 
862     if (targetName == null) {
863       targetName = snapshotName;
864     }
865 
866     conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
867     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
868     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
869     conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
870     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
871     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
872 
873     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
874 
875     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
876     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
877     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
878     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
879 
880     // Check if the snapshot already exists
881     if (outputFs.exists(outputSnapshotDir)) {
882       if (overwrite) {
883         if (!outputFs.delete(outputSnapshotDir, true)) {
884           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
885           return 1;
886         }
887       } else {
888         System.err.println("The snapshot '" + targetName +
889           "' already exists in the destination: " + outputSnapshotDir);
890         return 1;
891       }
892     }
893 
894     if (!skipTmp) {
895       // Check if the snapshot already in-progress
896       if (outputFs.exists(snapshotTmpDir)) {
897         if (overwrite) {
898           if (!outputFs.delete(snapshotTmpDir, true)) {
899             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
900             return 1;
901           }
902         } else {
903           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
904           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
905           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
906           return 1;
907         }
908       }
909     }
910 
911     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
912     // The snapshot references must be copied before the hfiles otherwise the cleaner
913     // will remove them because they are unreferenced.
914     try {
915       LOG.info("Copy Snapshot Manifest");
916       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
917     } catch (IOException e) {
918       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
919         snapshotDir + " to=" + initialOutputSnapshotDir, e);
920     }
921 
922     // Write a new .snapshotinfo if the target name is different from the source name
923     if (!targetName.equals(snapshotName)) {
924       SnapshotDescription snapshotDesc =
925         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
926           .toBuilder()
927           .setName(targetName)
928           .build();
929       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
930     }
931 
932     // Step 2 - Start MR Job to copy files
933     // The snapshot references must be copied before the files otherwise the files gets removed
934     // by the HFileArchiver, since they have no references.
935     try {
936       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
937                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
938 
939       LOG.info("Finalize the Snapshot Export");
940       if (!skipTmp) {
941         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
942         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
943           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
944             snapshotTmpDir + " to=" + outputSnapshotDir);
945         }
946       }
947 
948       // Step 4 - Verify snapshot integrity
949       if (verifyTarget) {
950         LOG.info("Verify snapshot integrity");
951         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
952       }
953 
954       LOG.info("Export Completed: " + targetName);
955       return 0;
956     } catch (Exception e) {
957       LOG.error("Snapshot export failed", e);
958       if (!skipTmp) {
959         outputFs.delete(snapshotTmpDir, true);
960       }
961       outputFs.delete(outputSnapshotDir, true);
962       return 1;
963     } finally {
964       IOUtils.closeStream(inputFs);
965       IOUtils.closeStream(outputFs);
966     }
967   }
968 
969   // ExportSnapshot
970   private void printUsageAndExit() {
971     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
972     System.err.println(" where [options] are:");
973     System.err.println("  -h|-help                Show this help and exit.");
974     System.err.println("  -snapshot NAME          Snapshot to restore.");
975     System.err.println("  -copy-to NAME           Remote destination hdfs://");
976     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
977     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
978     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
979         "exported snapshot.");
980     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
981     System.err.println("  -chuser USERNAME        Change the owner of the files " +
982         "to the specified one.");
983     System.err.println("  -chgroup GROUP          Change the group of the files to " +
984         "the specified one.");
985     System.err.println("  -chmod MODE             Change the permission of the files " +
986         "to the specified one.");
987     System.err.println("  -mappers                Number of mappers to use during the " +
988         "copy (mapreduce.job.maps).");
989     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
990     System.err.println();
991     System.err.println("Examples:");
992     System.err.println("  hbase " + getClass().getName() + " \\");
993     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
994     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
995     System.err.println();
996     System.err.println("  hbase " + getClass().getName() + " \\");
997     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
998     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
999     System.exit(1);
1000   }
1001 
1002   /**
1003    * The guts of the {@link #main} method.
1004    * Call this method to avoid the {@link #main(String[])} System.exit.
1005    * @param args
1006    * @return errCode
1007    * @throws Exception
1008    */
1009   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1010     return ToolRunner.run(conf, new ExportSnapshot(), args);
1011   }
1012 
1013   public static void main(String[] args) throws Exception {
1014     System.exit(innerMain(HBaseConfiguration.create(), args));
1015   }
1016 }