View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.net.URI;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.Comparator;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.conf.Configured;
41  import org.apache.hadoop.fs.FSDataInputStream;
42  import org.apache.hadoop.fs.FSDataOutputStream;
43  import org.apache.hadoop.fs.FileChecksum;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.FileUtil;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.fs.permission.FsPermission;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.io.FileLink;
54  import org.apache.hadoop.hbase.io.HFileLink;
55  import org.apache.hadoop.hbase.io.HLogLink;
56  import org.apache.hadoop.hbase.mapreduce.JobUtil;
57  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
58  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
59  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
60  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
61  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.Pair;
64  import org.apache.hadoop.io.BytesWritable;
65  import org.apache.hadoop.io.NullWritable;
66  import org.apache.hadoop.io.SequenceFile;
67  import org.apache.hadoop.io.Writable;
68  import org.apache.hadoop.mapreduce.Job;
69  import org.apache.hadoop.mapreduce.JobContext;
70  import org.apache.hadoop.mapreduce.Mapper;
71  import org.apache.hadoop.mapreduce.InputFormat;
72  import org.apache.hadoop.mapreduce.InputSplit;
73  import org.apache.hadoop.mapreduce.RecordReader;
74  import org.apache.hadoop.mapreduce.TaskAttemptContext;
75  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
76  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
77  import org.apache.hadoop.mapreduce.security.TokenCache;
78  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
79  import org.apache.hadoop.util.StringUtils;
80  import org.apache.hadoop.util.Tool;
81  import org.apache.hadoop.util.ToolRunner;
82  
83  /**
84   * Export the specified snapshot to a given FileSystem.
85   *
86   * The .snapshot/name folder is copied to the destination cluster
87   * and then all the hfiles/hlogs are copied using a Map-Reduce Job in the .archive/ location.
88   * When everything is done, the second cluster can restore the snapshot.
89   */
90  @InterfaceAudience.Public
91  @InterfaceStability.Evolving
92  public class ExportSnapshot extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

  // Hadoop MapReduce property controlling the number of map tasks.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Number of splits to create; when 0, it is derived from the file count in getSplits().
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Optional owner/group/mode forced onto the copied files (see ExportMapper.preserveAttributes).
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether to compare source/destination checksums before skipping an already-copied file.
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Default number of files per map task when the split count is not set explicitly.
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper copy bandwidth cap in MB/s (Integer.MAX_VALUE disables throttling).
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

  // Test-only hooks used by TestExportSnapshot to inject copy failures.
  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

  private static final String INPUT_FOLDER_PREFIX = "export-files.";

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter {
    MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
    BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
  }
120 
121   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
122                                                    NullWritable, NullWritable> {
    // Report progress to the framework every REPORT_SIZE copied bytes.
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    // Minimum copy buffer size; the output fs block size is used when bigger (see setup()).
    final static int BUFFER_SIZE = 64 * 1024;

    // Test-only failure injection, driven by CONF_TEST_FAILURE/CONF_TEST_RETRY.
    private boolean testFailures;
    private Random random;

    // Copy options read from the job configuration in setup().
    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;

    // Destination cluster: root dir and its hfile archive dir.
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source cluster: root dir and its hfile archive dir.
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;
142 
    /**
     * Reads the job configuration and initializes the input/output FileSystems,
     * archive locations and copy options used by each map() invocation.
     */
    @Override
    public void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      // Compare source/destination checksums before skipping a copy (default on).
      verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);

      filesGroup = conf.get(CONF_FILES_GROUP);
      filesUser = conf.get(CONF_FILES_USER);
      // 0 means "do not force a permission mode" (see preserveAttributes()).
      filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
      outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
      inputRoot = new Path(conf.get(CONF_INPUT_ROOT));

      inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
      outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);

      // Test hook: when set, copy operations throw artificial failures.
      testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);

      try {
        inputFs = FileSystem.get(inputRoot.toUri(), conf);
      } catch (IOException e) {
        throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
      }

      try {
        outputFs = FileSystem.get(outputRoot.toUri(), conf);
      } catch (IOException e) {
        throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
      }

      // Use the default block size of the outputFs if bigger
      int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
      bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
      LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));

      // Touch every counter once so they all appear in the job output, even when zero.
      for (Counter c : Counter.values()) {
        context.getCounter(c).increment(0);
      }
    }
