1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.NullWritable;
63  import org.apache.hadoop.io.Writable;
64  import org.apache.hadoop.mapreduce.Job;
65  import org.apache.hadoop.mapreduce.JobContext;
66  import org.apache.hadoop.mapreduce.Mapper;
67  import org.apache.hadoop.mapreduce.InputFormat;
68  import org.apache.hadoop.mapreduce.InputSplit;
69  import org.apache.hadoop.mapreduce.RecordReader;
70  import org.apache.hadoop.mapreduce.TaskAttemptContext;
71  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
72  import org.apache.hadoop.mapreduce.security.TokenCache;
73  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
74  import org.apache.hadoop.util.StringUtils;
75  import org.apache.hadoop.util.Tool;
76  import org.apache.hadoop.util.ToolRunner;
77  
78  /**
79   * Export the specified snapshot to a given FileSystem.
80   *
81   * The .snapshot/name folder is copied to the destination cluster, and then all
82   * the hfiles/wals referenced by the snapshot are copied into the .archive/ location
83   * using a Map-Reduce job. When everything is done, the second cluster can restore the snapshot.
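     *
     * A typical invocation, with illustrative values based on the usage examples printed by this tool:
     * <pre>
     *   hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot \
     *       -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase -mappers 16
     * </pre>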
84   */
85  @InterfaceAudience.Public
86  @InterfaceStability.Evolving
87  public class ExportSnapshot extends Configured implements Tool {
88    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
89  
90    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
91    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
92    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
93    private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
94    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
95    private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
96    private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
97    private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
98    private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
99    private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
100   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
101   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
102   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
103   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
104 
105   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
106   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
107 
108   private static final String INPUT_FOLDER_PREFIX = "export-files.";
109 
110   // Export Map-Reduce Counters, to keep track of the progress
111   public enum Counter {
112     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
113     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
114   }
115 
116   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
117                                                    NullWritable, NullWritable> {
118     final static int REPORT_SIZE = 1 * 1024 * 1024;
119     final static int BUFFER_SIZE = 64 * 1024;
120 
121     private boolean testFailures;
122     private Random random;
123 
124     private boolean verifyChecksum;
125     private String filesGroup;
126     private String filesUser;
127     private short filesMode;
128     private int bufferSize;
129 
130     private FileSystem outputFs;
131     private Path outputArchive;
132     private Path outputRoot;
133 
134     private FileSystem inputFs;
135     private Path inputArchive;
136     private Path inputRoot;
137 
138     @Override
139     public void setup(Context context) throws IOException {
140       Configuration conf = context.getConfiguration();
141       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
142 
143       filesGroup = conf.get(CONF_FILES_GROUP);
144       filesUser = conf.get(CONF_FILES_USER);
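          // File mode requested via -chmod (an octal value); 0 means the permissions are not overridden.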
145       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
146       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
147       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
148 
149       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
150       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
151 
152       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
153 
154       try {
155         inputFs = FileSystem.get(inputRoot.toUri(), conf);
156       } catch (IOException e) {
157         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
158       }
159 
160       try {
161         outputFs = FileSystem.get(outputRoot.toUri(), conf);
162       } catch (IOException e) {
163         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
164       }
165 
166       // Use the default block size of the outputFs if bigger
167       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
168       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
169       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
170 
171       for (Counter c : Counter.values()) {
172         context.getCounter(c).increment(0);
173       }
174     }
175 
176     @Override
177     public void map(BytesWritable key, NullWritable value, Context context)
178         throws InterruptedException, IOException {
179       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
180       Path outputPath = getOutputPath(inputInfo);
181 
182       copyFile(context, inputInfo, outputPath);
183     }
184 
185     /**
186      * Returns the location where the inputPath will be copied.
187      */
188     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
189       Path path = null;
190       switch (inputInfo.getType()) {
191         case HFILE:
192           Path inputPath = new Path(inputInfo.getHfile());
193           String family = inputPath.getParent().getName();
194           TableName table = HFileLink.getReferencedTableName(inputPath.getName());
195           String region = HFileLink.getReferencedRegionName(inputPath.getName());
196           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
197           path = new Path(FSUtils.getTableDir(new Path("./"), table),
198               new Path(region, new Path(family, hfile)));
199           break;
200         case WAL:
201           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
202           path = new Path(oldLogsDir, inputInfo.getWalName());
203           break;
204         default:
205           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
206       }
207       return new Path(outputArchive, path);
208     }
209 
210     /*
211      * Used by TestExportSnapshot to simulate a failure
212      */
213     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
214         throws IOException {
215       if (testFailures) {
216         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
217           if (random == null) {
218             random = new Random();
219           }
220 
221           // FLAKY-TEST-WARN: a lower probability is better. Some runs may not
222           // exercise the retry at all, but at least we reduce the number of test
223           // failures caused by this injected exception in the same map task.
224           if (random.nextFloat() < 0.03) {
225             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
226                                   + " time=" + System.currentTimeMillis());
227           }
228         } else {
229           context.getCounter(Counter.COPY_FAILED).increment(1);
230           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
231         }
232       }
233     }
234 
235     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
236         final Path outputPath) throws IOException {
237       injectTestFailure(context, inputInfo);
238 
239       // Get the file information
240       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
241 
242       // Verify whether the output file already exists and is the same as the one we want to copy
243       if (outputFs.exists(outputPath)) {
244         FileStatus outputStat = outputFs.getFileStatus(outputPath);
245         if (outputStat != null && sameFile(inputStat, outputStat)) {
246           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
247           context.getCounter(Counter.FILES_SKIPPED).increment(1);
248           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
249           return;
250         }
251       }
252 
253       InputStream in = openSourceFile(context, inputInfo);
254       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
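          // Throttle the copy to bandwidthMB MB/s unless throttling was explicitly disabled
          // by setting the bandwidth to Integer.MAX_VALUE; the default cap is 100 MB/s.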
255       if (Integer.MAX_VALUE != bandwidthMB) {
256         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
257       }
258 
259       try {
260         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
261 
262         // Ensure that the output folder is there and copy the file
263         outputFs.mkdirs(outputPath.getParent());
264         FSDataOutputStream out = outputFs.create(outputPath, true);
265         try {
266           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
267         } finally {
268           out.close();
269         }
270 
271         // Try to preserve the file attributes
272         if (!preserveAttributes(outputPath, inputStat)) {
273           LOG.warn("You may have to run chown manually on: " + outputPath);
274         }
275       } finally {
276         in.close();
277       }
278     }
279 
280     /**
281      * Try to preserve the file attributes selected by the user, copying them from the source file.
282      * This is only required when you are exporting as a different user than "hbase" or on a system
283      * that doesn't have the "hbase" user.
284      *
285      * This is not considered a blocking failure, since the user can always fix ownership and
286      * permissions afterwards with chown/chmod, using a user/group known to exist on the destination.
287      */
288     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
289       FileStatus stat;
290       try {
291         stat = outputFs.getFileStatus(path);
292       } catch (IOException e) {
293         LOG.warn("Unable to get the status for file=" + path);
294         return false;
295       }
296 
297       try {
298         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
299           outputFs.setPermission(path, new FsPermission(filesMode));
300         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
301           outputFs.setPermission(path, refStat.getPermission());
302         }
303       } catch (IOException e) {
304         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
305         return false;
306       }
307 
308       boolean hasRefStat = (refStat != null);
309       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
310       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
311       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
312         try {
313           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
314             outputFs.setOwner(path, user, group);
315           }
316         } catch (IOException e) {
317           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
318           LOG.warn("The user/group may not exist on the destination cluster: user=" +
319                    user + " group=" + group);
320           return false;
321         }
322       }
323 
324       return true;
325     }
326 
327     private boolean stringIsNotEmpty(final String str) {
328       return str != null && str.length() > 0;
329     }
330 
331     private void copyData(final Context context,
332         final Path inputPath, final InputStream in,
333         final Path outputPath, final FSDataOutputStream out,
334         final long inputFileSize)
335         throws IOException {
336       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
337                                    " (%.1f%%)";
338 
339       try {
340         byte[] buffer = new byte[bufferSize];
341         long totalBytesWritten = 0;
342         int reportBytes = 0;
343         int bytesRead;
344 
345         long stime = System.currentTimeMillis();
346         while ((bytesRead = in.read(buffer)) > 0) {
347           out.write(buffer, 0, bytesRead);
348           totalBytesWritten += bytesRead;
349           reportBytes += bytesRead;
350 
351           if (reportBytes >= REPORT_SIZE) {
352             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
353             context.setStatus(String.format(statusMessage,
354                               StringUtils.humanReadableInt(totalBytesWritten),
355                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
356                               " from " + inputPath + " to " + outputPath);
357             reportBytes = 0;
358           }
359         }
360         long etime = System.currentTimeMillis();
361 
362         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
363         context.setStatus(String.format(statusMessage,
364                           StringUtils.humanReadableInt(totalBytesWritten),
365                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
366                           " from " + inputPath + " to " + outputPath);
367 
368         // Verify that the written size matches
369         if (totalBytesWritten != inputFileSize) {
370           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
371                        " expected=" + inputFileSize + " for file=" + inputPath;
372           throw new IOException(msg);
373         }
374 
375         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
376         LOG.info("size=" + totalBytesWritten +
377             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
378             " time=" + StringUtils.formatTimeDiff(etime, stime) +
379             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
380         context.getCounter(Counter.FILES_COPIED).increment(1);
381       } catch (IOException e) {
382         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
383         context.getCounter(Counter.COPY_FAILED).increment(1);
384         throw e;
385       }
386     }
387 
388     /**
389      * Try to open the "source" file.
390      * Throws an IOException if the communication with the inputFs fails or
391      * if the file is not found.
392      */
393     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
394         throws IOException {
395       try {
396         FileLink link = null;
397         switch (fileInfo.getType()) {
398           case HFILE:
399             Path inputPath = new Path(fileInfo.getHfile());
400             link = new HFileLink(inputRoot, inputArchive, inputPath);
401             break;
402           case WAL:
403             String serverName = fileInfo.getWalServer();
404             String logName = fileInfo.getWalName();
405             link = new WALLink(inputRoot, serverName, logName);
406             break;
407           default:
408             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
409         }
410         return link.open(inputFs);
411       } catch (IOException e) {
412         context.getCounter(Counter.MISSING_FILES).increment(1);
413         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
414         throw e;
415       }
416     }
417 
418     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
419         throws IOException {
420       try {
421         FileLink link = null;
422         switch (fileInfo.getType()) {
423           case HFILE:
424             Path inputPath = new Path(fileInfo.getHfile());
425             link = new HFileLink(inputRoot, inputArchive, inputPath);
426             break;
427           case WAL:
428             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
429             break;
430           default:
431             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
432         }
433         return link.getFileStatus(inputFs);
434       } catch (FileNotFoundException e) {
435         context.getCounter(Counter.MISSING_FILES).increment(1);
436         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
437         throw e;
438       } catch (IOException e) {
439         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
440         throw e;
441       }
442     }
443 
444     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
445       try {
446         return fs.getFileChecksum(path);
447       } catch (IOException e) {
448         LOG.warn("Unable to get checksum for file=" + path, e);
449         return null;
450       }
451     }
452 
453     /**
454      * Check if the two files are equal by looking at the file length,
455      * and at the checksum (if the user has specified the verifyChecksum flag).
456      */
457     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
458       // Not matching length
459       if (inputStat.getLen() != outputStat.getLen()) return false;
460 
461       // Consider the files equal, since the user asked to skip checksum verification
462       if (!verifyChecksum) return true;
463 
464       // If checksums are not available, files are not the same.
465       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
466       if (inChecksum == null) return false;
467 
468       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
469       if (outChecksum == null) return false;
470 
471       return inChecksum.equals(outChecksum);
472     }
473   }
474 
475   // ==========================================================================
476   //  Input Format
477   // ==========================================================================
478 
479   /**
480    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
481    * @return list of files referenced by the snapshot (pair of path and size)
482    */
483   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
484       final FileSystem fs, final Path snapshotDir) throws IOException {
485     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
486 
487     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
488     final TableName table = TableName.valueOf(snapshotDesc.getTable());
489 
490     // Get snapshot files
491     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
492     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
493       new SnapshotReferenceUtil.SnapshotVisitor() {
494         @Override
495         public void storeFile(final HRegionInfo regionInfo, final String family,
496             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
497           if (storeFile.hasReference()) {
498             // copied as part of the manifest
499           } else {
500             String region = regionInfo.getEncodedName();
501             String hfile = storeFile.getName();
502             Path path = HFileLink.createPath(table, region, family, hfile);
503 
504             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
505               .setType(SnapshotFileInfo.Type.HFILE)
506               .setHfile(path.toString())
507               .build();
508 
509             long size;
510             if (storeFile.hasFileSize()) {
511               size = storeFile.getFileSize();
512             } else {
513               size = new HFileLink(conf, path).getFileStatus(fs).getLen();
514             }
515             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
516           }
517         }
518 
519         @Override
520         public void logFile(final String server, final String logfile)
521             throws IOException {
522           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
523             .setType(SnapshotFileInfo.Type.WAL)
524             .setWalServer(server)
525             .setWalName(logfile)
526             .build();
527 
528           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
529           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
530         }
531     });
532 
533     return files;
534   }
535 
536   /**
537    * Given a list of file paths and sizes, create around ngroups splits that are as balanced as possible.
538    * The groups created will have similar total sizes, in bytes.
539    * <p>
540    * The algorithm used is pretty straightforward: the file list is sorted by size,
541    * and then each group in turn takes the biggest file still available, iterating over
542    * the groups and alternating the direction at each end.
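       * <p>
       * For example, with file sizes {10, 8, 5, 3, 1} and ngroups=2, the files are handed out
       * largest-first in the group order 0, 1, 1, 0, 0, giving group 0 = {10, 3, 1} (total 14)
       * and group 1 = {8, 5} (total 13).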
543    */
544   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
545       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
546     // Sort files by size, from small to big
547     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
548       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
549         long r = a.getSecond() - b.getSecond();
550         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
551       }
552     });
553 
554     // create balanced groups
555     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
556       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
557     long[] sizeGroups = new long[ngroups];
558     int hi = files.size() - 1;
559     int lo = 0;
560 
561     List<Pair<SnapshotFileInfo, Long>> group;
562     int dir = 1;
563     int g = 0;
564 
565     while (hi >= lo) {
566       if (g == fileGroups.size()) {
567         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
568         fileGroups.add(group);
569       } else {
570         group = fileGroups.get(g);
571       }
572 
573       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
574 
575       // add the hi one
576       sizeGroups[g] += fileInfo.getSecond();
577       group.add(fileInfo);
578 
579       // change direction when at the end or the beginning
580       g += dir;
581       if (g == ngroups) {
582         dir = -1;
583         g = ngroups - 1;
584       } else if (g < 0) {
585         dir = 1;
586         g = 0;
587       }
588     }
589 
590     if (LOG.isDebugEnabled()) {
591       for (int i = 0; i < sizeGroups.length; ++i) {
592         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
593       }
594     }
595 
596     return fileGroups;
597   }
598 
599   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
600     @Override
601     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
602         TaskAttemptContext tac) throws IOException, InterruptedException {
603       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
604     }
605 
606     @Override
607     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
608       Configuration conf = context.getConfiguration();
609       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
610       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
611       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
612 
613       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
614       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
615       if (mappers == 0 && snapshotFiles.size() > 0) {
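            // No explicit split count was requested: aim at roughly CONF_MAP_GROUP
            // (default 10) files per mapper, capped at the total number of files.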
616         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
617         mappers = Math.min(mappers, snapshotFiles.size());
618         conf.setInt(CONF_NUM_SPLITS, mappers);
619         conf.setInt(MR_NUM_MAPS, mappers);
620       }
621 
622       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
623       List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
624       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
625         splits.add(new ExportSnapshotInputSplit(files));
626       }
627       return splits;
628     }
629 
630     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
631       private List<Pair<BytesWritable, Long>> files;
632       private long length;
633 
634       public ExportSnapshotInputSplit() {
635         this.files = null;
636       }
637 
638       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
639         this.files = new ArrayList<Pair<BytesWritable, Long>>(snapshotFiles.size());
640         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
641           this.files.add(new Pair<BytesWritable, Long>(
642             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
643           this.length += fileInfo.getSecond();
644         }
645       }
646 
647       private List<Pair<BytesWritable, Long>> getSplitKeys() {
648         return files;
649       }
650 
651       @Override
652       public long getLength() throws IOException, InterruptedException {
653         return length;
654       }
655 
656       @Override
657       public String[] getLocations() throws IOException, InterruptedException {
658         return new String[] {};
659       }
660 
661       @Override
662       public void readFields(DataInput in) throws IOException {
663         int count = in.readInt();
664         files = new ArrayList<Pair<BytesWritable, Long>>(count);
665         length = 0;
666         for (int i = 0; i < count; ++i) {
667           BytesWritable fileInfo = new BytesWritable();
668           fileInfo.readFields(in);
669           long size = in.readLong();
670           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
671           length += size;
672         }
673       }
674 
675       @Override
676       public void write(DataOutput out) throws IOException {
677         out.writeInt(files.size());
678         for (final Pair<BytesWritable, Long> fileInfo: files) {
679           fileInfo.getFirst().write(out);
680           out.writeLong(fileInfo.getSecond());
681         }
682       }
683     }
684 
685     private static class ExportSnapshotRecordReader
686         extends RecordReader<BytesWritable, NullWritable> {
687       private final List<Pair<BytesWritable, Long>> files;
688       private long totalSize = 0;
689       private long procSize = 0;
690       private int index = -1;
691 
692       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
693         this.files = files;
694         for (Pair<BytesWritable, Long> fileInfo: files) {
695           totalSize += fileInfo.getSecond();
696         }
697       }
698 
699       @Override
700       public void close() { }
701 
702       @Override
703       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
704 
705       @Override
706       public NullWritable getCurrentValue() { return NullWritable.get(); }
707 
708       @Override
709       public float getProgress() { return (float)procSize / totalSize; }
710 
711       @Override
712       public void initialize(InputSplit split, TaskAttemptContext tac) { }
713 
714       @Override
715       public boolean nextKeyValue() {
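            // Account the size of the previously returned entry before advancing to the next one.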
716         if (index >= 0) {
717           procSize += files.get(index).getSecond();
718         }
719         return(++index < files.size());
720       }
721     }
722   }
723 
724   // ==========================================================================
725   //  Tool
726   // ==========================================================================
727 
728   /**
729    * Run the Map-Reduce job that performs the file copy.
730    */
731   private void runCopyJob(final Path inputRoot, final Path outputRoot,
732       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
733       final String filesUser, final String filesGroup, final int filesMode,
734       final int mappers, final int bandwidthMB)
735           throws IOException, InterruptedException, ClassNotFoundException {
736     Configuration conf = getConf();
737     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
738     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
739     if (mappers > 0) {
740       conf.setInt(CONF_NUM_SPLITS, mappers);
741       conf.setInt(MR_NUM_MAPS, mappers);
742     }
743     conf.setInt(CONF_FILES_MODE, filesMode);
744     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
745     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
746     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
747     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
748     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
749     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
750 
751     Job job = new Job(conf);
752     job.setJobName("ExportSnapshot-" + snapshotName);
753     job.setJarByClass(ExportSnapshot.class);
754     TableMapReduceUtil.addDependencyJars(job);
755     job.setMapperClass(ExportMapper.class);
756     job.setInputFormatClass(ExportSnapshotInputFormat.class);
757     job.setOutputFormatClass(NullOutputFormat.class);
758     job.setMapSpeculativeExecution(false);
759     job.setNumReduceTasks(0);
760 
761     // Acquire the delegation Tokens
762     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
763       new Path[] { inputRoot, outputRoot }, conf);
764 
765     // Run the MR Job
766     if (!job.waitForCompletion(true)) {
767       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
768       // when it is available on all the supported versions.
769       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
770     }
771   }
772 
773   private void verifySnapshot(final Configuration baseConf,
774       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
775     // Update the conf with the current root dir, since it may be a different cluster
776     Configuration conf = new Configuration(baseConf);
777     FSUtils.setRootDir(conf, rootDir);
778     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
779     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
780     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
781   }
782 
783   /**
784    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
785    * @return 0 on success, and != 0 upon failure.
786    */
787   @Override
788   public int run(String[] args) throws IOException {
789     boolean verifyTarget = true;
790     boolean verifyChecksum = true;
791     String snapshotName = null;
792     String targetName = null;
793     boolean overwrite = false;
794     String filesGroup = null;
795     String filesUser = null;
796     Path outputRoot = null;
797     int bandwidthMB = Integer.MAX_VALUE;
798     int filesMode = 0;
799     int mappers = 0;
800 
801     Configuration conf = getConf();
802     Path inputRoot = FSUtils.getRootDir(conf);
803 
804     // Process command line args
805     for (int i = 0; i < args.length; i++) {
806       String cmd = args[i];
807       if (cmd.equals("-snapshot")) {
808         snapshotName = args[++i];
809       } else if (cmd.equals("-target")) {
810         targetName = args[++i];
811       } else if (cmd.equals("-copy-to")) {
812         outputRoot = new Path(args[++i]);
813       } else if (cmd.equals("-copy-from")) {
814         inputRoot = new Path(args[++i]);
815         FSUtils.setRootDir(conf, inputRoot);
816       } else if (cmd.equals("-no-checksum-verify")) {
817         verifyChecksum = false;
818       } else if (cmd.equals("-no-target-verify")) {
819         verifyTarget = false;
820       } else if (cmd.equals("-mappers")) {
821         mappers = Integer.parseInt(args[++i]);
822       } else if (cmd.equals("-chuser")) {
823         filesUser = args[++i];
824       } else if (cmd.equals("-chgroup")) {
825         filesGroup = args[++i];
826       } else if (cmd.equals("-bandwidth")) {
827         bandwidthMB = Integer.parseInt(args[++i]);
828       } else if (cmd.equals("-chmod")) {
829         filesMode = Integer.parseInt(args[++i], 8);
830       } else if (cmd.equals("-overwrite")) {
831         overwrite = true;
832       } else if (cmd.equals("-h") || cmd.equals("--help")) {
833         printUsageAndExit();
834       } else {
835         System.err.println("UNEXPECTED: " + cmd);
836         printUsageAndExit();
837       }
838     }
839 
840     // Check user options
841     if (snapshotName == null) {
842       System.err.println("Snapshot name not provided.");
843       printUsageAndExit();
844     }
845 
846     if (outputRoot == null) {
847       System.err.println("Destination file-system not provided.");
848       printUsageAndExit();
849     }
850 
851     if (targetName == null) {
852       targetName = snapshotName;
853     }
854 
855     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
856     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
857     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
858     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
859 
860     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
861 
862     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
863     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
864     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
865     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
866 
867     // Check if the snapshot already exists
868     if (outputFs.exists(outputSnapshotDir)) {
869       if (overwrite) {
870         if (!outputFs.delete(outputSnapshotDir, true)) {
871           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
872           return 1;
873         }
874       } else {
875         System.err.println("The snapshot '" + targetName +
876           "' already exists in the destination: " + outputSnapshotDir);
877         return 1;
878       }
879     }
880 
881     if (!skipTmp) {
882       // Check if the snapshot is already in progress
883       if (outputFs.exists(snapshotTmpDir)) {
884         if (overwrite) {
885           if (!outputFs.delete(snapshotTmpDir, true)) {
886             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
887             return 1;
888           }
889         } else {
890           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
891           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
892           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
893           return 1;
894         }
895       }
896     }
897 
898     // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
899     // The snapshot references must be copied before the hfiles otherwise the cleaner
900     // will remove them because they are unreferenced.
901     try {
902       LOG.info("Copy Snapshot Manifest");
903       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
904     } catch (IOException e) {
905       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
906         snapshotDir + " to=" + initialOutputSnapshotDir, e);
907     }
908 
909     // Write a new .snapshotinfo if the target name is different from the source name
910     if (!targetName.equals(snapshotName)) {
911       SnapshotDescription snapshotDesc =
912         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
913           .toBuilder()
914           .setName(targetName)
915           .build();
916       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
917     }
918 
919     // Step 2 - Start MR Job to copy files
920     // The snapshot references must be copied before the files, otherwise the files get removed
921     // by the HFileArchiver, since they have no references.
922     try {
923       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
924                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
925 
926       LOG.info("Finalize the Snapshot Export");
927       if (!skipTmp) {
928         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
929         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
930           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
931             snapshotTmpDir + " to=" + outputSnapshotDir);
932         }
933       }
934 
935       // Step 4 - Verify snapshot integrity
936       if (verifyTarget) {
937         LOG.info("Verify snapshot integrity");
938         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
939       }
940 
941       LOG.info("Export Completed: " + targetName);
942       return 0;
943     } catch (Exception e) {
944       LOG.error("Snapshot export failed", e);
945       if (!skipTmp) {
946         outputFs.delete(snapshotTmpDir, true);
947       }
948       outputFs.delete(outputSnapshotDir, true);
949       return 1;
950     }
951   }
952 
953   // ExportSnapshot
954   private void printUsageAndExit() {
955     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
956     System.err.println(" where [options] are:");
957     System.err.println("  -h|-help                Show this help and exit.");
958     System.err.println("  -snapshot NAME          Snapshot to restore.");
959     System.err.println("  -copy-to NAME           Remote destination hdfs://");
960     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
961     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
962     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
963         "exported snapshot.");
964     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
965     System.err.println("  -chuser USERNAME        Change the owner of the files " +
966         "to the specified one.");
967     System.err.println("  -chgroup GROUP          Change the group of the files to " +
968         "the specified one.");
969     System.err.println("  -chmod MODE             Change the permission of the files " +
970         "to the specified one.");
971     System.err.println("  -mappers                Number of mappers to use during the " +
972         "copy (mapreduce.job.maps).");
973     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
974     System.err.println();
975     System.err.println("Examples:");
976     System.err.println("  hbase " + getClass().getName() + " \\");
977     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
978     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
979     System.err.println();
980     System.err.println("  hbase " + getClass().getName() + " \\");
981     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
982     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
983     System.exit(1);
984   }
985 
986   /**
987    * The guts of the {@link #main} method.
988    * Call this method to avoid the {@link #main(String[])} System.exit.
989    * @param args
990    * @return errCode
991    * @throws Exception
992    */
993   static int innerMain(final Configuration conf, final String [] args) throws Exception {
994     return ToolRunner.run(conf, new ExportSnapshot(), args);
995   }
996 
997   public static void main(String[] args) throws Exception {
998     System.exit(innerMain(HBaseConfiguration.create(), args));
999   }
1000 }