View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.net.URI;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.Comparator;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Random;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.conf.Configured;
41  import org.apache.hadoop.fs.FSDataInputStream;
42  import org.apache.hadoop.fs.FSDataOutputStream;
43  import org.apache.hadoop.fs.FileChecksum;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.FileUtil;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.fs.permission.FsPermission;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.io.FileLink;
54  import org.apache.hadoop.hbase.io.HFileLink;
55  import org.apache.hadoop.hbase.io.HLogLink;
56  import org.apache.hadoop.hbase.mapreduce.JobUtil;
57  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
58  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
59  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
60  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
61  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62  import org.apache.hadoop.hbase.util.FSUtils;
63  import org.apache.hadoop.hbase.util.Pair;
64  import org.apache.hadoop.io.BytesWritable;
65  import org.apache.hadoop.io.NullWritable;
66  import org.apache.hadoop.io.SequenceFile;
67  import org.apache.hadoop.io.Writable;
68  import org.apache.hadoop.mapreduce.Job;
69  import org.apache.hadoop.mapreduce.JobContext;
70  import org.apache.hadoop.mapreduce.Mapper;
71  import org.apache.hadoop.mapreduce.InputFormat;
72  import org.apache.hadoop.mapreduce.InputSplit;
73  import org.apache.hadoop.mapreduce.RecordReader;
74  import org.apache.hadoop.mapreduce.TaskAttemptContext;
75  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
76  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
77  import org.apache.hadoop.mapreduce.security.TokenCache;
78  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
79  import org.apache.hadoop.util.StringUtils;
80  import org.apache.hadoop.util.Tool;
81  import org.apache.hadoop.util.ToolRunner;
82  
83  /**
84   * Export the specified snapshot to a given FileSystem.
85   *
86   * The .snapshot/name folder is copied to the destination cluster
87   * and then all the hfiles/hlogs are copied using a Map-Reduce Job in the .archive/ location.
88   * When everything is done, the second cluster can restore the snapshot.
89   */
90  @InterfaceAudience.Public
91  @InterfaceStability.Evolving
92  public final class ExportSnapshot extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

  // Standard MapReduce property used to publish the chosen number of map tasks.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Number of splits/groups the snapshot file list is partitioned into.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  // Name and directory of the snapshot being exported (read by the InputFormat).
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Optional owner/group/permission attributes applied to the copied files.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether to compare source/destination checksums before skipping a copy.
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root directories of the destination (output) and source (input) filesystems.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Copy buffer size; defaults to the destination filesystem block size.
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Default number of files per map group when the split count is not set.
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-map copy bandwidth cap, in MB/s.
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

  // Test-only hooks used by TestExportSnapshot to inject copy failures.
  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

  private static final String INPUT_FOLDER_PREFIX = "export-files.";

  // Export Map-Reduce Counters, to keep track of the progress
  public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED, FILES_COPIED };