180 
181     @Override
182     public void map(BytesWritable key, NullWritable value, Context context)
183         throws InterruptedException, IOException {
184       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
185       Path outputPath = getOutputPath(inputInfo);
186 
187       copyFile(context, inputInfo, outputPath);
188     }
189 
190     /**
191      * Returns the location where the inputPath will be copied.
192      */
193     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
194       Path path = null;
195       switch (inputInfo.getType()) {
196         case HFILE:
197           Path inputPath = new Path(inputInfo.getHfile());
198           String family = inputPath.getParent().getName();
199           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
200           String region = HFileLink.getReferencedRegionName(inputPath.getName());
201           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
202           path = new Path(FSUtils.getTableDir(new Path("./"), table),
203               new Path(region, new Path(family, hfile)));
204           break;
205         case WAL:
206           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
207           path = new Path(oldLogsDir, inputInfo.getWalName());
208           break;
209         default:
210           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
211       }
212       return new Path(outputArchive, path);
213     }
214 
215     /*
216      * Used by TestExportSnapshot to simulate a failure
217      */
218     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
219         throws IOException {
220       if (testFailures) {
221         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
222           if (random == null) {
223             random = new Random();
224           }
225 
226           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
227           // retry, but at least we reduce the number of test failures due to
228           // this test exception from the same map task.
229           if (random.nextFloat() < 0.03) {
230             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
231                                   + " time=" + System.currentTimeMillis());
232           }
233         } else {
234           context.getCounter(Counter.COPY_FAILED).increment(1);
235           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
236         }
237       }
238     }
239 
    /**
     * Copies the specified source file to outputPath, skipping the copy when the
     * destination already contains an identical file (see sameFile()).
     * Updates the MR counters and tries to preserve owner/group/permissions.
     */
    private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
        final Path outputPath) throws IOException {
      injectTestFailure(context, inputInfo);

      // Get the file information
      FileStatus inputStat = getSourceFileStatus(context, inputInfo);

      // Verify if the output file exists and is the same that we want to copy
      if (outputFs.exists(outputPath)) {
        FileStatus outputStat = outputFs.getFileStatus(outputPath);
        if (outputStat != null && sameFile(inputStat, outputStat)) {
          LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
          context.getCounter(Counter.FILES_SKIPPED).increment(1);
          context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
          return;
        }
      }

      InputStream in = openSourceFile(context, inputInfo);
      // Throttle reads unless the cap is disabled via Integer.MAX_VALUE.
      // NOTE(review): the default cap is 100 MB/s, so throttling is on by default.
      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
      if (Integer.MAX_VALUE != bandwidthMB) {
        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
      }

      try {
        context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());

        // Ensure that the output folder is there and copy the file
        outputFs.mkdirs(outputPath.getParent());
        FSDataOutputStream out = outputFs.create(outputPath, true);
        try {
          copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
        } finally {
          out.close();
        }

        // Try to Preserve attributes
        if (!preserveAttributes(outputPath, inputStat)) {
          LOG.warn("You may have to run manually chown on: " + outputPath);
        }
      } finally {
        in.close();
      }
    }
