View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.IOUtils;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.Writable;
65  import org.apache.hadoop.mapreduce.Job;
66  import org.apache.hadoop.mapreduce.JobContext;
67  import org.apache.hadoop.mapreduce.Mapper;
68  import org.apache.hadoop.mapreduce.InputFormat;
69  import org.apache.hadoop.mapreduce.InputSplit;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.TaskAttemptContext;
72  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73  import org.apache.hadoop.mapreduce.security.TokenCache;
74  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75  import org.apache.hadoop.util.StringUtils;
76  import org.apache.hadoop.util.Tool;
77  import org.apache.hadoop.util.ToolRunner;
78  
79  /**
80   * Export the specified snapshot to a given FileSystem.
81   *
 * The .snapshot/name folder is copied to the destination cluster, and then all the
 * HFiles/WALs referenced by the snapshot are copied by a Map-Reduce job
 * (HFiles go into the .archive/ location).
 * When everything is done, the destination cluster can restore the snapshot.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ExportSnapshot extends Configured implements Tool {
89    public static final String NAME = "exportsnapshot";
90  
91    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
92  
93    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
94    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
95    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
96    private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
97    private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
98    private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
99    private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
100   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
101   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
102   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
103   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
104   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
105   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
106   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
107 
108   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
109   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
110 
111   private static final String INPUT_FOLDER_PREFIX = "export-files.";
112 
113   // Export Map-Reduce Counters, to keep track of the progress
114   public enum Counter {
115     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
116     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
117   }
118 
119   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
120                                                    NullWritable, NullWritable> {
121     final static int REPORT_SIZE = 1 * 1024 * 1024;
122     final static int BUFFER_SIZE = 64 * 1024;
123 
124     private boolean testFailures;
125     private Random random;
126 
127     private boolean verifyChecksum;
128     private String filesGroup;
129     private String filesUser;
130     private short filesMode;
131     private int bufferSize;
132 
133     private FileSystem outputFs;
134     private Path outputArchive;
135     private Path outputRoot;
136 
137     private FileSystem inputFs;
138     private Path inputArchive;
139     private Path inputRoot;
140 
141     @Override
142     public void setup(Context context) throws IOException {
143       Configuration conf = context.getConfiguration();
144       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
145 
146       filesGroup = conf.get(CONF_FILES_GROUP);
147       filesUser = conf.get(CONF_FILES_USER);
148       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
149       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
150       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
151 
152       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
153       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
154 
155       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
156 
157       try {
158         conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
159         inputFs = FileSystem.get(inputRoot.toUri(), conf);
160       } catch (IOException e) {
161         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
162       }
163 
164       try {
165         conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         outputFs = FileSystem.get(outputRoot.toUri(), conf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
169       }
170 
171       // Use the default block size of the outputFs if bigger
172       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
173       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
174       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
175 
176       for (Counter c : Counter.values()) {
177         context.getCounter(c).increment(0);
178       }
179     }
180 
181     @Override
182     protected void cleanup(Context context) {
183       IOUtils.closeStream(inputFs);
184       IOUtils.closeStream(outputFs);
185     }
186 
187     @Override
188     public void map(BytesWritable key, NullWritable value, Context context)
189         throws InterruptedException, IOException {
190       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
191       Path outputPath = getOutputPath(inputInfo);
192 
193       copyFile(context, inputInfo, outputPath);
194     }
195 
196     /**
197      * Returns the location where the inputPath will be copied.
198      */
199     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
200       Path path = null;
201       switch (inputInfo.getType()) {
202         case HFILE:
203           Path inputPath = new Path(inputInfo.getHfile());
204           String family = inputPath.getParent().getName();
205           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
206           String region = HFileLink.getReferencedRegionName(inputPath.getName());
207           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
208           path = new Path(FSUtils.getTableDir(new Path("./"), table),
209               new Path(region, new Path(family, hfile)));
210           break;
211         case WAL:
212           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
213           path = new Path(oldLogsDir, inputInfo.getWalName());
214           break;
215         default:
216           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
217       }
218       return new Path(outputArchive, path);
219     }
220 
221     /*
222      * Used by TestExportSnapshot to simulate a failure
223      */
224     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
225         throws IOException {
226       if (testFailures) {
227         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
228           if (random == null) {
229             random = new Random();
230           }
231 
232           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
233           // retry, but at least we reduce the number of test failures due to
234           // this test exception from the same map task.
235           if (random.nextFloat() < 0.03) {
236             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
237                                   + " time=" + System.currentTimeMillis());
238           }
239         } else {
240           context.getCounter(Counter.COPY_FAILED).increment(1);
241           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
242         }
243       }
244     }
245 
246     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
247         final Path outputPath) throws IOException {
248       injectTestFailure(context, inputInfo);
249 
250       // Get the file information
251       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
252 
253       // Verify if the output file exists and is the same that we want to copy
254       if (outputFs.exists(outputPath)) {
255         FileStatus outputStat = outputFs.getFileStatus(outputPath);
256         if (outputStat != null && sameFile(inputStat, outputStat)) {
257           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
258           context.getCounter(Counter.FILES_SKIPPED).increment(1);
259           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
260           return;
261         }
262       }
263 
264       InputStream in = openSourceFile(context, inputInfo);
265       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
266       if (Integer.MAX_VALUE != bandwidthMB) {
267         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
268       }
269 
270       try {
271         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
272 
273         // Ensure that the output folder is there and copy the file
274         outputFs.mkdirs(outputPath.getParent());
275         FSDataOutputStream out = outputFs.create(outputPath, true);
276         try {
277           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
278         } finally {
279           out.close();
280         }
281 
282         // Try to Preserve attributes
283         if (!preserveAttributes(outputPath, inputStat)) {
284           LOG.warn("You may have to run manually chown on: " + outputPath);
285         }
286       } finally {
287         in.close();
288       }
289     }
290 
291     /**
292      * Try to Preserve the files attribute selected by the user copying them from the source file
293      * This is only required when you are exporting as a different user than "hbase" or on a system
294      * that doesn't have the "hbase" user.
295      *
296      * This is not considered a blocking failure since the user can force a chmod with the user
297      * that knows is available on the system.
298      */
299     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
300       FileStatus stat;
301       try {
302         stat = outputFs.getFileStatus(path);
303       } catch (IOException e) {
304         LOG.warn("Unable to get the status for file=" + path);
305         return false;
306       }
307 
308       try {
309         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
310           outputFs.setPermission(path, new FsPermission(filesMode));
311         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
312           outputFs.setPermission(path, refStat.getPermission());
313         }
314       } catch (IOException e) {
315         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
316         return false;
317       }
318 
319       boolean hasRefStat = (refStat != null);
320       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
321       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
322       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
323         try {
324           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
325             outputFs.setOwner(path, user, group);
326           }
327         } catch (IOException e) {
328           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
329           LOG.warn("The user/group may not exist on the destination cluster: user=" +
330                    user + " group=" + group);
331           return false;
332         }
333       }
334 
335       return true;
336     }
337 
338     private boolean stringIsNotEmpty(final String str) {
339       return str != null && str.length() > 0;
340     }
341 
342     private void copyData(final Context context,
343         final Path inputPath, final InputStream in,
344         final Path outputPath, final FSDataOutputStream out,
345         final long inputFileSize)
346         throws IOException {
347       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
348                                    " (%.1f%%)";
349 
350       try {
351         byte[] buffer = new byte[bufferSize];
352         long totalBytesWritten = 0;
353         int reportBytes = 0;
354         int bytesRead;
355 
356         long stime = System.currentTimeMillis();
357         while ((bytesRead = in.read(buffer)) > 0) {
358           out.write(buffer, 0, bytesRead);
359           totalBytesWritten += bytesRead;
360           reportBytes += bytesRead;
361 
362           if (reportBytes >= REPORT_SIZE) {
363             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
364             context.setStatus(String.format(statusMessage,
365                               StringUtils.humanReadableInt(totalBytesWritten),
366                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
367                               " from " + inputPath + " to " + outputPath);
368             reportBytes = 0;
369           }
370         }
371         long etime = System.currentTimeMillis();
372 
373         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
374         context.setStatus(String.format(statusMessage,
375                           StringUtils.humanReadableInt(totalBytesWritten),
376                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
377                           " from " + inputPath + " to " + outputPath);
378 
379         // Verify that the written size match
380         if (totalBytesWritten != inputFileSize) {
381           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
382                        " expected=" + inputFileSize + " for file=" + inputPath;
383           throw new IOException(msg);
384         }
385 
386         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
387         LOG.info("size=" + totalBytesWritten +
388             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
389             " time=" + StringUtils.formatTimeDiff(etime, stime) +
390             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
391         context.getCounter(Counter.FILES_COPIED).increment(1);
392       } catch (IOException e) {
393         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
394         context.getCounter(Counter.COPY_FAILED).increment(1);
395         throw e;
396       }
397     }
398 
399     /**
400      * Try to open the "source" file.
401      * Throws an IOException if the communication with the inputFs fail or
402      * if the file is not found.
403      */
404     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
405             throws IOException {
406       try {
407         Configuration conf = context.getConfiguration();
408         FileLink link = null;
409         switch (fileInfo.getType()) {
410           case HFILE:
411             Path inputPath = new Path(fileInfo.getHfile());
412             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
413             break;
414           case WAL:
415             String serverName = fileInfo.getWalServer();
416             String logName = fileInfo.getWalName();
417             link = new WALLink(inputRoot, serverName, logName);
418             break;
419           default:
420             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
421         }
422         return link.open(inputFs);
423       } catch (IOException e) {
424         context.getCounter(Counter.MISSING_FILES).increment(1);
425         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
426         throw e;
427       }
428     }
429 
430     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
431         throws IOException {
432       try {
433         Configuration conf = context.getConfiguration();
434         FileLink link = null;
435         switch (fileInfo.getType()) {
436           case HFILE:
437             Path inputPath = new Path(fileInfo.getHfile());
438             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
439             break;
440           case WAL:
441             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
442             break;
443           default:
444             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
445         }
446         return link.getFileStatus(inputFs);
447       } catch (FileNotFoundException e) {
448         context.getCounter(Counter.MISSING_FILES).increment(1);
449         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
450         throw e;
451       } catch (IOException e) {
452         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
453         throw e;
454       }
455     }
456 
457     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
458       try {
459         return fs.getFileChecksum(path);
460       } catch (IOException e) {
461         LOG.warn("Unable to get checksum for file=" + path, e);
462         return null;
463       }
464     }
465 
466     /**
467      * Check if the two files are equal by looking at the file length,
468      * and at the checksum (if user has specified the verifyChecksum flag).
469      */
470     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
471       // Not matching length
472       if (inputStat.getLen() != outputStat.getLen()) return false;
473 
474       // Mark files as equals, since user asked for no checksum verification
475       if (!verifyChecksum) return true;
476 
477       // If checksums are not available, files are not the same.
478       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
479       if (inChecksum == null) return false;
480 
481       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
482       if (outChecksum == null) return false;
483 
484       return inChecksum.equals(outChecksum);
485     }
486   }
487 
488   // ==========================================================================
489   //  Input Format
490   // ==========================================================================
491 
492   /**
493    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
494    * @return list of files referenced by the snapshot (pair of path and size)
495    */
496   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
497       final FileSystem fs, final Path snapshotDir) throws IOException {
498     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
499 
500     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
501     final TableName table = TableName.valueOf(snapshotDesc.getTable());
502 
503     // Get snapshot files
504     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
505     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
506       new SnapshotReferenceUtil.SnapshotVisitor() {
507         @Override
508         public void storeFile(final HRegionInfo regionInfo, final String family,
509             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
510           if (storeFile.hasReference()) {
511             // copied as part of the manifest
512           } else {
513             String region = regionInfo.getEncodedName();
514             String hfile = storeFile.getName();
515             Path path = HFileLink.createPath(table, region, family, hfile);
516 
517             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
518               .setType(SnapshotFileInfo.Type.HFILE)
519               .setHfile(path.toString())
520               .build();
521 
522             long size;
523             if (storeFile.hasFileSize()) {
524               size = storeFile.getFileSize();
525             } else {
526               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
527             }
528             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
529           }
530         }
531 
532         @Override
533         public void logFile (final String server, final String logfile)
534             throws IOException {
535           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
536             .setType(SnapshotFileInfo.Type.WAL)
537             .setWalServer(server)
538             .setWalName(logfile)
539             .build();
540 
541           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
542           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
543         }
544     });
545 
546     return files;
547   }
548 
549   /**
550    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
551    * The groups created will have similar amounts of bytes.
552    * <p>
553    * The algorithm used is pretty straightforward; the file list is sorted by size,
554    * and then each group fetch the bigger file available, iterating through groups
555    * alternating the direction.
556    */
557   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
558       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
559     // Sort files by size, from small to big
560     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
561       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
562         long r = a.getSecond() - b.getSecond();
563         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
564       }
565     });
566 
567     // create balanced groups
568     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
569       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
570     long[] sizeGroups = new long[ngroups];
571     int hi = files.size() - 1;
572     int lo = 0;
573 
574     List<Pair<SnapshotFileInfo, Long>> group;
575     int dir = 1;
576     int g = 0;
577 
578     while (hi >= lo) {
579       if (g == fileGroups.size()) {
580         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
581         fileGroups.add(group);
582       } else {
583         group = fileGroups.get(g);
584       }
585 
586       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
587 
588       // add the hi one
589       sizeGroups[g] += fileInfo.getSecond();
590       group.add(fileInfo);
591 
592       // change direction when at the end or the beginning
593       g += dir;
594       if (g == ngroups) {
595         dir = -1;
596         g = ngroups - 1;
597       } else if (g < 0) {
598         dir = 1;
599         g = 0;
600       }
601     }
602 
603     if (LOG.isDebugEnabled()) {
604       for (int i = 0; i < sizeGroups.length; ++i) {
605         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
606       }
607     }
608 
609     return fileGroups;
610   }
611 
612   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
613     @Override
614     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
615         TaskAttemptContext tac) throws IOException, InterruptedException {
616       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
617     }
618 
619     @Override
620     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
621       Configuration conf = context.getConfiguration();
622       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
623       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
624       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
625 
626       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
627       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
628       if (mappers == 0 && snapshotFiles.size() > 0) {
629         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
630         mappers = Math.min(mappers, snapshotFiles.size());
631         conf.setInt(CONF_NUM_SPLITS, mappers);
632         conf.setInt(MR_NUM_MAPS, mappers);
633       }
634 
635       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
636       List<InputSplit> splits = new ArrayList(groups.size());
637       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
638         splits.add(new ExportSnapshotInputSplit(files));
639       }
640       return splits;
641     }
642 
643     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
644       private List<Pair<BytesWritable, Long>> files;
645       private long length;
646 
647       public ExportSnapshotInputSplit() {
648         this.files = null;
649       }
650 
651       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
652         this.files = new ArrayList(snapshotFiles.size());
653         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
654           this.files.add(new Pair<BytesWritable, Long>(
655             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
656           this.length += fileInfo.getSecond();
657         }
658       }
659 
660       private List<Pair<BytesWritable, Long>> getSplitKeys() {
661         return files;
662       }
663 
664       @Override
665       public long getLength() throws IOException, InterruptedException {
666         return length;
667       }
668 
669       @Override
670       public String[] getLocations() throws IOException, InterruptedException {
671         return new String[] {};
672       }
673 
674       @Override
675       public void readFields(DataInput in) throws IOException {
676         int count = in.readInt();
677         files = new ArrayList<Pair<BytesWritable, Long>>(count);
678         length = 0;
679         for (int i = 0; i < count; ++i) {
680           BytesWritable fileInfo = new BytesWritable();
681           fileInfo.readFields(in);
682           long size = in.readLong();
683           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
684           length += size;
685         }
686       }
687 
688       @Override
689       public void write(DataOutput out) throws IOException {
690         out.writeInt(files.size());
691         for (final Pair<BytesWritable, Long> fileInfo: files) {
692           fileInfo.getFirst().write(out);
693           out.writeLong(fileInfo.getSecond());
694         }
695       }
696     }
697 
698     private static class ExportSnapshotRecordReader
699         extends RecordReader<BytesWritable, NullWritable> {
700       private final List<Pair<BytesWritable, Long>> files;
701       private long totalSize = 0;
702       private long procSize = 0;
703       private int index = -1;
704 
705       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
706         this.files = files;
707         for (Pair<BytesWritable, Long> fileInfo: files) {
708           totalSize += fileInfo.getSecond();
709         }
710       }
711 
712       @Override
713       public void close() { }
714 
715       @Override
716       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
717 
718       @Override
719       public NullWritable getCurrentValue() { return NullWritable.get(); }
720 
721       @Override
722       public float getProgress() { return (float)procSize / totalSize; }
723 
724       @Override
725       public void initialize(InputSplit split, TaskAttemptContext tac) { }
726 
727       @Override
728       public boolean nextKeyValue() {
729         if (index >= 0) {
730           procSize += files.get(index).getSecond();
731         }
732         return(++index < files.size());
733       }
734     }
735   }
736 
737   // ==========================================================================
738   //  Tool
739   // ==========================================================================
740 
741   /**
742    * Run Map-Reduce Job to perform the files copy.
743    */
744   private void runCopyJob(final Path inputRoot, final Path outputRoot,
745       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
746       final String filesUser, final String filesGroup, final int filesMode,
747       final int mappers, final int bandwidthMB)
748           throws IOException, InterruptedException, ClassNotFoundException {
749     Configuration conf = getConf();
750     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
751     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
752     if (mappers > 0) {
753       conf.setInt(CONF_NUM_SPLITS, mappers);
754       conf.setInt(MR_NUM_MAPS, mappers);
755     }
756     conf.setInt(CONF_FILES_MODE, filesMode);
757     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
758     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
759     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
760     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
761     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
762     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
763 
764     Job job = new Job(conf);
765     job.setJobName("ExportSnapshot-" + snapshotName);
766     job.setJarByClass(ExportSnapshot.class);
767     TableMapReduceUtil.addDependencyJars(job);
768     job.setMapperClass(ExportMapper.class);
769     job.setInputFormatClass(ExportSnapshotInputFormat.class);
770     job.setOutputFormatClass(NullOutputFormat.class);
771     job.setMapSpeculativeExecution(false);
772     job.setNumReduceTasks(0);
773 
774     // Acquire the delegation Tokens
775     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
776       new Path[] { inputRoot, outputRoot }, conf);
777 
778     // Run the MR Job
779     if (!job.waitForCompletion(true)) {
780       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
781       // when it will be available on all the supported versions.
782       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
783     }
784   }
785 
786   private void verifySnapshot(final Configuration baseConf,
787       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
788     // Update the conf with the current root dir, since may be a different cluster
789     Configuration conf = new Configuration(baseConf);
790     FSUtils.setRootDir(conf, rootDir);
791     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
792     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
793     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
794   }
795 
796   /**
797    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
798    * @return 0 on success, and != 0 upon failure.
799    */
800   @Override
801   public int run(String[] args) throws IOException {
802     boolean verifyTarget = true;
803     boolean verifyChecksum = true;
804     String snapshotName = null;
805     String targetName = null;
806     boolean overwrite = false;
807     String filesGroup = null;
808     String filesUser = null;
809     Path outputRoot = null;
810     int bandwidthMB = Integer.MAX_VALUE;
811     int filesMode = 0;
812     int mappers = 0;
813 
814     Configuration conf = getConf();
815     Path inputRoot = FSUtils.getRootDir(conf);
816 
817     // Process command line args
818     for (int i = 0; i < args.length; i++) {
819       String cmd = args[i];
820       if (cmd.equals("-snapshot")) {
821         snapshotName = args[++i];
822       } else if (cmd.equals("-target")) {
823         targetName = args[++i];
824       } else if (cmd.equals("-copy-to")) {
825         outputRoot = new Path(args[++i]);
826       } else if (cmd.equals("-copy-from")) {
827         inputRoot = new Path(args[++i]);
828         FSUtils.setRootDir(conf, inputRoot);
829       } else if (cmd.equals("-no-checksum-verify")) {
830         verifyChecksum = false;
831       } else if (cmd.equals("-no-target-verify")) {
832         verifyTarget = false;
833       } else if (cmd.equals("-mappers")) {
834         mappers = Integer.parseInt(args[++i]);
835       } else if (cmd.equals("-chuser")) {
836         filesUser = args[++i];
837       } else if (cmd.equals("-chgroup")) {
838         filesGroup = args[++i];
839       } else if (cmd.equals("-bandwidth")) {
840         bandwidthMB = Integer.parseInt(args[++i]);
841       } else if (cmd.equals("-chmod")) {
842         filesMode = Integer.parseInt(args[++i], 8);
843       } else if (cmd.equals("-overwrite")) {
844         overwrite = true;
845       } else if (cmd.equals("-h") || cmd.equals("--help")) {
846         printUsageAndExit();
847       } else {
848         System.err.println("UNEXPECTED: " + cmd);
849         printUsageAndExit();
850       }
851     }
852 
853     // Check user options
854     if (snapshotName == null) {
855       System.err.println("Snapshot name not provided.");
856       printUsageAndExit();
857     }
858 
859     if (outputRoot == null) {
860       System.err.println("Destination file-system not provided.");
861       printUsageAndExit();
862     }
863 
864     if (targetName == null) {
865       targetName = snapshotName;
866     }
867 
868     conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
869     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
870     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
871     conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
872     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
873     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
874 
875     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
876 
877     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
878     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
879     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
880     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
881 
882     // Check if the snapshot already exists
883     if (outputFs.exists(outputSnapshotDir)) {
884       if (overwrite) {
885         if (!outputFs.delete(outputSnapshotDir, true)) {
886           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
887           return 1;
888         }
889       } else {
890         System.err.println("The snapshot '" + targetName +
891           "' already exists in the destination: " + outputSnapshotDir);
892         return 1;
893       }
894     }
895 
896     if (!skipTmp) {
897       // Check if the snapshot already in-progress
898       if (outputFs.exists(snapshotTmpDir)) {
899         if (overwrite) {
900           if (!outputFs.delete(snapshotTmpDir, true)) {
901             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
902             return 1;
903           }
904         } else {
905           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
906           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
907           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
908           return 1;
909         }
910       }
911     }
912 
913     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
914     // The snapshot references must be copied before the hfiles otherwise the cleaner
915     // will remove them because they are unreferenced.
916     try {
917       LOG.info("Copy Snapshot Manifest");
918       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
919     } catch (IOException e) {
920       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
921         snapshotDir + " to=" + initialOutputSnapshotDir, e);
922     }
923 
924     // Write a new .snapshotinfo if the target name is different from the source name
925     if (!targetName.equals(snapshotName)) {
926       SnapshotDescription snapshotDesc =
927         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
928           .toBuilder()
929           .setName(targetName)
930           .build();
931       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
932     }
933 
934     // Step 2 - Start MR Job to copy files
935     // The snapshot references must be copied before the files otherwise the files gets removed
936     // by the HFileArchiver, since they have no references.
937     try {
938       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
939                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
940 
941       LOG.info("Finalize the Snapshot Export");
942       if (!skipTmp) {
943         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
944         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
945           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
946             snapshotTmpDir + " to=" + outputSnapshotDir);
947         }
948       }
949 
950       // Step 4 - Verify snapshot integrity
951       if (verifyTarget) {
952         LOG.info("Verify snapshot integrity");
953         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
954       }
955 
956       LOG.info("Export Completed: " + targetName);
957       return 0;
958     } catch (Exception e) {
959       LOG.error("Snapshot export failed", e);
960       if (!skipTmp) {
961         outputFs.delete(snapshotTmpDir, true);
962       }
963       outputFs.delete(outputSnapshotDir, true);
964       return 1;
965     } finally {
966       IOUtils.closeStream(inputFs);
967       IOUtils.closeStream(outputFs);
968     }
969   }
970 
971   // ExportSnapshot
972   private void printUsageAndExit() {
973     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
974     System.err.println(" where [options] are:");
975     System.err.println("  -h|-help                Show this help and exit.");
976     System.err.println("  -snapshot NAME          Snapshot to restore.");
977     System.err.println("  -copy-to NAME           Remote destination hdfs://");
978     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
979     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
980     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
981         "exported snapshot.");
982     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
983     System.err.println("  -chuser USERNAME        Change the owner of the files " +
984         "to the specified one.");
985     System.err.println("  -chgroup GROUP          Change the group of the files to " +
986         "the specified one.");
987     System.err.println("  -chmod MODE             Change the permission of the files " +
988         "to the specified one.");
989     System.err.println("  -mappers                Number of mappers to use during the " +
990         "copy (mapreduce.job.maps).");
991     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
992     System.err.println();
993     System.err.println("Examples:");
994     System.err.println("  hbase " + getClass().getName() + " \\");
995     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
996     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
997     System.err.println();
998     System.err.println("  hbase " + getClass().getName() + " \\");
999     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1000     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1001     System.exit(1);
1002   }
1003 
1004   /**
1005    * The guts of the {@link #main} method.
1006    * Call this method to avoid the {@link #main(String[])} System.exit.
1007    * @param args
1008    * @return errCode
1009    * @throws Exception
1010    */
1011   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1012     return ToolRunner.run(conf, new ExportSnapshot(), args);
1013   }
1014 
1015   public static void main(String[] args) throws Exception {
1016     System.exit(innerMain(HBaseConfiguration.create(), args));
1017   }
1018 }