1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.mob.MobUtils;
57  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
59  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
60  import org.apache.hadoop.hbase.util.FSUtils;
61  import org.apache.hadoop.hbase.util.HFileArchiveUtil;
62  import org.apache.hadoop.hbase.util.Pair;
63  import org.apache.hadoop.io.BytesWritable;
64  import org.apache.hadoop.io.IOUtils;
65  import org.apache.hadoop.io.NullWritable;
66  import org.apache.hadoop.io.Writable;
67  import org.apache.hadoop.mapreduce.Job;
68  import org.apache.hadoop.mapreduce.JobContext;
69  import org.apache.hadoop.mapreduce.Mapper;
70  import org.apache.hadoop.mapreduce.InputFormat;
71  import org.apache.hadoop.mapreduce.InputSplit;
72  import org.apache.hadoop.mapreduce.RecordReader;
73  import org.apache.hadoop.mapreduce.TaskAttemptContext;
74  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
75  import org.apache.hadoop.mapreduce.security.TokenCache;
76  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
77  import org.apache.hadoop.util.StringUtils;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  
81  /**
82   * Export the specified snapshot to a given FileSystem.
83   *
84   * The .snapshot/name folder is copied to the destination cluster
85   * and then all the hfiles/wals are copied, using a Map-Reduce job, into the .archive/ location.
86   * When everything is done, the second cluster can restore the snapshot.
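 *
 * <p>Example invocation (the cluster address below is illustrative):
 * <pre>
 *   hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
 *     -snapshot MySnapshot -copy-to hdfs://backup-cluster:8020/hbase -mappers 16
 * </pre>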
87   */
88  @InterfaceAudience.Public
89  @InterfaceStability.Evolving
90  public class ExportSnapshot extends Configured implements Tool {
91    public static final String NAME = "exportsnapshot";
92  
93    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
94  
95    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
96    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
97    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
98    private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
99    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
100   private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
101   private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
102   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
103   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
104   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
105   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
106   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
107   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
108   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
109 
110   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
111   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
112 
113   // Export Map-Reduce Counters, to keep track of the progress
114   public enum Counter {
115     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
116     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
117   }
118 
119   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
120                                                    NullWritable, NullWritable> {
121     final static int REPORT_SIZE = 1 * 1024 * 1024;
122     final static int BUFFER_SIZE = 64 * 1024;
123 
124     private boolean testFailures;
125     private Random random;
126 
127     private boolean verifyChecksum;
128     private String filesGroup;
129     private String filesUser;
130     private short filesMode;
131     private int bufferSize;
132 
133     private FileSystem outputFs;
134     private Path outputArchive;
135     private Path outputRoot;
136 
137     private FileSystem inputFs;
138     private Path inputArchive;
139     private Path inputRoot;
140 
141     @Override
142     public void setup(Context context) throws IOException {
143       Configuration conf = context.getConfiguration();
144       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
145 
146       filesGroup = conf.get(CONF_FILES_GROUP);
147       filesUser = conf.get(CONF_FILES_USER);
148       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
149       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
150       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
151 
152       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
153       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
154 
155       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
156 
157       try {
158         conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
159         inputFs = FileSystem.get(inputRoot.toUri(), conf);
160       } catch (IOException e) {
161         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
162       }
163 
164       try {
165         conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         outputFs = FileSystem.get(outputRoot.toUri(), conf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
169       }
170 
171       // Use the default block size of the outputFs if bigger
172       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
173       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
174       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
175 
176       for (Counter c : Counter.values()) {
177         context.getCounter(c).increment(0);
178       }
179     }
180 
181     @Override
182     protected void cleanup(Context context) {
183       IOUtils.closeStream(inputFs);
184       IOUtils.closeStream(outputFs);
185     }
186 
187     @Override
188     public void map(BytesWritable key, NullWritable value, Context context)
189         throws InterruptedException, IOException {
190       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
191       Path outputPath = getOutputPath(inputInfo);
192 
193       copyFile(context, inputInfo, outputPath);
194     }
195 
196     /**
197      * Returns the location where the inputPath will be copied.
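     * <p>
     * For example (hypothetical names), an HFILE entry whose link path ends in
     * cf/testtable=1a2b3c4d5e6f-abcdef0123 would be expected to map to
     * outputArchive/data/default/testtable/1a2b3c4d5e6f/cf/abcdef0123 under the destination root.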
198      */
199     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
200       Path path = null;
201       switch (inputInfo.getType()) {
202         case HFILE:
203           Path inputPath = new Path(inputInfo.getHfile());
204           String family = inputPath.getParent().getName();
205           TableName table = HFileLink.getReferencedTableName(inputPath.getName());
206           String region = HFileLink.getReferencedRegionName(inputPath.getName());
207           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
208           path = new Path(FSUtils.getTableDir(new Path("./"), table),
209               new Path(region, new Path(family, hfile)));
210           break;
211         case WAL:
212           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
213           path = new Path(oldLogsDir, inputInfo.getWalName());
214           break;
215         default:
216           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
217       }
218       return new Path(outputArchive, path);
219     }
220 
221     /*
222      * Used by TestExportSnapshot to simulate a failure
223      */
224     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
225         throws IOException {
226       if (testFailures) {
227         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
228           if (random == null) {
229             random = new Random();
230           }
231 
232           // FLAKY-TEST-WARN: a lower probability is better; some runs may not
233           // exercise the retry at all, but it reduces the chance that the same
234           // map task fails repeatedly because of this injected test exception.
235           if (random.nextFloat() < 0.03) {
236             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
237                                   + " time=" + System.currentTimeMillis());
238           }
239         } else {
240           context.getCounter(Counter.COPY_FAILED).increment(1);
241           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
242         }
243       }
244     }
245 
246     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
247         final Path outputPath) throws IOException {
248       injectTestFailure(context, inputInfo);
249 
250       // Get the file information
251       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
252 
253       // Verify if the output file exists and is the same file that we want to copy
254       if (outputFs.exists(outputPath)) {
255         FileStatus outputStat = outputFs.getFileStatus(outputPath);
256         if (outputStat != null && sameFile(inputStat, outputStat)) {
257           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
258           context.getCounter(Counter.FILES_SKIPPED).increment(1);
259           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
260           return;
261         }
262       }
263 
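      // Throttle the read side of the copy to the configured bandwidth limit (MB/s);
      // a limit of Integer.MAX_VALUE disables the throttling.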
264       InputStream in = openSourceFile(context, inputInfo);
265       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
266       if (Integer.MAX_VALUE != bandwidthMB) {
267         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
268       }
269 
270       try {
271         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
272 
273         // Ensure that the output folder is there and copy the file
274         outputFs.mkdirs(outputPath.getParent());
275         FSDataOutputStream out = outputFs.create(outputPath, true);
276         try {
277           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
278         } finally {
279           out.close();
280         }
281 
282         // Try to preserve attributes
283         if (!preserveAttributes(outputPath, inputStat)) {
284           LOG.warn("You may have to manually run chown on: " + outputPath);
285         }
286       } finally {
287         in.close();
288       }
289     }
290 
291     /**
292      * Try to preserve the file attributes selected by the user, copying them from the source file.
293      * This is only required when exporting as a user other than "hbase", or to a system
294      * that doesn't have the "hbase" user.
295      *
296      * This is not considered a blocking failure, since the user can later run a manual
297      * chown/chmod with a user and group known to exist on the destination system.
298      */
299     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
300       FileStatus stat;
301       try {
302         stat = outputFs.getFileStatus(path);
303       } catch (IOException e) {
304         LOG.warn("Unable to get the status for file=" + path);
305         return false;
306       }
307 
308       try {
309         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
310           outputFs.setPermission(path, new FsPermission(filesMode));
311         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
312           outputFs.setPermission(path, refStat.getPermission());
313         }
314       } catch (IOException e) {
315         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
316         return false;
317       }
318 
319       boolean hasRefStat = (refStat != null);
320       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
321       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
322       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
323         try {
324           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
325             outputFs.setOwner(path, user, group);
326           }
327         } catch (IOException e) {
328           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
329           LOG.warn("The user/group may not exist on the destination cluster: user=" +
330                    user + " group=" + group);
331           return false;
332         }
333       }
334 
335       return true;
336     }
337 
338     private boolean stringIsNotEmpty(final String str) {
339       return str != null && str.length() > 0;
340     }
341 
342     private void copyData(final Context context,
343         final Path inputPath, final InputStream in,
344         final Path outputPath, final FSDataOutputStream out,
345         final long inputFileSize)
346         throws IOException {
347       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
348                                    " (%.1f%%)";
349 
350       try {
351         byte[] buffer = new byte[bufferSize];
352         long totalBytesWritten = 0;
353         int reportBytes = 0;
354         int bytesRead;
355 
356         long stime = System.currentTimeMillis();
357         while ((bytesRead = in.read(buffer)) > 0) {
358           out.write(buffer, 0, bytesRead);
359           totalBytesWritten += bytesRead;
360           reportBytes += bytesRead;
361 
362           if (reportBytes >= REPORT_SIZE) {
363             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
364             context.setStatus(String.format(statusMessage,
365                               StringUtils.humanReadableInt(totalBytesWritten),
366                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
367                               " from " + inputPath + " to " + outputPath);
368             reportBytes = 0;
369           }
370         }
371         long etime = System.currentTimeMillis();
372 
373         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
374         context.setStatus(String.format(statusMessage,
375                           StringUtils.humanReadableInt(totalBytesWritten),
376                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
377                           " from " + inputPath + " to " + outputPath);
378 
379         // Verify that the written size matches the expected input size
380         if (totalBytesWritten != inputFileSize) {
381           String msg = "number of bytes copied does not match: copied=" + totalBytesWritten +
382                        " expected=" + inputFileSize + " for file=" + inputPath;
383           throw new IOException(msg);
384         }
385 
386         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
387         LOG.info("size=" + totalBytesWritten +
388             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
389             " time=" + StringUtils.formatTimeDiff(etime, stime) +
390             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
391         context.getCounter(Counter.FILES_COPIED).increment(1);
392       } catch (IOException e) {
393         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
394         context.getCounter(Counter.COPY_FAILED).increment(1);
395         throw e;
396       }
397     }
398 
399     /**
400      * Try to open the "source" file.
401      * Throws an IOException if the communication with the inputFs fails or
402      * if the file is not found.
403      */
404     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
405             throws IOException {
406       try {
407         Configuration conf = context.getConfiguration();
408         FileLink link = null;
409         switch (fileInfo.getType()) {
410           case HFILE:
411             Path inputPath = new Path(fileInfo.getHfile());
412             link = getFileLink(inputPath, conf);
413             break;
414           case WAL:
415             String serverName = fileInfo.getWalServer();
416             String logName = fileInfo.getWalName();
417             link = new WALLink(inputRoot, serverName, logName);
418             break;
419           default:
420             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
421         }
422         return link.open(inputFs);
423       } catch (IOException e) {
424         context.getCounter(Counter.MISSING_FILES).increment(1);
425         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
426         throw e;
427       }
428     }
429 
430     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
431         throws IOException {
432       try {
433         Configuration conf = context.getConfiguration();
434         FileLink link = null;
435         switch (fileInfo.getType()) {
436           case HFILE:
437             Path inputPath = new Path(fileInfo.getHfile());
438             link = getFileLink(inputPath, conf);
439             break;
440           case WAL:
441             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
442             break;
443           default:
444             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
445         }
446         return link.getFileStatus(inputFs);
447       } catch (FileNotFoundException e) {
448         context.getCounter(Counter.MISSING_FILES).increment(1);
449         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
450         throw e;
451       } catch (IOException e) {
452         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
453         throw e;
454       }
455     }
456 
457     private FileLink getFileLink(Path path, Configuration conf) throws IOException {
458       String regionName = HFileLink.getReferencedRegionName(path.getName());
459       TableName tableName = HFileLink.getReferencedTableName(path.getName());
460       if (MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) {
461         return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf),
462                 HFileArchiveUtil.getArchivePath(conf), path);
463       }
464       return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path);
465     }
466 
467     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
468       try {
469         return fs.getFileChecksum(path);
470       } catch (IOException e) {
471         LOG.warn("Unable to get checksum for file=" + path, e);
472         return null;
473       }
474     }
475 
476     /**
477      * Check if the two files are equal by looking at the file length,
478      * and at the checksum (if the user has specified the verifyChecksum flag).
479      */
480     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
481       // Not matching length
482       if (inputStat.getLen() != outputStat.getLen()) return false;
483 
484       // Consider the files equal, since the user asked for no checksum verification
485       if (!verifyChecksum) return true;
486 
487       // If checksums are not available, files are not the same.
488       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
489       if (inChecksum == null) return false;
490 
491       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
492       if (outChecksum == null) return false;
493 
494       return inChecksum.equals(outChecksum);
495     }
496   }
497 
498   // ==========================================================================
499   //  Input Format
500   // ==========================================================================
501 
502   /**
503    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
504    * @return list of files referenced by the snapshot (pair of path and size)
505    */
506   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
507       final FileSystem fs, final Path snapshotDir) throws IOException {
508     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
509 
510     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
511     final TableName table = TableName.valueOf(snapshotDesc.getTable());
512 
513     // Get snapshot files
514     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
515     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
516       new SnapshotReferenceUtil.SnapshotVisitor() {
517         @Override
518         public void storeFile(final HRegionInfo regionInfo, final String family,
519             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
520           // if the store file has a reference, it is copied as part of the manifest, so skip it here
521           if (!storeFile.hasReference()) {
522             String region = regionInfo.getEncodedName();
523             String hfile = storeFile.getName();
524             Path path = HFileLink.createPath(table, region, family, hfile);
525 
526             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
527               .setType(SnapshotFileInfo.Type.HFILE)
528               .setHfile(path.toString())
529               .build();
530 
531             long size;
532             if (storeFile.hasFileSize()) {
533               size = storeFile.getFileSize();
534             } else {
535               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
536             }
537             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
538           }
539         }
540 
541         @Override
542         public void logFile (final String server, final String logfile)
543             throws IOException {
544           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
545             .setType(SnapshotFileInfo.Type.WAL)
546             .setWalServer(server)
547             .setWalName(logfile)
548             .build();
549 
550           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
551           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
552         }
553     });
554 
555     return files;
556   }
557 
558   /**
559    * Given a list of file paths and sizes, create roughly ngroups groups, in as balanced a way as possible.
560    * The groups created will have similar total sizes, in bytes.
561    * <p>
562    * The algorithm used is pretty straightforward: the file list is sorted by size,
563    * then each group in turn takes the biggest file still available, iterating over the groups
564    * and alternating the direction at each pass.
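   * <p>
   * For example (illustrative sizes), six files of sizes 10, 9, 8, 7, 6 and 5 split into
   * ngroups=3 end up as the groups {10, 5}, {9, 6} and {8, 7}, each totalling 15.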
565    */
566   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
567       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
568     // Sort files by size, from small to big
569     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
570       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
571         long r = a.getSecond() - b.getSecond();
572         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
573       }
574     });
575 
576     // create balanced groups
577     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
578       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
579     long[] sizeGroups = new long[ngroups];
580     int hi = files.size() - 1;
581     int lo = 0;
582 
583     List<Pair<SnapshotFileInfo, Long>> group;
584     int dir = 1;
585     int g = 0;
586 
587     while (hi >= lo) {
588       if (g == fileGroups.size()) {
589         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
590         fileGroups.add(group);
591       } else {
592         group = fileGroups.get(g);
593       }
594 
595       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
596 
597       // add the hi one
598       sizeGroups[g] += fileInfo.getSecond();
599       group.add(fileInfo);
600 
601       // change direction when at the end or the beginning
602       g += dir;
603       if (g == ngroups) {
604         dir = -1;
605         g = ngroups - 1;
606       } else if (g < 0) {
607         dir = 1;
608         g = 0;
609       }
610     }
611 
612     if (LOG.isDebugEnabled()) {
613       for (int i = 0; i < sizeGroups.length; ++i) {
614         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
615       }
616     }
617 
618     return fileGroups;
619   }
620 
621   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
622     @Override
623     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
624         TaskAttemptContext tac) throws IOException, InterruptedException {
625       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
626     }
627 
628     @Override
629     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
630       Configuration conf = context.getConfiguration();
631       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
632       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
633       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
634 
635       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
636       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
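      // No explicit split count was configured: aim for roughly CONF_MAP_GROUP
      // (default 10) files per mapper, but never use more mappers than there are files.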
637       if (mappers == 0 && snapshotFiles.size() > 0) {
638         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
639         mappers = Math.min(mappers, snapshotFiles.size());
640         conf.setInt(CONF_NUM_SPLITS, mappers);
641         conf.setInt(MR_NUM_MAPS, mappers);
642       }
643 
644       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
645       List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
646       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
647         splits.add(new ExportSnapshotInputSplit(files));
648       }
649       return splits;
650     }
651 
652     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
653       private List<Pair<BytesWritable, Long>> files;
654       private long length;
655 
656       public ExportSnapshotInputSplit() {
657         this.files = null;
658       }
659 
660       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
661         this.files = new ArrayList<Pair<BytesWritable, Long>>(snapshotFiles.size());
662         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
663           this.files.add(new Pair<BytesWritable, Long>(
664             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
665           this.length += fileInfo.getSecond();
666         }
667       }
668 
669       private List<Pair<BytesWritable, Long>> getSplitKeys() {
670         return files;
671       }
672 
673       @Override
674       public long getLength() throws IOException, InterruptedException {
675         return length;
676       }
677 
678       @Override
679       public String[] getLocations() throws IOException, InterruptedException {
680         return new String[] {};
681       }
682 
683       @Override
684       public void readFields(DataInput in) throws IOException {
685         int count = in.readInt();
686         files = new ArrayList<Pair<BytesWritable, Long>>(count);
687         length = 0;
688         for (int i = 0; i < count; ++i) {
689           BytesWritable fileInfo = new BytesWritable();
690           fileInfo.readFields(in);
691           long size = in.readLong();
692           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
693           length += size;
694         }
695       }
696 
697       @Override
698       public void write(DataOutput out) throws IOException {
699         out.writeInt(files.size());
700         for (final Pair<BytesWritable, Long> fileInfo: files) {
701           fileInfo.getFirst().write(out);
702           out.writeLong(fileInfo.getSecond());
703         }
704       }
705     }
706 
707     private static class ExportSnapshotRecordReader
708         extends RecordReader<BytesWritable, NullWritable> {
709       private final List<Pair<BytesWritable, Long>> files;
710       private long totalSize = 0;
711       private long procSize = 0;
712       private int index = -1;
713 
714       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
715         this.files = files;
716         for (Pair<BytesWritable, Long> fileInfo: files) {
717           totalSize += fileInfo.getSecond();
718         }
719       }
720 
721       @Override
722       public void close() { }
723 
724       @Override
725       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
726 
727       @Override
728       public NullWritable getCurrentValue() { return NullWritable.get(); }
729 
730       @Override
731       public float getProgress() { return (float)procSize / totalSize; }
732 
733       @Override
734       public void initialize(InputSplit split, TaskAttemptContext tac) { }
735 
736       @Override
737       public boolean nextKeyValue() {
738         if (index >= 0) {
739           procSize += files.get(index).getSecond();
740         }
741         return(++index < files.size());
742       }
743     }
744   }
745 
746   // ==========================================================================
747   //  Tool
748   // ==========================================================================
749 
750   /**
751    * Run the Map-Reduce job that performs the file copy.
752    */
753   private void runCopyJob(final Path inputRoot, final Path outputRoot,
754       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
755       final String filesUser, final String filesGroup, final int filesMode,
756       final int mappers, final int bandwidthMB)
757           throws IOException, InterruptedException, ClassNotFoundException {
758     Configuration conf = getConf();
759     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
760     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
761     if (mappers > 0) {
762       conf.setInt(CONF_NUM_SPLITS, mappers);
763       conf.setInt(MR_NUM_MAPS, mappers);
764     }
765     conf.setInt(CONF_FILES_MODE, filesMode);
766     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
767     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
768     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
769     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
770     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
771     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
772 
773     Job job = new Job(conf);
774     job.setJobName("ExportSnapshot-" + snapshotName);
775     job.setJarByClass(ExportSnapshot.class);
776     TableMapReduceUtil.addDependencyJars(job);
777     job.setMapperClass(ExportMapper.class);
778     job.setInputFormatClass(ExportSnapshotInputFormat.class);
779     job.setOutputFormatClass(NullOutputFormat.class);
780     job.setMapSpeculativeExecution(false);
781     job.setNumReduceTasks(0);
782 
783     // Acquire the delegation Tokens
784     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
785       new Path[] { inputRoot, outputRoot }, conf);
786 
787     // Run the MR Job
788     if (!job.waitForCompletion(true)) {
789       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
790       // when it will be available on all the supported versions.
791       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
792     }
793   }
794 
795   private void verifySnapshot(final Configuration baseConf,
796       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
797     // Update the conf with the current root dir, since it may be a different cluster
798     Configuration conf = new Configuration(baseConf);
799     FSUtils.setRootDir(conf, rootDir);
800     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
801     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
802     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
803   }
804 
805   /**
806    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
807    * @return 0 on success, and != 0 upon failure.
808    */
809   @Override
810   public int run(String[] args) throws IOException {
811     boolean verifyTarget = true;
812     boolean verifyChecksum = true;
813     String snapshotName = null;
814     String targetName = null;
815     boolean overwrite = false;
816     String filesGroup = null;
817     String filesUser = null;
818     Path outputRoot = null;
819     int bandwidthMB = Integer.MAX_VALUE;
820     int filesMode = 0;
821     int mappers = 0;
822 
823     Configuration conf = getConf();
824     Path inputRoot = FSUtils.getRootDir(conf);
825 
826     // Process command line args
827     for (int i = 0; i < args.length; i++) {
828       String cmd = args[i];
829       if (cmd.equals("-snapshot")) {
830         snapshotName = args[++i];
831       } else if (cmd.equals("-target")) {
832         targetName = args[++i];
833       } else if (cmd.equals("-copy-to")) {
834         outputRoot = new Path(args[++i]);
835       } else if (cmd.equals("-copy-from")) {
836         inputRoot = new Path(args[++i]);
837         FSUtils.setRootDir(conf, inputRoot);
838       } else if (cmd.equals("-no-checksum-verify")) {
839         verifyChecksum = false;
840       } else if (cmd.equals("-no-target-verify")) {
841         verifyTarget = false;
842       } else if (cmd.equals("-mappers")) {
843         mappers = Integer.parseInt(args[++i]);
844       } else if (cmd.equals("-chuser")) {
845         filesUser = args[++i];
846       } else if (cmd.equals("-chgroup")) {
847         filesGroup = args[++i];
848       } else if (cmd.equals("-bandwidth")) {
849         bandwidthMB = Integer.parseInt(args[++i]);
850       } else if (cmd.equals("-chmod")) {
851         filesMode = Integer.parseInt(args[++i], 8);
852       } else if (cmd.equals("-overwrite")) {
853         overwrite = true;
854       } else if (cmd.equals("-h") || cmd.equals("--help")) {
855         printUsageAndExit();
856       } else {
857         System.err.println("UNEXPECTED: " + cmd);
858         printUsageAndExit();
859       }
860     }
861 
862     // Check user options
863     if (snapshotName == null) {
864       System.err.println("Snapshot name not provided.");
865       printUsageAndExit();
866     }
867 
868     if (outputRoot == null) {
869       System.err.println("Destination file-system not provided.");
870       printUsageAndExit();
871     }
872 
873     if (targetName == null) {
874       targetName = snapshotName;
875     }
876 
877     conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
878     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
879     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
880     conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
881     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
882     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
883 
884     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
885 
886     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
887     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
888     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
889     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
890 
891     // Check if the snapshot already exists
892     if (outputFs.exists(outputSnapshotDir)) {
893       if (overwrite) {
894         if (!outputFs.delete(outputSnapshotDir, true)) {
895           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
896           return 1;
897         }
898       } else {
899         System.err.println("The snapshot '" + targetName +
900           "' already exists in the destination: " + outputSnapshotDir);
901         return 1;
902       }
903     }
904 
905     if (!skipTmp) {
906       // Check if the snapshot is already in progress
907       if (outputFs.exists(snapshotTmpDir)) {
908         if (overwrite) {
909           if (!outputFs.delete(snapshotTmpDir, true)) {
910             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
911             return 1;
912           }
913         } else {
914           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
915           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
916           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
917           return 1;
918         }
919       }
920     }
921 
922     // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
923     // The snapshot references must be copied before the hfiles, otherwise the cleaner
924     // will remove them because they are unreferenced.
925     try {
926       LOG.info("Copy Snapshot Manifest");
927       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
928     } catch (IOException e) {
929       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
930         snapshotDir + " to=" + initialOutputSnapshotDir, e);
931     }
932 
933     // Write a new .snapshotinfo if the target name is different from the source name
934     if (!targetName.equals(snapshotName)) {
935       SnapshotDescription snapshotDesc =
936         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
937           .toBuilder()
938           .setName(targetName)
939           .build();
940       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
941     }
942 
943     // Step 2 - Start MR Job to copy files
944     // The snapshot references must be copied before the files, otherwise the files get removed
945     // by the HFileArchiver, since they have no references.
946     try {
947       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
948                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
949 
950       LOG.info("Finalize the Snapshot Export");
951       if (!skipTmp) {
952         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
953         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
954           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
955             snapshotTmpDir + " to=" + outputSnapshotDir);
956         }
957       }
958 
959       // Step 4 - Verify snapshot integrity
960       if (verifyTarget) {
961         LOG.info("Verify snapshot integrity");
962         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
963       }
964 
965       LOG.info("Export Completed: " + targetName);
966       return 0;
967     } catch (Exception e) {
968       LOG.error("Snapshot export failed", e);
969       if (!skipTmp) {
970         outputFs.delete(snapshotTmpDir, true);
971       }
972       outputFs.delete(outputSnapshotDir, true);
973       return 1;
974     } finally {
975       IOUtils.closeStream(inputFs);
976       IOUtils.closeStream(outputFs);
977     }
978   }
979 
980   // ExportSnapshot
981   private void printUsageAndExit() {
982     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
983     System.err.println(" where [options] are:");
984     System.err.println("  -h|--help               Show this help and exit.");
985     System.err.println("  -snapshot NAME          Snapshot to export.");
986     System.err.println("  -copy-to NAME           Remote destination hdfs://");
987     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
988     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
989     System.err.println("  -no-target-verify       Do not verify the integrity of the " +
990         "exported snapshot.");
991     System.err.println("  -overwrite              Rewrite the snapshot manifest if it already exists");
992     System.err.println("  -chuser USERNAME        Change the owner of the files " +
993         "to the specified one.");
994     System.err.println("  -chgroup GROUP          Change the group of the files to " +
995         "the specified one.");
996     System.err.println("  -chmod MODE             Change the permission of the files " +
997         "to the specified one.");
998     System.err.println("  -mappers                Number of mappers to use during the " +
999         "copy (mapreduce.job.maps).");
1000     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1001     System.err.println();
1002     System.err.println("Examples:");
1003     System.err.println("  hbase " + getClass().getName() + " \\");
1004     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1005     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1006     System.err.println();
1007     System.err.println("  hbase " + getClass().getName() + " \\");
1008     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1009     System.err.println("    -copy-to hdfs://srv1:50070/hbase");
1010     System.exit(1);
1011   }
1012 
1013   /**
1014    * The guts of the {@link #main} method.
1015    * Call this method to avoid the {@link #main(String[])} System.exit.
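   * <p>
   * For example (illustrative arguments), from a test:
   * <pre>
   *   int ret = ExportSnapshot.innerMain(HBaseConfiguration.create(),
   *       new String[] { "-snapshot", "MySnapshot", "-copy-to", "hdfs://backup/hbase" });
   * </pre>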
1016    * @param args the command line arguments
1017    * @return the exit code returned by the tool (0 on success)
1018    * @throws Exception if the underlying tool fails
1019    */
1020   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1021     return ToolRunner.run(conf, new ExportSnapshot(), args);
1022   }
1023 
1024   public static void main(String[] args) throws Exception {
1025     System.exit(innerMain(HBaseConfiguration.create(), args));
1026   }
1027 }