284 
285     /**
286      * Try to Preserve the files attribute selected by the user copying them from the source file
287      * This is only required when you are exporting as a different user than "hbase" or on a system
288      * that doesn't have the "hbase" user.
289      *
290      * This is not considered a blocking failure since the user can force a chmod with the user
291      * that knows is available on the system.
292      */
293     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
294       FileStatus stat;
295       try {
296         stat = outputFs.getFileStatus(path);
297       } catch (IOException e) {
298         LOG.warn("Unable to get the status for file=" + path);
299         return false;
300       }
301 
302       try {
303         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
304           outputFs.setPermission(path, new FsPermission(filesMode));
305         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
306           outputFs.setPermission(path, refStat.getPermission());
307         }
308       } catch (IOException e) {
309         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
310         return false;
311       }
312 
313       boolean hasRefStat = (refStat != null);
314       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
315       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
316       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
317         try {
318           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
319             outputFs.setOwner(path, user, group);
320           }
321         } catch (IOException e) {
322           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
323           LOG.warn("The user/group may not exist on the destination cluster: user=" +
324                    user + " group=" + group);
325           return false;
326         }
327       }
328 
329       return true;
330     }
331 
332     private boolean stringIsNotEmpty(final String str) {
333       return str != null && str.length() > 0;
334     }
335 
    /**
     * Streams bytes from "in" to "out", updating the BYTES_COPIED counter and the
     * task status roughly every REPORT_SIZE bytes so the framework sees progress.
     * Throws an IOException if fewer/more bytes than inputFileSize were copied.
     * Note: the caller is responsible for closing both streams.
     */
    private void copyData(final Context context,
        final Path inputPath, final InputStream in,
        final Path outputPath, final FSDataOutputStream out,
        final long inputFileSize)
        throws IOException {
      final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
                                   " (%.1f%%)";

      try {
        byte[] buffer = new byte[bufferSize];
        long totalBytesWritten = 0;
        int reportBytes = 0;   // bytes copied since the last counter/status update
        int bytesRead;

        long stime = System.currentTimeMillis();
        while ((bytesRead = in.read(buffer)) > 0) {
          out.write(buffer, 0, bytesRead);
          totalBytesWritten += bytesRead;
          reportBytes += bytesRead;

          if (reportBytes >= REPORT_SIZE) {
            context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
            context.setStatus(String.format(statusMessage,
                              StringUtils.humanReadableInt(totalBytesWritten),
                              (totalBytesWritten/(float)inputFileSize) * 100.0f) +
                              " from " + inputPath + " to " + outputPath);
            reportBytes = 0;
          }
        }
        long etime = System.currentTimeMillis();

        // Flush the remainder (< REPORT_SIZE) into the counter and final status.
        context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
        context.setStatus(String.format(statusMessage,
                          StringUtils.humanReadableInt(totalBytesWritten),
                          (totalBytesWritten/(float)inputFileSize) * 100.0f) +
                          " from " + inputPath + " to " + outputPath);

        // Verify that the written size match
        if (totalBytesWritten != inputFileSize) {
          String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
                       " expected=" + inputFileSize + " for file=" + inputPath;
          throw new IOException(msg);
        }

        LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
        LOG.info("size=" + totalBytesWritten +
            " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
            " time=" + StringUtils.formatTimeDiff(etime, stime) +
            String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
        context.getCounter(Counter.FILES_COPIED).increment(1);
      } catch (IOException e) {
        LOG.error("Error copying " + inputPath + " to " + outputPath, e);
        context.getCounter(Counter.COPY_FAILED).increment(1);
        throw e;
      }
    }
392 
393     /**
394      * Try to open the "source" file.
395      * Throws an IOException if the communication with the inputFs fail or
396      * if the file is not found.
397      */
398     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
399         throws IOException {
400       try {
401         FileLink link = null;
402         switch (fileInfo.getType()) {
403           case HFILE:
404             Path inputPath = new Path(fileInfo.getHfile());
405             link = new HFileLink(inputRoot, inputArchive, inputPath);
406             break;
407           case WAL:
408             String serverName = fileInfo.getWalServer();
409             String logName = fileInfo.getWalName();
410             link = new HLogLink(inputRoot, serverName, logName);
411             break;
412           default:
413             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
414         }
415         return link.open(inputFs);
416       } catch (IOException e) {
417         context.getCounter(Counter.MISSING_FILES).increment(1);
418         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
419         throw e;
420       }
421     }
422 
    /**
     * Returns the FileStatus of the source file, resolving the HFile/WAL link.
     * Increments the MISSING_FILES counter only when the file does not exist;
     * any other I/O failure is logged and rethrown without touching the counter.
     */
    private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
        throws IOException {
      try {
        FileLink link = null;
        switch (fileInfo.getType()) {
          case HFILE:
            Path inputPath = new Path(fileInfo.getHfile());
            link = new HFileLink(inputRoot, inputArchive, inputPath);
            break;
          case WAL:
            link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
            break;
          default:
            throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
        }
        return link.getFileStatus(inputFs);
      } catch (FileNotFoundException e) {
        context.getCounter(Counter.MISSING_FILES).increment(1);
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      } catch (IOException e) {
        LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
        throw e;
      }
    }
448 
    /**
     * Returns the checksum of the given file, or null when it is unavailable
     * (the request failed, or the filesystem did not provide one).
     * Callers (sameFile()) must handle the null result.
     */
    private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
      try {
        return fs.getFileChecksum(path);
      } catch (IOException e) {
        LOG.warn("Unable to get checksum for file=" + path, e);
        return null;
      }
    }
