View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.mob.MobUtils;
57  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
59  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
60  import org.apache.hadoop.hbase.util.FSUtils;
61  import org.apache.hadoop.hbase.util.HFileArchiveUtil;
62  import org.apache.hadoop.hbase.util.Pair;
63  import org.apache.hadoop.io.BytesWritable;
64  import org.apache.hadoop.io.IOUtils;
65  import org.apache.hadoop.io.NullWritable;
66  import org.apache.hadoop.io.Writable;
67  import org.apache.hadoop.mapreduce.Job;
68  import org.apache.hadoop.mapreduce.JobContext;
69  import org.apache.hadoop.mapreduce.Mapper;
70  import org.apache.hadoop.mapreduce.InputFormat;
71  import org.apache.hadoop.mapreduce.InputSplit;
72  import org.apache.hadoop.mapreduce.RecordReader;
73  import org.apache.hadoop.mapreduce.TaskAttemptContext;
74  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
75  import org.apache.hadoop.mapreduce.security.TokenCache;
76  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
77  import org.apache.hadoop.util.StringUtils;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  
81  /**
82   * Export the specified snapshot to a given FileSystem.
83   *
84   * The .snapshot/name folder is copied to the destination cluster,
85   * and then all the HFiles/WALs are copied into the .archive/ location using a MapReduce job.
86   * When everything is done, the destination cluster can restore the snapshot.
87   */
88  @InterfaceAudience.Public
89  @InterfaceStability.Evolving
90  public class ExportSnapshot extends Configured implements Tool {
91    public static final String NAME = "exportsnapshot";
92    /** Configuration prefix for overrides for the source filesystem */
93    public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
94    /** Configuration prefix for overrides for the destination filesystem */
95    public static final String CONF_DEST_PREFIX = NAME + ".to.";
96  
97    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
98  
99    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
100   private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
101   private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
102   private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
103   private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
104   private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
105   private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
106   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
107   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
108   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
109   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
110   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
111   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
112   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
113 
114   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
115   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
116 
117   // Export Map-Reduce Counters, to keep track of the progress
118   public enum Counter {
119     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
120     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
121   }
122 
123   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124                                                    NullWritable, NullWritable> {
125     final static int REPORT_SIZE = 1 * 1024 * 1024;
126     final static int BUFFER_SIZE = 64 * 1024;
127 
128     private boolean testFailures;
129     private Random random;
130 
131     private boolean verifyChecksum;
132     private String filesGroup;
133     private String filesUser;
134     private short filesMode;
135     private int bufferSize;
136 
137     private FileSystem outputFs;
138     private Path outputArchive;
139     private Path outputRoot;
140 
141     private FileSystem inputFs;
142     private Path inputArchive;
143     private Path inputRoot;
144 
145     @Override
146     public void setup(Context context) throws IOException {
147       Configuration conf = context.getConfiguration();
148       Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
149       Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
150 
151       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
152 
153       filesGroup = conf.get(CONF_FILES_GROUP);
154       filesUser = conf.get(CONF_FILES_USER);
155       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
156       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
157       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
158 
159       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
160       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
161 
162       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
163 
164       try {
165         srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
169       }
170 
171       try {
172         destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
173         outputFs = FileSystem.get(outputRoot.toUri(), destConf);
174       } catch (IOException e) {
175         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
176       }
177 
178       // Use the default block size of the outputFs if bigger
179       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
180       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
181       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
182 
183       for (Counter c : Counter.values()) {
184         context.getCounter(c).increment(0);
185       }
186     }
187 
188     @Override
189     protected void cleanup(Context context) {
190       IOUtils.closeStream(inputFs);
191       IOUtils.closeStream(outputFs);
192     }
193 
194     @Override
195     public void map(BytesWritable key, NullWritable value, Context context)
196         throws InterruptedException, IOException {
197       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
198       Path outputPath = getOutputPath(inputInfo);
199 
200       copyFile(context, inputInfo, outputPath);
201     }
202 
203     /**
204      * Returns the location where the inputPath will be copied.
205      */
206     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
207       Path path = null;
208       switch (inputInfo.getType()) {
209         case HFILE:
210           Path inputPath = new Path(inputInfo.getHfile());
211           String family = inputPath.getParent().getName();
212           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
213           String region = HFileLink.getReferencedRegionName(inputPath.getName());
214           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
215           path = new Path(FSUtils.getTableDir(new Path("./"), table),
216               new Path(region, new Path(family, hfile)));
217           break;
218         case WAL:
219           LOG.warn("snapshot does not keeps WALs: " + inputInfo);
220           break;
221         default:
222           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
223       }
224       return new Path(outputArchive, path);
225     }
226 
227     /*
228      * Used by TestExportSnapshot to simulate a failure
229      */
230     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
231         throws IOException {
232       if (testFailures) {
233         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
234           if (random == null) {
235             random = new Random();
236           }
237 
238           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
239           // retry, but at least we reduce the number of test failures due to
240           // this test exception from the same map task.
241           if (random.nextFloat() < 0.03) {
242             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
243                                   + " time=" + System.currentTimeMillis());
244           }
245         } else {
246           context.getCounter(Counter.COPY_FAILED).increment(1);
247           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
248         }
249       }
250     }
251 
252     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
253         final Path outputPath) throws IOException {
254       injectTestFailure(context, inputInfo);
255 
256       // Get the file information
257       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
258 
259       // Verify if the output file exists and is the same that we want to copy
260       if (outputFs.exists(outputPath)) {
261         FileStatus outputStat = outputFs.getFileStatus(outputPath);
262         if (outputStat != null && sameFile(inputStat, outputStat)) {
263           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
264           context.getCounter(Counter.FILES_SKIPPED).increment(1);
265           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
266           return;
267         }
268       }
269 
270       InputStream in = openSourceFile(context, inputInfo);
271       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
272       if (Integer.MAX_VALUE != bandwidthMB) {
273         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L);
274       }
275 
276       try {
277         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
278 
279         // Ensure that the output folder is there and copy the file
280         createOutputPath(outputPath.getParent());
281         FSDataOutputStream out = outputFs.create(outputPath, true);
282         try {
283           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
284         } finally {
285           out.close();
286         }
287 
288         // Try to Preserve attributes
289         if (!preserveAttributes(outputPath, inputStat)) {
290           LOG.warn("You may have to run manually chown on: " + outputPath);
291         }
292       } finally {
293         in.close();
294       }
295     }
296 
297     /**
298      * Create the output folder and optionally set ownership.
299      */
300     private void createOutputPath(final Path path) throws IOException {
301       if (filesUser == null && filesGroup == null) {
302         outputFs.mkdirs(path);
303       } else {
304         Path parent = path.getParent();
305         if (!outputFs.exists(parent) && !parent.isRoot()) {
306           createOutputPath(parent);
307         }
308         outputFs.mkdirs(path);
309         if (filesUser != null || filesGroup != null) {
310           // override the owner when non-null user/group is specified
311           outputFs.setOwner(path, filesUser, filesGroup);
312         }
313         if (filesMode > 0) {
314           outputFs.setPermission(path, new FsPermission(filesMode));
315         }
316       }
317     }
318 
319     /**
320      * Try to Preserve the files attribute selected by the user copying them from the source file
321      * This is only required when you are exporting as a different user than "hbase" or on a system
322      * that doesn't have the "hbase" user.
323      *
324      * This is not considered a blocking failure since the user can force a chmod with the user
325      * that knows is available on the system.
326      */
327     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
328       FileStatus stat;
329       try {
330         stat = outputFs.getFileStatus(path);
331       } catch (IOException e) {
332         LOG.warn("Unable to get the status for file=" + path);
333         return false;
334       }
335 
336       try {
337         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
338           outputFs.setPermission(path, new FsPermission(filesMode));
339         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
340           outputFs.setPermission(path, refStat.getPermission());
341         }
342       } catch (IOException e) {
343         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
344         return false;
345       }
346 
347       boolean hasRefStat = (refStat != null);
348       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
349       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
350       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
351         try {
352           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
353             outputFs.setOwner(path, user, group);
354           }
355         } catch (IOException e) {
356           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
357           LOG.warn("The user/group may not exist on the destination cluster: user=" +
358                    user + " group=" + group);
359           return false;
360         }
361       }
362 
363       return true;
364     }
365 
366     private boolean stringIsNotEmpty(final String str) {
367       return str != null && str.length() > 0;
368     }
369 
370     private void copyData(final Context context,
371         final Path inputPath, final InputStream in,
372         final Path outputPath, final FSDataOutputStream out,
373         final long inputFileSize)
374         throws IOException {
375       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
376                                    " (%.1f%%)";
377 
378       try {
379         byte[] buffer = new byte[bufferSize];
380         long totalBytesWritten = 0;
381         int reportBytes = 0;
382         int bytesRead;
383 
384         long stime = System.currentTimeMillis();
385         while ((bytesRead = in.read(buffer)) > 0) {
386           out.write(buffer, 0, bytesRead);
387           totalBytesWritten += bytesRead;
388           reportBytes += bytesRead;
389 
390           if (reportBytes >= REPORT_SIZE) {
391             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
392             context.setStatus(String.format(statusMessage,
393                               StringUtils.humanReadableInt(totalBytesWritten),
394                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
395                               " from " + inputPath + " to " + outputPath);
396             reportBytes = 0;
397           }
398         }
399         long etime = System.currentTimeMillis();
400 
401         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
402         context.setStatus(String.format(statusMessage,
403                           StringUtils.humanReadableInt(totalBytesWritten),
404                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
405                           " from " + inputPath + " to " + outputPath);
406 
407         // Verify that the written size match
408         if (totalBytesWritten != inputFileSize) {
409           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
410                        " expected=" + inputFileSize + " for file=" + inputPath;
411           throw new IOException(msg);
412         }
413 
414         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
415         LOG.info("size=" + totalBytesWritten +
416             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
417             " time=" + StringUtils.formatTimeDiff(etime, stime) +
418             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
419         context.getCounter(Counter.FILES_COPIED).increment(1);
420       } catch (IOException e) {
421         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
422         context.getCounter(Counter.COPY_FAILED).increment(1);
423         throw e;
424       }
425     }
426 
427     /**
428      * Try to open the "source" file.
429      * Throws an IOException if the communication with the inputFs fail or
430      * if the file is not found.
431      */
432     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
433             throws IOException {
434       try {
435         Configuration conf = context.getConfiguration();
436         FileLink link = null;
437         switch (fileInfo.getType()) {
438           case HFILE:
439             Path inputPath = new Path(fileInfo.getHfile());
440             link = getFileLink(inputPath, conf);
441             break;
442           case WAL:
443             String serverName = fileInfo.getWalServer();
444             String logName = fileInfo.getWalName();
445             link = new WALLink(inputRoot, serverName, logName);
446             break;
447           default:
448             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
449         }
450         return link.open(inputFs);
451       } catch (IOException e) {
452         context.getCounter(Counter.MISSING_FILES).increment(1);
453         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
454         throw e;
455       }
456     }
457 
458     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
459         throws IOException {
460       try {
461         Configuration conf = context.getConfiguration();
462         FileLink link = null;
463         switch (fileInfo.getType()) {
464           case HFILE:
465             Path inputPath = new Path(fileInfo.getHfile());
466             link = getFileLink(inputPath, conf);
467             break;
468           case WAL:
469             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
470             break;
471           default:
472             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
473         }
474         return link.getFileStatus(inputFs);
475       } catch (FileNotFoundException e) {
476         context.getCounter(Counter.MISSING_FILES).increment(1);
477         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
478         throw e;
479       } catch (IOException e) {
480         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
481         throw e;
482       }
483     }
484 
485     private FileLink getFileLink(Path path, Configuration conf) throws IOException{
486       String regionName = HFileLink.getReferencedRegionName(path.getName());
487       TableName tableName = HFileLink.getReferencedTableName(path.getName());
488       if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
489         return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
490                 HFileArchiveUtil.getArchivePath(conf), path);
491       }
492       return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
493     }
494 
495     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
496       try {
497         return fs.getFileChecksum(path);
498       } catch (IOException e) {
499         LOG.warn("Unable to get checksum for file=" + path, e);
500         return null;
501       }
502     }
503 
504     /**
505      * Check if the two files are equal by looking at the file length,
506      * and at the checksum (if user has specified the verifyChecksum flag).
507      */
508     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
509       // Not matching length
510       if (inputStat.getLen() != outputStat.getLen()) return false;
511 
512       // Mark files as equals, since user asked for no checksum verification
513       if (!verifyChecksum) return true;
514 
515       // If checksums are not available, files are not the same.
516       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
517       if (inChecksum == null) return false;
518 
519       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
520       if (outChecksum == null) return false;
521 
522       return inChecksum.equals(outChecksum);
523     }
524   }
525 
526   // ==========================================================================
527   //  Input Format
528   // ==========================================================================
529 
530   /**
531    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
532    * @return list of files referenced by the snapshot (pair of path and size)
533    */
534   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
535       final FileSystem fs, final Path snapshotDir) throws IOException {
536     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
537 
538     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
539     final TableName table = TableName.valueOf(snapshotDesc.getTable());
540 
541     // Get snapshot files
542     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
543     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
544       new SnapshotReferenceUtil.SnapshotVisitor() {
545         @Override
546         public void storeFile(final HRegionInfo regionInfo, final String family,
547             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
548           // for storeFile.hasReference() case, copied as part of the manifest
549           if (!storeFile.hasReference()) {
550             String region = regionInfo.getEncodedName();
551             String hfile = storeFile.getName();
552             Path path = HFileLink.createPath(table, region, family, hfile);
553 
554             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
555               .setType(SnapshotFileInfo.Type.HFILE)
556               .setHfile(path.toString())
557               .build();
558 
559             long size;
560             if (storeFile.hasFileSize()) {
561               size = storeFile.getFileSize();
562             } else {
563               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
564             }
565             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
566           }
567         }
568     });
569 
570     return files;
571   }
572 
573   /**
574    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
575    * The groups created will have similar amounts of bytes.
576    * <p>
577    * The algorithm used is pretty straightforward; the file list is sorted by size,
578    * and then each group fetch the bigger file available, iterating through groups
579    * alternating the direction.
580    */
581   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
582       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
583     // Sort files by size, from small to big
584     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
585       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
586         long r = a.getSecond() - b.getSecond();
587         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
588       }
589     });
590 
591     // create balanced groups
592     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
593       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
594     long[] sizeGroups = new long[ngroups];
595     int hi = files.size() - 1;
596     int lo = 0;
597 
598     List<Pair<SnapshotFileInfo, Long>> group;
599     int dir = 1;
600     int g = 0;
601 
602     while (hi >= lo) {
603       if (g == fileGroups.size()) {
604         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
605         fileGroups.add(group);
606       } else {
607         group = fileGroups.get(g);
608       }
609 
610       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
611 
612       // add the hi one
613       sizeGroups[g] += fileInfo.getSecond();
614       group.add(fileInfo);
615 
616       // change direction when at the end or the beginning
617       g += dir;
618       if (g == ngroups) {
619         dir = -1;
620         g = ngroups - 1;
621       } else if (g < 0) {
622         dir = 1;
623         g = 0;
624       }
625     }
626 
627     if (LOG.isDebugEnabled()) {
628       for (int i = 0; i < sizeGroups.length; ++i) {
629         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
630       }
631     }
632 
633     return fileGroups;
634   }
635 
636   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
637     @Override
638     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
639         TaskAttemptContext tac) throws IOException, InterruptedException {
640       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
641     }
642 
643     @Override
644     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
645       Configuration conf = context.getConfiguration();
646       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
647       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
648 
649       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
650       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
651       if (mappers == 0 && snapshotFiles.size() > 0) {
652         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
653         mappers = Math.min(mappers, snapshotFiles.size());
654         conf.setInt(CONF_NUM_SPLITS, mappers);
655         conf.setInt(MR_NUM_MAPS, mappers);
656       }
657 
658       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
659       List<InputSplit> splits = new ArrayList(groups.size());
660       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
661         splits.add(new ExportSnapshotInputSplit(files));
662       }
663       return splits;
664     }
665 
666     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
667       private List<Pair<BytesWritable, Long>> files;
668       private long length;
669 
670       public ExportSnapshotInputSplit() {
671         this.files = null;
672       }
673 
674       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
675         this.files = new ArrayList(snapshotFiles.size());
676         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
677           this.files.add(new Pair<BytesWritable, Long>(
678             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
679           this.length += fileInfo.getSecond();
680         }
681       }
682 
683       private List<Pair<BytesWritable, Long>> getSplitKeys() {
684         return files;
685       }
686 
687       @Override
688       public long getLength() throws IOException, InterruptedException {
689         return length;
690       }
691 
692       @Override
693       public String[] getLocations() throws IOException, InterruptedException {
694         return new String[] {};
695       }
696 
697       @Override
698       public void readFields(DataInput in) throws IOException {
699         int count = in.readInt();
700         files = new ArrayList<Pair<BytesWritable, Long>>(count);
701         length = 0;
702         for (int i = 0; i < count; ++i) {
703           BytesWritable fileInfo = new BytesWritable();
704           fileInfo.readFields(in);
705           long size = in.readLong();
706           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
707           length += size;
708         }
709       }
710 
711       @Override
712       public void write(DataOutput out) throws IOException {
713         out.writeInt(files.size());
714         for (final Pair<BytesWritable, Long> fileInfo: files) {
715           fileInfo.getFirst().write(out);
716           out.writeLong(fileInfo.getSecond());
717         }
718       }
719     }
720 
721     private static class ExportSnapshotRecordReader
722         extends RecordReader<BytesWritable, NullWritable> {
723       private final List<Pair<BytesWritable, Long>> files;
724       private long totalSize = 0;
725       private long procSize = 0;
726       private int index = -1;
727 
728       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
729         this.files = files;
730         for (Pair<BytesWritable, Long> fileInfo: files) {
731           totalSize += fileInfo.getSecond();
732         }
733       }
734 
735       @Override
736       public void close() { }
737 
738       @Override
739       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
740 
741       @Override
742       public NullWritable getCurrentValue() { return NullWritable.get(); }
743 
744       @Override
745       public float getProgress() { return (float)procSize / totalSize; }
746 
747       @Override
748       public void initialize(InputSplit split, TaskAttemptContext tac) { }
749 
750       @Override
751       public boolean nextKeyValue() {
752         if (index >= 0) {
753           procSize += files.get(index).getSecond();
754         }
755         return(++index < files.size());
756       }
757     }
758   }
759 
760   // ==========================================================================
761   //  Tool
762   // ==========================================================================
763 
764   /**
765    * Run Map-Reduce Job to perform the files copy.
766    */
767   private void runCopyJob(final Path inputRoot, final Path outputRoot,
768       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
769       final String filesUser, final String filesGroup, final int filesMode,
770       final int mappers, final int bandwidthMB)
771           throws IOException, InterruptedException, ClassNotFoundException {
772     Configuration conf = getConf();
773     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
774     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
775     if (mappers > 0) {
776       conf.setInt(CONF_NUM_SPLITS, mappers);
777       conf.setInt(MR_NUM_MAPS, mappers);
778     }
779     conf.setInt(CONF_FILES_MODE, filesMode);
780     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
781     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
782     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
783     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
784     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
785     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
786 
787     Job job = new Job(conf);
788     job.setJobName("ExportSnapshot-" + snapshotName);
789     job.setJarByClass(ExportSnapshot.class);
790     TableMapReduceUtil.addDependencyJars(job);
791     job.setMapperClass(ExportMapper.class);
792     job.setInputFormatClass(ExportSnapshotInputFormat.class);
793     job.setOutputFormatClass(NullOutputFormat.class);
794     job.setMapSpeculativeExecution(false);
795     job.setNumReduceTasks(0);
796 
797     // Acquire the delegation Tokens
798     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
799     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
800       new Path[] { inputRoot }, srcConf);
801     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
802     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
803         new Path[] { outputRoot }, destConf);
804 
805     // Run the MR Job
806     if (!job.waitForCompletion(true)) {
807       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
808       // when it will be available on all the supported versions.
809       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
810     }
811   }
812 
813   private void verifySnapshot(final Configuration baseConf,
814       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
815     // Update the conf with the current root dir, since may be a different cluster
816     Configuration conf = new Configuration(baseConf);
817     FSUtils.setRootDir(conf, rootDir);
818     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
819     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
820     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
821   }
822 
823   /**
824    * Set path ownership.
825    */
826   private void setOwner(final FileSystem fs, final Path path, final String user,
827       final String group, final boolean recursive) throws IOException {
828     if (user != null || group != null) {
829       if (recursive && fs.isDirectory(path)) {
830         for (FileStatus child : fs.listStatus(path)) {
831           setOwner(fs, child.getPath(), user, group, recursive);
832         }
833       }
834       fs.setOwner(path, user, group);
835     }
836   }
837 
838   /**
839    * Set path permission.
840    */
841   private void setPermission(final FileSystem fs, final Path path, final short filesMode,
842       final boolean recursive) throws IOException {
843     if (filesMode > 0) {
844       FsPermission perm = new FsPermission(filesMode);
845       if (recursive && fs.isDirectory(path)) {
846         for (FileStatus child : fs.listStatus(path)) {
847           setPermission(fs, child.getPath(), filesMode, recursive);
848         }
849       }
850       fs.setPermission(path, perm);
851     }
852   }
853 
854   /**
855    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
856    * @return 0 on success, and != 0 upon failure.
857    */
858   @Override
859   public int run(String[] args) throws IOException {
860     boolean verifyTarget = true;
861     boolean verifyChecksum = true;
862     String snapshotName = null;
863     String targetName = null;
864     boolean overwrite = false;
865     String filesGroup = null;
866     String filesUser = null;
867     Path outputRoot = null;
868     int bandwidthMB = Integer.MAX_VALUE;
869     int filesMode = 0;
870     int mappers = 0;
871 
872     Configuration conf = getConf();
873     Path inputRoot = FSUtils.getRootDir(conf);
874 
875     // Process command line args
876     for (int i = 0; i < args.length; i++) {
877       String cmd = args[i];
878       if (cmd.equals("-snapshot")) {
879         snapshotName = args[++i];
880       } else if (cmd.equals("-target")) {
881         targetName = args[++i];
882       } else if (cmd.equals("-copy-to")) {
883         outputRoot = new Path(args[++i]);
884       } else if (cmd.equals("-copy-from")) {
885         inputRoot = new Path(args[++i]);
886         FSUtils.setRootDir(conf, inputRoot);
887       } else if (cmd.equals("-no-checksum-verify")) {
888         verifyChecksum = false;
889       } else if (cmd.equals("-no-target-verify")) {
890         verifyTarget = false;
891       } else if (cmd.equals("-mappers")) {
892         mappers = Integer.parseInt(args[++i]);
893       } else if (cmd.equals("-chuser")) {
894         filesUser = args[++i];
895       } else if (cmd.equals("-chgroup")) {
896         filesGroup = args[++i];
897       } else if (cmd.equals("-bandwidth")) {
898         bandwidthMB = Integer.parseInt(args[++i]);
899       } else if (cmd.equals("-chmod")) {
900         filesMode = Integer.parseInt(args[++i], 8);
901       } else if (cmd.equals("-overwrite")) {
902         overwrite = true;
903       } else if (cmd.equals("-h") || cmd.equals("--help")) {
904         printUsageAndExit();
905       } else {
906         System.err.println("UNEXPECTED: " + cmd);
907         printUsageAndExit();
908       }
909     }
910 
911     // Check user options
912     if (snapshotName == null) {
913       System.err.println("Snapshot name not provided.");
914       printUsageAndExit();
915     }
916 
917     if (outputRoot == null) {
918       System.err.println("Destination file-system not provided.");
919       printUsageAndExit();
920     }
921 
922     if (targetName == null) {
923       targetName = snapshotName;
924     }
925 
926     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
927     srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
928     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
929     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
930     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
931     destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
932     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
933     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
934 
935     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
936 
937     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
938     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
939     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
940     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
941 
942     // Check if the snapshot already exists
943     if (outputFs.exists(outputSnapshotDir)) {
944       if (overwrite) {
945         if (!outputFs.delete(outputSnapshotDir, true)) {
946           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
947           return 1;
948         }
949       } else {
950         System.err.println("The snapshot '" + targetName +
951           "' already exists in the destination: " + outputSnapshotDir);
952         return 1;
953       }
954     }
955 
956     if (!skipTmp) {
957       // Check if the snapshot already in-progress
958       if (outputFs.exists(snapshotTmpDir)) {
959         if (overwrite) {
960           if (!outputFs.delete(snapshotTmpDir, true)) {
961             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
962             return 1;
963           }
964         } else {
965           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
966           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
967           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
968           return 1;
969         }
970       }
971     }
972 
973     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
974     // The snapshot references must be copied before the hfiles otherwise the cleaner
975     // will remove them because they are unreferenced.
976     try {
977       LOG.info("Copy Snapshot Manifest");
978       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
979       if (filesUser != null || filesGroup != null) {
980         setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
981       }
982       if (filesMode > 0) {
983         setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
984       }
985     } catch (IOException e) {
986       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
987         snapshotDir + " to=" + initialOutputSnapshotDir, e);
988     }
989 
990     // Write a new .snapshotinfo if the target name is different from the source name
991     if (!targetName.equals(snapshotName)) {
992       SnapshotDescription snapshotDesc =
993         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
994           .toBuilder()
995           .setName(targetName)
996           .build();
997       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
998     }
999 
1000     // Step 2 - Start MR Job to copy files
1001     // The snapshot references must be copied before the files otherwise the files gets removed
1002     // by the HFileArchiver, since they have no references.
1003     try {
1004       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1005                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1006 
1007       LOG.info("Finalize the Snapshot Export");
1008       if (!skipTmp) {
1009         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1010         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1011           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1012             snapshotTmpDir + " to=" + outputSnapshotDir);
1013         }
1014       }
1015 
1016       // Step 4 - Verify snapshot integrity
1017       if (verifyTarget) {
1018         LOG.info("Verify snapshot integrity");
1019         verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1020       }
1021 
1022       LOG.info("Export Completed: " + targetName);
1023       return 0;
1024     } catch (Exception e) {
1025       LOG.error("Snapshot export failed", e);
1026       if (!skipTmp) {
1027         outputFs.delete(snapshotTmpDir, true);
1028       }
1029       outputFs.delete(outputSnapshotDir, true);
1030       return 1;
1031     } finally {
1032       IOUtils.closeStream(inputFs);
1033       IOUtils.closeStream(outputFs);
1034     }
1035   }
1036 
1037   // ExportSnapshot
1038   private void printUsageAndExit() {
1039     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1040     System.err.println(" where [options] are:");
1041     System.err.println("  -h|-help                Show this help and exit.");
1042     System.err.println("  -snapshot NAME          Snapshot to restore.");
1043     System.err.println("  -copy-to NAME           Remote destination hdfs://");
1044     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
1045     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
1046     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
1047         "exported snapshot.");
1048     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
1049     System.err.println("  -chuser USERNAME        Change the owner of the files " +
1050         "to the specified one.");
1051     System.err.println("  -chgroup GROUP          Change the group of the files to " +
1052         "the specified one.");
1053     System.err.println("  -chmod MODE             Change the permission of the files " +
1054         "to the specified one.");
1055     System.err.println("  -mappers                Number of mappers to use during the " +
1056         "copy (mapreduce.job.maps).");
1057     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1058     System.err.println();
1059     System.err.println("Examples:");
1060     System.err.println("  hbase snapshot export \\");
1061     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1062     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1063     System.err.println();
1064     System.err.println("  hbase snapshot export \\");
1065     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1066     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1067     System.exit(1);
1068   }
1069 
1070   /**
1071    * The guts of the {@link #main} method.
1072    * Call this method to avoid the {@link #main(String[])} System.exit.
1073    * @param args
1074    * @return errCode
1075    * @throws Exception
1076    */
1077   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1078     return ToolRunner.run(conf, new ExportSnapshot(), args);
1079   }
1080 
1081   public static void main(String[] args) throws Exception {
1082     System.exit(innerMain(HBaseConfiguration.create(), args));
1083   }
1084 }