117 
118   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
119                                                    NullWritable, NullWritable> {
120     final static int REPORT_SIZE = 1 * 1024 * 1024;
121     final static int BUFFER_SIZE = 64 * 1024;
122 
123     private boolean testFailures;
124     private Random random;
125 
126     private boolean verifyChecksum;
127     private String filesGroup;
128     private String filesUser;
129     private short filesMode;
130     private int bufferSize;
131 
132     private FileSystem outputFs;
133     private Path outputArchive;
134     private Path outputRoot;
135 
136     private FileSystem inputFs;
137     private Path inputArchive;
138     private Path inputRoot;
139 
140     @Override
141     public void setup(Context context) throws IOException {
142       Configuration conf = context.getConfiguration();
143       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
144 
145       filesGroup = conf.get(CONF_FILES_GROUP);
146       filesUser = conf.get(CONF_FILES_USER);
147       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
148       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
149       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
150 
151       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
152       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
153 
154       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
155 
156       try {
157         inputFs = FileSystem.get(inputRoot.toUri(), conf);
158       } catch (IOException e) {
159         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
160       }
161 
162       try {
163         outputFs = FileSystem.get(outputRoot.toUri(), conf);
164       } catch (IOException e) {
165         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
166       }
167 
168       // Use the default block size of the outputFs if bigger
169       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
170       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
171       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
172     }
173 
174     @Override
175     public void map(BytesWritable key, NullWritable value, Context context)
176         throws InterruptedException, IOException {
177       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
178       Path outputPath = getOutputPath(inputInfo);
179 
180       copyFile(context, inputInfo, outputPath);
181     }
182 
183     /**
184      * Returns the location where the inputPath will be copied.
185      */
186     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
187       Path path = null;
188       switch (inputInfo.getType()) {
189         case HFILE:
190           Path inputPath = new Path(inputInfo.getHfile());
191           String family = inputPath.getParent().getName();
192           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
193           String region = HFileLink.getReferencedRegionName(inputPath.getName());
194           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
195           path = new Path(FSUtils.getTableDir(new Path("./"), table),
196               new Path(region, new Path(family, hfile)));
197           break;
198         case WAL:
199           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
200           path = new Path(oldLogsDir, inputInfo.getWalName());
201           break;
202         default:
203           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
204       }
205       return new Path(outputArchive, path);
206     }
207 
208     /*
209      * Used by TestExportSnapshot to simulate a failure
210      */
211     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
212         throws IOException {
213       if (testFailures) {
214         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
215           if (random == null) {
216             random = new Random();
217           }
218 
219           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
220           // retry, but at least we reduce the number of test failures due to
221           // this test exception from the same map task.
222           if (random.nextFloat() < 0.03) {
223             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
224                                   + " time=" + System.currentTimeMillis());
225           }
226         } else {
227           context.getCounter(Counter.COPY_FAILED).increment(1);
228           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
229         }
230       }
231     }
232 
233     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
234         final Path outputPath) throws IOException {
235       injectTestFailure(context, inputInfo);
236 
237       // Get the file information
238       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
239 
240       // Verify if the output file exists and is the same that we want to copy
241       if (outputFs.exists(outputPath)) {
242         FileStatus outputStat = outputFs.getFileStatus(outputPath);
243         if (outputStat != null && sameFile(inputStat, outputStat)) {
244           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
245           return;
246         }
247       }
248 
249       InputStream in = openSourceFile(context, inputInfo);
250       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
251       if (Integer.MAX_VALUE != bandwidthMB) {
252         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
253       }
254 
255       try {
256         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
257 
258         // Ensure that the output folder is there and copy the file
259         outputFs.mkdirs(outputPath.getParent());
260         FSDataOutputStream out = outputFs.create(outputPath, true);
261         try {
262           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
263         } finally {
264           out.close();
265         }
266 
267         // Try to Preserve attributes
268         if (!preserveAttributes(outputPath, inputStat)) {
269           LOG.warn("You may have to run manually chown on: " + outputPath);
270         }
271       } finally {
272         in.close();
273       }
274     }
275 
276     /**
277      * Try to Preserve the files attribute selected by the user copying them from the source file
278      * This is only required when you are exporting as a different user than "hbase" or on a system
279      * that doesn't have the "hbase" user.
280      *
281      * This is not considered a blocking failure since the user can force a chmod with the user
282      * that knows is available on the system.
283      */
284     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
285       FileStatus stat;
286       try {
287         stat = outputFs.getFileStatus(path);
288       } catch (IOException e) {
289         LOG.warn("Unable to get the status for file=" + path);
290         return false;
291       }
292 
293       try {
294         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
295           outputFs.setPermission(path, new FsPermission(filesMode));
296         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
297           outputFs.setPermission(path, refStat.getPermission());
298         }
299       } catch (IOException e) {
300         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
301         return false;
302       }
303 
304       boolean hasRefStat = (refStat != null);
305       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
306       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
307       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
308         try {
309           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
310             outputFs.setOwner(path, user, group);
311           }
312         } catch (IOException e) {
313           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
314           LOG.warn("The user/group may not exist on the destination cluster: user=" +
315                    user + " group=" + group);
316           return false;
317         }
318       }
319 
320       return true;
321     }
322 
323     private boolean stringIsNotEmpty(final String str) {
324       return str != null && str.length() > 0;
325     }
326 
327     private void copyData(final Context context,
328         final Path inputPath, final InputStream in,
329         final Path outputPath, final FSDataOutputStream out,
330         final long inputFileSize)
331         throws IOException {
332       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
333                                    " (%.1f%%)";
334 
335       try {
336         byte[] buffer = new byte[bufferSize];
337         long totalBytesWritten = 0;
338         int reportBytes = 0;
339         int bytesRead;
340 
341         long stime = System.currentTimeMillis();
342         while ((bytesRead = in.read(buffer)) > 0) {
343           out.write(buffer, 0, bytesRead);
344           totalBytesWritten += bytesRead;
345           reportBytes += bytesRead;
346 
347           if (reportBytes >= REPORT_SIZE) {
348             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
349             context.setStatus(String.format(statusMessage,
350                               StringUtils.humanReadableInt(totalBytesWritten),
351                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
352                               " from " + inputPath + " to " + outputPath);
353             reportBytes = 0;
354           }
355         }
356         long etime = System.currentTimeMillis();
357 
358         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
359         context.setStatus(String.format(statusMessage,
360                           StringUtils.humanReadableInt(totalBytesWritten),
361                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
362                           " from " + inputPath + " to " + outputPath);
363 
364         // Verify that the written size match
365         if (totalBytesWritten != inputFileSize) {
366           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
367                        " expected=" + inputFileSize + " for file=" + inputPath;
368           throw new IOException(msg);
369         }
370 
371         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
372         LOG.info("size=" + totalBytesWritten +
373             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
374             " time=" + StringUtils.formatTimeDiff(etime, stime) +
375             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
376         context.getCounter(Counter.FILES_COPIED).increment(1);
377       } catch (IOException e) {
378         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
379         context.getCounter(Counter.COPY_FAILED).increment(1);
380         throw e;
381       }
382     }
383 
384     /**
385      * Try to open the "source" file.
386      * Throws an IOException if the communication with the inputFs fail or
387      * if the file is not found.
388      */
389     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
390         throws IOException {
391       try {
392         FileLink link = null;
393         switch (fileInfo.getType()) {
394           case HFILE:
395             Path inputPath = new Path(fileInfo.getHfile());
396             link = new HFileLink(inputRoot, inputArchive, inputPath);
397             break;
398           case WAL:
399             String serverName = fileInfo.getWalServer();
400             String logName = fileInfo.getWalName();
401             link = new HLogLink(inputRoot, serverName, logName);
402             break;
403           default:
404             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
405         }
406         return link.open(inputFs);
407       } catch (IOException e) {
408         context.getCounter(Counter.MISSING_FILES).increment(1);
409         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
410         throw e;
411       }
412     }
413 
414     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
415         throws IOException {
416       try {
417         FileLink link = null;
418         switch (fileInfo.getType()) {
419           case HFILE:
420             Path inputPath = new Path(fileInfo.getHfile());
421             link = new HFileLink(inputRoot, inputArchive, inputPath);
422             break;
423           case WAL:
424             link = new HLogLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
425             break;
426           default:
427             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
428         }
429         return link.getFileStatus(inputFs);
430       } catch (FileNotFoundException e) {
431         context.getCounter(Counter.MISSING_FILES).increment(1);
432         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
433         throw e;
434       } catch (IOException e) {
435         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
436         throw e;
437       }
438     }
439 
440     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
441       try {
442         return fs.getFileChecksum(path);
443       } catch (IOException e) {
444         LOG.warn("Unable to get checksum for file=" + path, e);
445         return null;
446       }
447     }
448 
449     /**
450      * Check if the two files are equal by looking at the file length,
451      * and at the checksum (if user has specified the verifyChecksum flag).
452      */
453     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
454       // Not matching length
455       if (inputStat.getLen() != outputStat.getLen()) return false;
456 
457       // Mark files as equals, since user asked for no checksum verification
458       if (!verifyChecksum) return true;
459 
460       // If checksums are not available, files are not the same.
461       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
462       if (inChecksum == null) return false;
463 
464       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
465       if (outChecksum == null) return false;
466 
467       return inChecksum.equals(outChecksum);
468     }
469   }
470 
471   // ==========================================================================
472   //  Input Format
473   // ==========================================================================
474 
475   /**
476    * Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
477    * @return list of files referenced by the snapshot (pair of path and size)
478    */
479   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
480       final FileSystem fs, final Path snapshotDir) throws IOException {
481     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
482 
483     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
484     final TableName table = TableName.valueOf(snapshotDesc.getTable());
485 
486     // Get snapshot files
487     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
488     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
489       new SnapshotReferenceUtil.SnapshotVisitor() {
490         @Override
491         public void storeFile(final HRegionInfo regionInfo, final String family,
492             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
493           if (storeFile.hasReference()) {
494             // copied as part of the manifest
495           } else {
496             String region = regionInfo.getEncodedName();
497             String hfile = storeFile.getName();
498             Path path = HFileLink.createPath(table, region, family, hfile);
499 
500             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
501               .setType(SnapshotFileInfo.Type.HFILE)
502               .setHfile(path.toString())
503               .build();
504 
505             long size;
506             if (storeFile.hasFileSize()) {
507               size = storeFile.getFileSize();
508             } else {
509               size = new HFileLink(conf, path).getFileStatus(fs).getLen();
510             }
511             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
512           }
513         }
514 
515         @Override
516         public void logFile (final String server, final String logfile)
517             throws IOException {
518           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
519             .setType(SnapshotFileInfo.Type.WAL)
520             .setWalServer(server)
521             .setWalName(logfile)
522             .build();
523 
524           long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
525           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
526         }
527     });
528 
529     return files;
530   }
531 
532   /**
533    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
534    * The groups created will have similar amounts of bytes.
535    * <p>
536    * The algorithm used is pretty straightforward; the file list is sorted by size,
537    * and then each group fetch the bigger file available, iterating through groups
538    * alternating the direction.
539    */
540   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
541       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
542     // Sort files by size, from small to big
543     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
544       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
545         long r = a.getSecond() - b.getSecond();
546         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
547       }
548     });
549 
550     // create balanced groups
551     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
552       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
553     long[] sizeGroups = new long[ngroups];
554     int hi = files.size() - 1;
555     int lo = 0;
556 
557     List<Pair<SnapshotFileInfo, Long>> group;
558     int dir = 1;
559     int g = 0;
560 
561     while (hi >= lo) {
562       if (g == fileGroups.size()) {
563         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
564         fileGroups.add(group);
565       } else {
566         group = fileGroups.get(g);
567       }
568 
569       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
570 
571       // add the hi one
572       sizeGroups[g] += fileInfo.getSecond();
573       group.add(fileInfo);
574 
575       // change direction when at the end or the beginning
576       g += dir;
577       if (g == ngroups) {
578         dir = -1;
579         g = ngroups - 1;
580       } else if (g < 0) {
581         dir = 1;
582         g = 0;
583       }
584     }
585 
586     if (LOG.isDebugEnabled()) {
587       for (int i = 0; i < sizeGroups.length; ++i) {
588         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
589       }
590     }
591 
592     return fileGroups;
593   }
594 
595   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
596     @Override
597     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
598         TaskAttemptContext tac) throws IOException, InterruptedException {
599       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
600     }
601 
602     @Override
603     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
604       Configuration conf = context.getConfiguration();
605       String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
606       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
607       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
608 
609       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
610       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
611       if (mappers == 0 && snapshotFiles.size() > 0) {
612         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
613         mappers = Math.min(mappers, snapshotFiles.size());
614         conf.setInt(CONF_NUM_SPLITS, mappers);
615         conf.setInt(MR_NUM_MAPS, mappers);
616       }
617 
618       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
619       List<InputSplit> splits = new ArrayList(groups.size());
620       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
621         splits.add(new ExportSnapshotInputSplit(files));
622       }
623       return splits;
624     }
625 
626     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
627       private List<Pair<BytesWritable, Long>> files;
628       private long length;
629 
630       public ExportSnapshotInputSplit() {
631         this.files = null;
632       }
633 
634       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
635         this.files = new ArrayList(snapshotFiles.size());
636         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
637           this.files.add(new Pair<BytesWritable, Long>(
638             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
639           this.length += fileInfo.getSecond();
640         }
641       }
642 
643       private List<Pair<BytesWritable, Long>> getSplitKeys() {
644         return files;
645       }
646 
647       @Override
648       public long getLength() throws IOException, InterruptedException {
649         return length;
650       }
651 
652       @Override
653       public String[] getLocations() throws IOException, InterruptedException {
654         return new String[] {};
655       }
656 
657       @Override
658       public void readFields(DataInput in) throws IOException {
659         int count = in.readInt();
660         files = new ArrayList<Pair<BytesWritable, Long>>(count);
661         length = 0;
662         for (int i = 0; i < count; ++i) {
663           BytesWritable fileInfo = new BytesWritable();
664           fileInfo.readFields(in);
665           long size = in.readLong();
666           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
667           length += size;
668         }
669       }
670 
671       @Override
672       public void write(DataOutput out) throws IOException {
673         out.writeInt(files.size());
674         for (final Pair<BytesWritable, Long> fileInfo: files) {
675           fileInfo.getFirst().write(out);
676           out.writeLong(fileInfo.getSecond());
677         }
678       }
679     }
680 
681     private static class ExportSnapshotRecordReader
682         extends RecordReader<BytesWritable, NullWritable> {
683       private final List<Pair<BytesWritable, Long>> files;
684       private long totalSize = 0;
685       private long procSize = 0;
686       private int index = -1;
687 
688       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
689         this.files = files;
690         for (Pair<BytesWritable, Long> fileInfo: files) {
691           totalSize += fileInfo.getSecond();
692         }
693       }
694 
695       @Override
696       public void close() { }
697 
698       @Override
699       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
700 
701       @Override
702       public NullWritable getCurrentValue() { return NullWritable.get(); }
703 
704       @Override
705       public float getProgress() { return (float)procSize / totalSize; }
706 
707       @Override
708       public void initialize(InputSplit split, TaskAttemptContext tac) { }
709 
710       @Override
711       public boolean nextKeyValue() {
712         if (index >= 0) {
713           procSize += files.get(index).getSecond();
714         }
715         return(++index < files.size());
716       }
717     }
718   }
719 
720   // ==========================================================================
721   //  Tool
722   // ==========================================================================
723 
724   /**
725    * Run Map-Reduce Job to perform the files copy.
726    */
727   private void runCopyJob(final Path inputRoot, final Path outputRoot,
728       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
729       final String filesUser, final String filesGroup, final int filesMode,
730       final int mappers, final int bandwidthMB)
731           throws IOException, InterruptedException, ClassNotFoundException {
732     Configuration conf = getConf();
733     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
734     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
735     if (mappers > 0) {
736       conf.setInt(CONF_NUM_SPLITS, mappers);
737       conf.setInt(MR_NUM_MAPS, mappers);
738     }
739     conf.setInt(CONF_FILES_MODE, filesMode);
740     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
741     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
742     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
743     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
744     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
745     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
746 
747     Job job = new Job(conf);
748     job.setJobName("ExportSnapshot-" + snapshotName);
749     job.setJarByClass(ExportSnapshot.class);
750     TableMapReduceUtil.addDependencyJars(job);
751     job.setMapperClass(ExportMapper.class);
752     job.setInputFormatClass(ExportSnapshotInputFormat.class);
753     job.setOutputFormatClass(NullOutputFormat.class);
754     job.setMapSpeculativeExecution(false);
755     job.setNumReduceTasks(0);
756 
757     // Acquire the delegation Tokens
758     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
759       new Path[] { inputRoot, outputRoot }, conf);
760 
761     // Run the MR Job
762     if (!job.waitForCompletion(true)) {
763       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
764       // when it will be available on all the supported versions.
765       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
766     }
767   }
768 
769   private void verifySnapshot(final Configuration baseConf,
770       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
771     // Update the conf with the current root dir, since may be a different cluster
772     Configuration conf = new Configuration(baseConf);
773     FSUtils.setRootDir(conf, rootDir);
774     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
775     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
776     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
777   }
778 
779   /**
780    * Execute the export snapshot by copying the snapshot metadata, hfiles and hlogs.
781    * @return 0 on success, and != 0 upon failure.
782    */
783   @Override
784   public int run(String[] args) throws IOException {
785     boolean verifyTarget = true;
786     boolean verifyChecksum = true;
787     String snapshotName = null;
788     String targetName = null;
789     boolean overwrite = false;
790     String filesGroup = null;
791     String filesUser = null;
792     Path outputRoot = null;
793     int bandwidthMB = Integer.MAX_VALUE;
794     int filesMode = 0;
795     int mappers = 0;
796 
797     Configuration conf = getConf();
798     Path inputRoot = FSUtils.getRootDir(conf);
799 
800     // Process command line args
801     for (int i = 0; i < args.length; i++) {
802       String cmd = args[i];
803       if (cmd.equals("-snapshot")) {
804         snapshotName = args[++i];
805       } else if (cmd.equals("-target")) {
806         targetName = args[++i];
807       } else if (cmd.equals("-copy-to")) {
808         outputRoot = new Path(args[++i]);
809       } else if (cmd.equals("-copy-from")) {
810         inputRoot = new Path(args[++i]);
811       } else if (cmd.equals("-no-checksum-verify")) {
812         verifyChecksum = false;
813       } else if (cmd.equals("-no-target-verify")) {
814         verifyTarget = false;
815       } else if (cmd.equals("-mappers")) {
816         mappers = Integer.parseInt(args[++i]);
817       } else if (cmd.equals("-chuser")) {
818         filesUser = args[++i];
819       } else if (cmd.equals("-chgroup")) {
820         filesGroup = args[++i];
821       } else if (cmd.equals("-bandwidth")) {
822         bandwidthMB = Integer.parseInt(args[++i]);
823       } else if (cmd.equals("-chmod")) {
824         filesMode = Integer.parseInt(args[++i], 8);
825       } else if (cmd.equals("-overwrite")) {
826         overwrite = true;
827       } else if (cmd.equals("-h") || cmd.equals("--help")) {
828         printUsageAndExit();
829       } else {
830         System.err.println("UNEXPECTED: " + cmd);
831         printUsageAndExit();
832       }
833     }
834 
835     // Check user options
836     if (snapshotName == null) {
837       System.err.println("Snapshot name not provided.");
838       printUsageAndExit();
839     }
840 
841     if (outputRoot == null) {
842       System.err.println("Destination file-system not provided.");
843       printUsageAndExit();
844     }
845 
846     if (targetName == null) {
847       targetName = snapshotName;
848     }
849 
850     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
851     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
852     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
853     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
854 
855     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
856 
857     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
858     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
859     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
860     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
861 
862     // Check if the snapshot already exists
863     if (outputFs.exists(outputSnapshotDir)) {
864       if (overwrite) {
865         if (!outputFs.delete(outputSnapshotDir, true)) {
866           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
867           return 1;
868         }
869       } else {
870         System.err.println("The snapshot '" + targetName +
871           "' already exists in the destination: " + outputSnapshotDir);
872         return 1;
873       }
874     }
875 
876     if (!skipTmp) {
877       // Check if the snapshot already in-progress
878       if (outputFs.exists(snapshotTmpDir)) {
879         if (overwrite) {
880           if (!outputFs.delete(snapshotTmpDir, true)) {
881             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
882             return 1;
883           }
884         } else {
885           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
886           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
887           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
888           return 1;
889         }
890       }
891     }
892 
893     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
894     // The snapshot references must be copied before the hfiles otherwise the cleaner
895     // will remove them because they are unreferenced.
896     try {
897       LOG.info("Copy Snapshot Manifest");
898       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
899     } catch (IOException e) {
900       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
901         snapshotDir + " to=" + initialOutputSnapshotDir, e);
902     }
903 
904     // Write a new .snapshotinfo if the target name is different from the source name
905     if (!targetName.equals(snapshotName)) {
906       SnapshotDescription snapshotDesc =
907         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
908           .toBuilder()
909           .setName(targetName)
910           .build();
911       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
912     }
913 
914     // Step 2 - Start MR Job to copy files
915     // The snapshot references must be copied before the files otherwise the files gets removed
916     // by the HFileArchiver, since they have no references.
917     try {
918       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
919                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
920 
921       LOG.info("Finalize the Snapshot Export");
922       if (!skipTmp) {
923         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
924         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
925           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
926             snapshotTmpDir + " to=" + outputSnapshotDir);
927         }
928       }
929 
930       // Step 4 - Verify snapshot integrity
931       if (verifyTarget) {
932         LOG.info("Verify snapshot integrity");
933         verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
934       }
935 
936       LOG.info("Export Completed: " + targetName);
937       return 0;
938     } catch (Exception e) {
939       LOG.error("Snapshot export failed", e);
940       if (!skipTmp) {
941         outputFs.delete(snapshotTmpDir, true);
942       }
943       outputFs.delete(outputSnapshotDir, true);
944       return 1;
945     }
946   }
947 
948   // ExportSnapshot
949   private void printUsageAndExit() {
950     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
951     System.err.println(" where [options] are:");
952     System.err.println("  -h|-help                Show this help and exit.");
953     System.err.println("  -snapshot NAME          Snapshot to restore.");
954     System.err.println("  -copy-to NAME           Remote destination hdfs://");
955     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
956     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
957     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
958         "exported snapshot.");
959     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
960     System.err.println("  -chuser USERNAME        Change the owner of the files " +
961         "to the specified one.");
962     System.err.println("  -chgroup GROUP          Change the group of the files to " +
963         "the specified one.");
964     System.err.println("  -chmod MODE             Change the permission of the files " +
965         "to the specified one.");
966     System.err.println("  -mappers                Number of mappers to use during the " +
967         "copy (mapreduce.job.maps).");
968     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
969     System.err.println();
970     System.err.println("Examples:");
971     System.err.println("  hbase " + getClass().getName() + " \\");
972     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
973     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
974     System.err.println();
975     System.err.println("  hbase " + getClass().getName() + " \\");
976     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
977     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
978     System.exit(1);
979   }
980 
981   /**
982    * The guts of the {@link #main} method.
983    * Call this method to avoid the {@link #main(String[])} System.exit.
984    * @param args
985    * @return errCode
986    * @throws Exception
987    */
988   static int innerMain(final Configuration conf, final String [] args) throws Exception {
989     return ToolRunner.run(conf, new ExportSnapshot(), args);
990   }
991 
992   public static void main(String[] args) throws Exception {
993     System.exit(innerMain(HBaseConfiguration.create(), args));
994   }
995 }