457 
458     /**
459      * Check if the two files are equal by looking at the file length,
460      * and at the checksum (if user has specified the verifyChecksum flag).
461      */
462     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
463       // Not matching length
464       if (inputStat.getLen() != outputStat.getLen()) return false;
465 
466       // Mark files as equals, since user asked for no checksum verification
467       if (!verifyChecksum) return true;
468 
469       // If checksums are not available, files are not the same.
470       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
471       if (inChecksum == null) return false;
472 
473       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
474       if (outChecksum == null) return false;
475 
476       return inChecksum.equals(outChecksum);
477     }
478   }
479 
480   // ==========================================================================
481   //  Input Format
482   // ==========================================================================
483 
  /**
   * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
   * Walks the snapshot manifest with SnapshotReferenceUtil and collects one
   * (SnapshotFileInfo, size) pair per referenced store file and log file.
   * @return list of files referenced by the snapshot (pair of path and size)
   */
  private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
      final FileSystem fs, final Path snapshotDir) throws IOException {
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);

    final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
    final TableName table = TableName.valueOf(snapshotDesc.getTable());

    // Get snapshot files
    LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
      new SnapshotReferenceUtil.SnapshotVisitor() {
        @Override
        public void storeFile(final HRegionInfo regionInfo, final String family,
            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
          if (storeFile.hasReference()) {
            // copied as part of the manifest
          } else {
            String region = regionInfo.getEncodedName();
            String hfile = storeFile.getName();
            Path path = HFileLink.createPath(table, region, family, hfile);

            SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
              .setType(SnapshotFileInfo.Type.HFILE)
              .setHfile(path.toString())
              .build();

            long size;
            if (storeFile.hasFileSize()) {
              size = storeFile.getFileSize();
            } else {
              // Size not recorded in the manifest: resolve the link and ask the FileSystem.
              size = new HFileLink(conf, path).getFileStatus(fs).getLen();
            }
            files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
          }
        }

        @Override
        public void logFile (final String server, final String logfile)
            throws IOException {
          SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
            .setType(SnapshotFileInfo.Type.WAL)
            .setWalServer(server)
            .setWalName(logfile)
            .build();

          long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
          files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
        }
    });

    return files;
  }
540 
541   /**
542    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
543    * The groups created will have similar amounts of bytes.
544    * <p>
545    * The algorithm used is pretty straightforward; the file list is sorted by size,
546    * and then each group fetch the bigger file available, iterating through groups
547    * alternating the direction.
548    */
549   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
550       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
551     // Sort files by size, from small to big
552     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
553       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
554         long r = a.getSecond() - b.getSecond();
555         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
556       }
557     });
558 
559     // create balanced groups
560     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
561       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
562     long[] sizeGroups = new long[ngroups];
563     int hi = files.size() - 1;
564     int lo = 0;
565 
566     List<Pair<SnapshotFileInfo, Long>> group;
567     int dir = 1;
568     int g = 0;
569 
570     while (hi >= lo) {
571       if (g == fileGroups.size()) {
572         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
573         fileGroups.add(group);
574       } else {
575         group = fileGroups.get(g);
576       }
577 
578       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
579 
580       // add the hi one
581       sizeGroups[g] += fileInfo.getSecond();
582       group.add(fileInfo);
583 
584       // change direction when at the end or the beginning
585       g += dir;
586       if (g == ngroups) {
587         dir = -1;
588         g = ngroups - 1;
589       } else if (g < 0) {
590         dir = 1;
591         g = 0;
592       }
593     }
594 
595     if (LOG.isDebugEnabled()) {
596       for (int i = 0; i < sizeGroups.length; ++i) {
597         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
598       }
599     }
600 
601     return fileGroups;
602   }
603 
604   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
    /**
     * Creates a reader iterating over the (file-info, size) pairs of the given split.
     */
    @Override
    public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
        TaskAttemptContext tac) throws IOException, InterruptedException {
      return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
    }
610 
611     @Override
612     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
613       Configuration conf = context.getConfiguration();
614       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
615       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
616       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
617 
618       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
619       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
620       if (mappers == 0 && snapshotFiles.size() > 0) {
621         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
622         mappers = Math.min(mappers, snapshotFiles.size());
623         conf.setInt(CONF_NUM_SPLITS, mappers);
624         conf.setInt(MR_NUM_MAPS, mappers);
625       }
626 
627       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
628       List<InputSplit> splits = new ArrayList(groups.size());
629       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
630         splits.add(new ExportSnapshotInputSplit(files));
631       }
632       return splits;
633     }
634 
635     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
636       private List<Pair<BytesWritable, Long>> files;
637       private long length;
638 
639       public ExportSnapshotInputSplit() {
640         this.files = null;
641       }
642 
643       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
644         this.files = new ArrayList(snapshotFiles.size());
645         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
646           this.files.add(new Pair<BytesWritable, Long>(
647             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
648           this.length += fileInfo.getSecond();
649         }
650       }
651 
652       private List<Pair<BytesWritable, Long>> getSplitKeys() {
653         return files;
654       }
655 
656       @Override
657       public long getLength() throws IOException, InterruptedException {
658         return length;
659       }
660 
661       @Override
662       public String[] getLocations() throws IOException, InterruptedException {
663         return new String[] {};
664       }
665 
666       @Override
667       public void readFields(DataInput in) throws IOException {
668         int count = in.readInt();
669         files = new ArrayList<Pair<BytesWritable, Long>>(count);
670         length = 0;
671         for (int i = 0; i < count; ++i) {
672           BytesWritable fileInfo = new BytesWritable();
673           fileInfo.readFields(in);
674           long size = in.readLong();
675           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
676           length += size;
677         }
678       }
679 
680       @Override
681       public void write(DataOutput out) throws IOException {
682         out.writeInt(files.size());
683         for (final Pair<BytesWritable, Long> fileInfo: files) {
684           fileInfo.getFirst().write(out);
685           out.writeLong(fileInfo.getSecond());
686         }
687       }
688     }
689 
690     private static class ExportSnapshotRecordReader
691         extends RecordReader<BytesWritable, NullWritable> {
692       private final List<Pair<BytesWritable, Long>> files;
693       private long totalSize = 0;
694       private long procSize = 0;
695       private int index = -1;
696 
697       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
698         this.files = files;
699         for (Pair<BytesWritable, Long> fileInfo: files) {
700           totalSize += fileInfo.getSecond();
701         }
702       }
703 
704       @Override
705       public void close() { }
706 
707       @Override
708       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
709 
710       @Override
711       public NullWritable getCurrentValue() { return NullWritable.get(); }
712 
713       @Override
714       public float getProgress() { return (float)procSize / totalSize; }
715 
716       @Override
717       public void initialize(InputSplit split, TaskAttemptContext tac) { }
718 
719       @Override
720       public boolean nextKeyValue() {
721         if (index >= 0) {
722           procSize += files.get(index).getSecond();
723         }
724         return(++index < files.size());
725       }
726     }
727   }
728 
729   // ==========================================================================
730   //  Tool
731   // ==========================================================================
732 
733   /**
734    * Run Map-Reduce Job to perform the files copy.
735    */
736   private void runCopyJob(final Path inputRoot, final Path outputRoot,
737       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
738       final String filesUser, final String filesGroup, final int filesMode,
739       final int mappers, final int bandwidthMB)
740           throws IOException, InterruptedException, ClassNotFoundException {
741     Configuration conf = getConf();
742     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
743     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
744     if (mappers > 0) {
745       conf.setInt(CONF_NUM_SPLITS, mappers);
746       conf.setInt(MR_NUM_MAPS, mappers);
747     }
748     conf.setInt(CONF_FILES_MODE, filesMode);
749     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
750     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
751     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
752     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
753     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
754     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
755 
756     Job job = new Job(conf);
757     job.setJobName("ExportSnapshot-" + snapshotName);
758     job.setJarByClass(ExportSnapshot.class);
759     TableMapReduceUtil.addDependencyJars(job);
760     job.setMapperClass(ExportMapper.class);
761     job.setInputFormatClass(ExportSnapshotInputFormat.class);
762     job.setOutputFormatClass(NullOutputFormat.class);
763     job.setMapSpeculativeExecution(false);
764     job.setNumReduceTasks(0);
765 
766     // Acquire the delegation Tokens
767     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
768       new Path[] { inputRoot, outputRoot }, conf);
769 
770     // Run the MR Job
771     if (!job.waitForCompletion(true)) {
772       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
773       // when it will be available on all the supported versions.
774       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
775     }
776   }
777 
778   private void verifySnapshot(final Configuration baseConf,
779       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
780     // Update the conf with the current root dir, since may be a different cluster
781     Configuration conf = new Configuration(baseConf);
782     FSUtils.setRootDir(conf, rootDir);
783     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
784     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
785     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
786   }
787 
788   /**
789    * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
790    * @return 0 on success, and != 0 upon failure.
791    */
792   @Override
793   public int run(String[] args) throws IOException {
794     boolean verifyTarget = true;
795     boolean verifyChecksum = true;
796     String snapshotName = null;
797     String targetName = null;
798     boolean overwrite = false;
799     String filesGroup = null;
800     String filesUser = null;
801     Path outputRoot = null;
802     int bandwidthMB = Integer.MAX_VALUE;
803     int filesMode = 0;
804     int mappers = 0;
805 
806     Configuration conf = getConf();
807     Path inputRoot = FSUtils.getRootDir(conf);
808 
809     // Process command line args
810     for (int i = 0; i < args.length; i++) {
811       String cmd = args[i];
812       if (cmd.equals("-snapshot")) {
813         snapshotName = args[++i];
814       } else if (cmd.equals("-target")) {
815         targetName = args[++i];
816       } else if (cmd.equals("-copy-to")) {
817         outputRoot = new Path(args[++i]);
818       } else if (cmd.equals("-copy-from")) {
819         inputRoot = new Path(args[++i]);
820         FSUtils.setRootDir(conf, inputRoot);
821       } else if (cmd.equals("-no-checksum-verify")) {
822         verifyChecksum = false;
823       } else if (cmd.equals("-no-target-verify")) {
824         verifyTarget = false;
825       } else if (cmd.equals("-mappers")) {
826         mappers = Integer.parseInt(args[++i]);
827       } else if (cmd.equals("-chuser")) {
828         filesUser = args[++i];
829       } else if (cmd.equals("-chgroup")) {
830         filesGroup = args[++i];
831       } else if (cmd.equals("-bandwidth")) {
832         bandwidthMB = Integer.parseInt(args[++i]);
833       } else if (cmd.equals("-chmod")) {
834         filesMode = Integer.parseInt(args[++i], 8);
835       } else if (cmd.equals("-overwrite")) {
836         overwrite = true;
837       } else if (cmd.equals("-h") || cmd.equals("--help")) {
838         printUsageAndExit();
839       } else {
840         System.err.println("UNEXPECTED: " + cmd);
841         printUsageAndExit();
842       }
843     }
844 
845     // Check user options
846     if (snapshotName == null) {
847       System.err.println("Snapshot name not provided.");
848       printUsageAndExit();
849     }
850 
851     if (outputRoot == null) {
852       System.err.println("Destination file-system not provided.");
853       printUsageAndExit();
854     }
855 
856     if (targetName == null) {
857       targetName = snapshotName;
858     }
859 
860     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
861     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
862     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
863     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
864 
865     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
866 
867     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
868     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
869     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
870     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
871 
872     // Check if the snapshot already exists
873     if (outputFs.exists(outputSnapshotDir)) {
874       if (overwrite) {
875         if (!outputFs.delete(outputSnapshotDir, true)) {
876           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
877           return 1;
878         }
879       } else {
880         System.err.println("The snapshot '" + targetName +
881           "' already exists in the destination: " + outputSnapshotDir);
882         return 1;
883       }
884     }
885 
886     if (!skipTmp) {
887       // Check if the snapshot already in-progress
888       if (outputFs.exists(snapshotTmpDir)) {
889         if (overwrite) {
890           if (!outputFs.delete(snapshotTmpDir, true)) {
891             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
892             return 1;
893           }
894         } else {
895           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
896           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
897           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
898           return 1;
899         }
900       }
901     }
902 
903     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
904     // The snapshot references must be copied before the hfiles otherwise the cleaner
905     // will remove them because they are unreferenced.
906     try {
907       LOG.info("Copy Snapshot Manifest");
908       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
909     } catch (IOException e) {
910       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
911         snapshotDir + " to=" + initialOutputSnapshotDir, e);
912     }
913 
914     // Write a new .snapshotinfo if the target name is different from the source name
915     if (!targetName.equals(snapshotName)) {
916       SnapshotDescription snapshotDesc =
917         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
918           .toBuilder()
919           .setName(targetName)
920           .build();
921       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
922     }
923 
924     // Step 2 - Start MR Job to copy files
925     // The snapshot references must be copied before the files otherwise the files gets removed
926     // by the HFileArchiver, since they have no references.
927     try {
928       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
929                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
930 
931       LOG.info("Finalize the Snapshot Export");
932       if (!skipTmp) {
933         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
934         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
935           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
936             snapshotTmpDir + " to=" + outputSnapshotDir);
937         }
938       }
939 
940       // Step 4 - Verify snapshot integrity
941       if (verifyTarget) {
942         LOG.info("Verify snapshot integrity");
943         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
944       }
945 
946       LOG.info("Export Completed: " + targetName);
947       return 0;
948     } catch (Exception e) {
949       LOG.error("Snapshot export failed", e);
950       if (!skipTmp) {
951         outputFs.delete(snapshotTmpDir, true);
952       }
953       outputFs.delete(outputSnapshotDir, true);
954       return 1;
955     }
956   }
957 
958   // ExportSnapshot
959   private void printUsageAndExit() {
960     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
961     System.err.println(" where [options] are:");
962     System.err.println("  -h|-help                Show this help and exit.");
963     System.err.println("  -snapshot NAME          Snapshot to restore.");
964     System.err.println("  -copy-to NAME           Remote destination hdfs://");
965     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
966     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
967     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
968         "exported snapshot.");
969     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
970     System.err.println("  -chuser USERNAME        Change the owner of the files " +
971         "to the specified one.");
972     System.err.println("  -chgroup GROUP          Change the group of the files to " +
973         "the specified one.");
974     System.err.println("  -chmod MODE             Change the permission of the files " +
975         "to the specified one.");
976     System.err.println("  -mappers                Number of mappers to use during the " +
977         "copy (mapreduce.job.maps).");
978     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
979     System.err.println();
980     System.err.println("Examples:");
981     System.err.println("  hbase " + getClass().getName() + " \\");
982     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
983     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
984     System.err.println();
985     System.err.println("  hbase " + getClass().getName() + " \\");
986     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
987     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
988     System.exit(1);
989   }
990 
991   /**
992    * The guts of the {@link #main} method.
993    * Call this method to avoid the {@link #main(String[])} System.exit.
994    * @param args
995    * @return errCode
996    * @throws Exception
997    */
998   static int innerMain(final Configuration conf, final String [] args) throws Exception {
999     return ToolRunner.run(conf, new ExportSnapshot(), args);
1000   }
1001 
1002   public static void main(String[] args) throws Exception {
1003     System.exit(innerMain(HBaseConfiguration.create(), args));
1004   }
1005 }