View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.IOUtils;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.Writable;
65  import org.apache.hadoop.mapreduce.Job;
66  import org.apache.hadoop.mapreduce.JobContext;
67  import org.apache.hadoop.mapreduce.Mapper;
68  import org.apache.hadoop.mapreduce.InputFormat;
69  import org.apache.hadoop.mapreduce.InputSplit;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.TaskAttemptContext;
72  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73  import org.apache.hadoop.mapreduce.security.TokenCache;
74  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75  import org.apache.hadoop.util.StringUtils;
76  import org.apache.hadoop.util.Tool;
77  import org.apache.hadoop.util.ToolRunner;
78  
79  /**
80   * Export the specified snapshot to a given FileSystem.
81   *
82   * The .snapshot/name folder is copied to the destination cluster
83   * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
84   * When everything is done, the second cluster can restore the snapshot.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ExportSnapshot extends Configured implements Tool {
89    public static final String NAME = "exportsnapshot";
90    /** Configuration prefix for overrides for the source filesystem */
91    public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
92    /** Configuration prefix for overrides for the destination filesystem */
93    public static final String CONF_DEST_PREFIX = NAME + ".to.";
94  
95    private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
96  
97    private static final String MR_NUM_MAPS = "mapreduce.job.maps";
98    private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
99    private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
100   private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
101   private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
102   private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
103   private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
104   private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
105   private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
106   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
107   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
108   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
109   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
110   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
111 
112   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
113   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
114 
115   private static final String INPUT_FOLDER_PREFIX = "export-files.";
116 
117   // Export Map-Reduce Counters, to keep track of the progress
118   public enum Counter {
119     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
120     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
121   }
122 
123   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124                                                    NullWritable, NullWritable> {
125     final static int REPORT_SIZE = 1 * 1024 * 1024;
126     final static int BUFFER_SIZE = 64 * 1024;
127 
128     private boolean testFailures;
129     private Random random;
130 
131     private boolean verifyChecksum;
132     private String filesGroup;
133     private String filesUser;
134     private short filesMode;
135     private int bufferSize;
136 
137     private FileSystem outputFs;
138     private Path outputArchive;
139     private Path outputRoot;
140 
141     private FileSystem inputFs;
142     private Path inputArchive;
143     private Path inputRoot;
144 
145     @Override
146     public void setup(Context context) throws IOException {
147       Configuration conf = context.getConfiguration();
148       Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
149       Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
150 
151       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
152 
153       filesGroup = conf.get(CONF_FILES_GROUP);
154       filesUser = conf.get(CONF_FILES_USER);
155       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
156       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
157       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
158 
159       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
160       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
161 
162       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
163 
164       try {
165         srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
169       }
170 
171       try {
172         destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
173         outputFs = FileSystem.get(outputRoot.toUri(), destConf);
174       } catch (IOException e) {
175         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
176       }
177 
178       // Use the default block size of the outputFs if bigger
179       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
180       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
181       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
182 
183       for (Counter c : Counter.values()) {
184         context.getCounter(c).increment(0);
185       }
186     }
187 
188     @Override
189     protected void cleanup(Context context) {
190       IOUtils.closeStream(inputFs);
191       IOUtils.closeStream(outputFs);
192     }
193 
194     @Override
195     public void map(BytesWritable key, NullWritable value, Context context)
196         throws InterruptedException, IOException {
197       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
198       Path outputPath = getOutputPath(inputInfo);
199 
200       copyFile(context, inputInfo, outputPath);
201     }
202 
203     /**
204      * Returns the location where the inputPath will be copied.
205      */
206     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
207       Path path = null;
208       switch (inputInfo.getType()) {
209         case HFILE:
210           Path inputPath = new Path(inputInfo.getHfile());
211           String family = inputPath.getParent().getName();
212           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
213           String region = HFileLink.getReferencedRegionName(inputPath.getName());
214           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
215           path = new Path(FSUtils.getTableDir(new Path("./"), table),
216               new Path(region, new Path(family, hfile)));
217           break;
218         case WAL:
219           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
220           path = new Path(oldLogsDir, inputInfo.getWalName());
221           break;
222         default:
223           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
224       }
225       return new Path(outputArchive, path);
226     }
227 
228     /*
229      * Used by TestExportSnapshot to simulate a failure
230      */
231     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
232         throws IOException {
233       if (testFailures) {
234         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
235           if (random == null) {
236             random = new Random();
237           }
238 
239           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
240           // retry, but at least we reduce the number of test failures due to
241           // this test exception from the same map task.
242           if (random.nextFloat() < 0.03) {
243             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
244                                   + " time=" + System.currentTimeMillis());
245           }
246         } else {
247           context.getCounter(Counter.COPY_FAILED).increment(1);
248           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
249         }
250       }
251     }
252 
253     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
254         final Path outputPath) throws IOException {
255       injectTestFailure(context, inputInfo);
256 
257       // Get the file information
258       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
259 
260       // Verify if the output file exists and is the same that we want to copy
261       if (outputFs.exists(outputPath)) {
262         FileStatus outputStat = outputFs.getFileStatus(outputPath);
263         if (outputStat != null && sameFile(inputStat, outputStat)) {
264           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
265           context.getCounter(Counter.FILES_SKIPPED).increment(1);
266           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
267           return;
268         }
269       }
270 
271       InputStream in = openSourceFile(context, inputInfo);
272       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
273       if (Integer.MAX_VALUE != bandwidthMB) {
274         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
275       }
276 
277       try {
278         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
279 
280         // Ensure that the output folder is there and copy the file
281         createOutputPath(outputPath.getParent());
282         FSDataOutputStream out = outputFs.create(outputPath, true);
283         try {
284           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
285         } finally {
286           out.close();
287         }
288 
289         // Try to Preserve attributes
290         if (!preserveAttributes(outputPath, inputStat)) {
291           LOG.warn("You may have to run manually chown on: " + outputPath);
292         }
293       } finally {
294         in.close();
295       }
296     }
297 
298     /**
299      * Create the output folder and optionally set ownership.
300      */
301     private void createOutputPath(final Path path) throws IOException {
302       if (filesUser == null && filesGroup == null) {
303         outputFs.mkdirs(path);
304       } else {
305         Path parent = path.getParent();
306         if (!outputFs.exists(parent) && !parent.isRoot()) {
307           createOutputPath(parent);
308         }
309         outputFs.mkdirs(path);
310         if (filesUser != null || filesGroup != null) {
311           // override the owner when non-null user/group is specified
312           outputFs.setOwner(path, filesUser, filesGroup);
313         }
314         if (filesMode > 0) {
315           outputFs.setPermission(path, new FsPermission(filesMode));
316         }
317       }
318     }
319 
320     /**
321      * Try to Preserve the files attribute selected by the user copying them from the source file
322      * This is only required when you are exporting as a different user than "hbase" or on a system
323      * that doesn't have the "hbase" user.
324      *
325      * This is not considered a blocking failure since the user can force a chmod with the user
326      * that knows is available on the system.
327      */
328     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
329       FileStatus stat;
330       try {
331         stat = outputFs.getFileStatus(path);
332       } catch (IOException e) {
333         LOG.warn("Unable to get the status for file=" + path);
334         return false;
335       }
336 
337       try {
338         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
339           outputFs.setPermission(path, new FsPermission(filesMode));
340         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
341           outputFs.setPermission(path, refStat.getPermission());
342         }
343       } catch (IOException e) {
344         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
345         return false;
346       }
347 
348       boolean hasRefStat = (refStat != null);
349       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
350       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
351       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
352         try {
353           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
354             outputFs.setOwner(path, user, group);
355           }
356         } catch (IOException e) {
357           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
358           LOG.warn("The user/group may not exist on the destination cluster: user=" +
359                    user + " group=" + group);
360           return false;
361         }
362       }
363 
364       return true;
365     }
366 
367     private boolean stringIsNotEmpty(final String str) {
368       return str != null && str.length() > 0;
369     }
370 
371     private void copyData(final Context context,
372         final Path inputPath, final InputStream in,
373         final Path outputPath, final FSDataOutputStream out,
374         final long inputFileSize)
375         throws IOException {
376       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
377                                    " (%.1f%%)";
378 
379       try {
380         byte[] buffer = new byte[bufferSize];
381         long totalBytesWritten = 0;
382         int reportBytes = 0;
383         int bytesRead;
384 
385         long stime = System.currentTimeMillis();
386         while ((bytesRead = in.read(buffer)) > 0) {
387           out.write(buffer, 0, bytesRead);
388           totalBytesWritten += bytesRead;
389           reportBytes += bytesRead;
390 
391           if (reportBytes >= REPORT_SIZE) {
392             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
393             context.setStatus(String.format(statusMessage,
394                               StringUtils.humanReadableInt(totalBytesWritten),
395                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
396                               " from " + inputPath + " to " + outputPath);
397             reportBytes = 0;
398           }
399         }
400         long etime = System.currentTimeMillis();
401 
402         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
403         context.setStatus(String.format(statusMessage,
404                           StringUtils.humanReadableInt(totalBytesWritten),
405                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
406                           " from " + inputPath + " to " + outputPath);
407 
408         // Verify that the written size match
409         if (totalBytesWritten != inputFileSize) {
410           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
411                        " expected=" + inputFileSize + " for file=" + inputPath;
412           throw new IOException(msg);
413         }
414 
415         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
416         LOG.info("size=" + totalBytesWritten +
417             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
418             " time=" + StringUtils.formatTimeDiff(etime, stime) +
419             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
420         context.getCounter(Counter.FILES_COPIED).increment(1);
421       } catch (IOException e) {
422         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
423         context.getCounter(Counter.COPY_FAILED).increment(1);
424         throw e;
425       }
426     }
427 
428     /**
429      * Try to open the "source" file.
430      * Throws an IOException if the communication with the inputFs fail or
431      * if the file is not found.
432      */
433     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
434             throws IOException {
435       try {
436         Configuration conf = context.getConfiguration();
437         FileLink link = null;
438         switch (fileInfo.getType()) {
439           case HFILE:
440             Path inputPath = new Path(fileInfo.getHfile());
441             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
442             break;
443           case WAL:
444             String serverName = fileInfo.getWalServer();
445             String logName = fileInfo.getWalName();
446             link = new WALLink(inputRoot, serverName, logName);
447             break;
448           default:
449             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
450         }
451         return link.open(inputFs);
452       } catch (IOException e) {
453         context.getCounter(Counter.MISSING_FILES).increment(1);
454         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
455         throw e;
456       }
457     }
458 
459     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
460         throws IOException {
461       try {
462         Configuration conf = context.getConfiguration();
463         FileLink link = null;
464         switch (fileInfo.getType()) {
465           case HFILE:
466             Path inputPath = new Path(fileInfo.getHfile());
467             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
468             break;
469           case WAL:
470             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
471             break;
472           default:
473             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
474         }
475         return link.getFileStatus(inputFs);
476       } catch (FileNotFoundException e) {
477         context.getCounter(Counter.MISSING_FILES).increment(1);
478         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479         throw e;
480       } catch (IOException e) {
481         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
482         throw e;
483       }
484     }
485 
486     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
487       try {
488         return fs.getFileChecksum(path);
489       } catch (IOException e) {
490         LOG.warn("Unable to get checksum for file=" + path, e);
491         return null;
492       }
493     }
494 
495     /**
496      * Check if the two files are equal by looking at the file length,
497      * and at the checksum (if user has specified the verifyChecksum flag).
498      */
499     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
500       // Not matching length
501       if (inputStat.getLen() != outputStat.getLen()) return false;
502 
503       // Mark files as equals, since user asked for no checksum verification
504       if (!verifyChecksum) return true;
505 
506       // If checksums are not available, files are not the same.
507       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
508       if (inChecksum == null) return false;
509 
510       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
511       if (outChecksum == null) return false;
512 
513       return inChecksum.equals(outChecksum);
514     }
515   }
516 
517   // ==========================================================================
518   //  Input Format
519   // ==========================================================================
520 
521   /**
522    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
523    * @return list of files referenced by the snapshot (pair of path and size)
524    */
525   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
526       final FileSystem fs, final Path snapshotDir) throws IOException {
527     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
528 
529     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
530     final TableName table = TableName.valueOf(snapshotDesc.getTable());
531 
532     // Get snapshot files
533     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
534     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
535       new SnapshotReferenceUtil.SnapshotVisitor() {
536         @Override
537         public void storeFile(final HRegionInfo regionInfo, final String family,
538             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
539           if (storeFile.hasReference()) {
540             // copied as part of the manifest
541           } else {
542             String region = regionInfo.getEncodedName();
543             String hfile = storeFile.getName();
544             Path path = HFileLink.createPath(table, region, family, hfile);
545 
546             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
547               .setType(SnapshotFileInfo.Type.HFILE)
548               .setHfile(path.toString())
549               .build();
550 
551             long size;
552             if (storeFile.hasFileSize()) {
553               size = storeFile.getFileSize();
554             } else {
555               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
556             }
557             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
558           }
559         }
560 
561         @Override
562         public void logFile (final String server, final String logfile)
563             throws IOException {
564           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
565             .setType(SnapshotFileInfo.Type.WAL)
566             .setWalServer(server)
567             .setWalName(logfile)
568             .build();
569 
570           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
571           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
572         }
573     });
574 
575     return files;
576   }
577 
578   /**
579    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
580    * The groups created will have similar amounts of bytes.
581    * <p>
582    * The algorithm used is pretty straightforward; the file list is sorted by size,
583    * and then each group fetch the bigger file available, iterating through groups
584    * alternating the direction.
585    */
586   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
587       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
588     // Sort files by size, from small to big
589     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
590       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
591         long r = a.getSecond() - b.getSecond();
592         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
593       }
594     });
595 
596     // create balanced groups
597     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
598       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
599     long[] sizeGroups = new long[ngroups];
600     int hi = files.size() - 1;
601     int lo = 0;
602 
603     List<Pair<SnapshotFileInfo, Long>> group;
604     int dir = 1;
605     int g = 0;
606 
607     while (hi >= lo) {
608       if (g == fileGroups.size()) {
609         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
610         fileGroups.add(group);
611       } else {
612         group = fileGroups.get(g);
613       }
614 
615       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
616 
617       // add the hi one
618       sizeGroups[g] += fileInfo.getSecond();
619       group.add(fileInfo);
620 
621       // change direction when at the end or the beginning
622       g += dir;
623       if (g == ngroups) {
624         dir = -1;
625         g = ngroups - 1;
626       } else if (g < 0) {
627         dir = 1;
628         g = 0;
629       }
630     }
631 
632     if (LOG.isDebugEnabled()) {
633       for (int i = 0; i < sizeGroups.length; ++i) {
634         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
635       }
636     }
637 
638     return fileGroups;
639   }
640 
641   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
642     @Override
643     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
644         TaskAttemptContext tac) throws IOException, InterruptedException {
645       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
646     }
647 
648     @Override
649     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
650       Configuration conf = context.getConfiguration();
651       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
652       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
653 
654       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
655       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
656       if (mappers == 0 && snapshotFiles.size() > 0) {
657         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
658         mappers = Math.min(mappers, snapshotFiles.size());
659         conf.setInt(CONF_NUM_SPLITS, mappers);
660         conf.setInt(MR_NUM_MAPS, mappers);
661       }
662 
663       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
664       List<InputSplit> splits = new ArrayList(groups.size());
665       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
666         splits.add(new ExportSnapshotInputSplit(files));
667       }
668       return splits;
669     }
670 
671     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
672       private List<Pair<BytesWritable, Long>> files;
673       private long length;
674 
675       public ExportSnapshotInputSplit() {
676         this.files = null;
677       }
678 
679       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
680         this.files = new ArrayList(snapshotFiles.size());
681         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
682           this.files.add(new Pair<BytesWritable, Long>(
683             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
684           this.length += fileInfo.getSecond();
685         }
686       }
687 
688       private List<Pair<BytesWritable, Long>> getSplitKeys() {
689         return files;
690       }
691 
692       @Override
693       public long getLength() throws IOException, InterruptedException {
694         return length;
695       }
696 
697       @Override
698       public String[] getLocations() throws IOException, InterruptedException {
699         return new String[] {};
700       }
701 
702       @Override
703       public void readFields(DataInput in) throws IOException {
704         int count = in.readInt();
705         files = new ArrayList<Pair<BytesWritable, Long>>(count);
706         length = 0;
707         for (int i = 0; i < count; ++i) {
708           BytesWritable fileInfo = new BytesWritable();
709           fileInfo.readFields(in);
710           long size = in.readLong();
711           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
712           length += size;
713         }
714       }
715 
716       @Override
717       public void write(DataOutput out) throws IOException {
718         out.writeInt(files.size());
719         for (final Pair<BytesWritable, Long> fileInfo: files) {
720           fileInfo.getFirst().write(out);
721           out.writeLong(fileInfo.getSecond());
722         }
723       }
724     }
725 
726     private static class ExportSnapshotRecordReader
727         extends RecordReader<BytesWritable, NullWritable> {
728       private final List<Pair<BytesWritable, Long>> files;
729       private long totalSize = 0;
730       private long procSize = 0;
731       private int index = -1;
732 
733       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
734         this.files = files;
735         for (Pair<BytesWritable, Long> fileInfo: files) {
736           totalSize += fileInfo.getSecond();
737         }
738       }
739 
740       @Override
741       public void close() { }
742 
743       @Override
744       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
745 
746       @Override
747       public NullWritable getCurrentValue() { return NullWritable.get(); }
748 
749       @Override
750       public float getProgress() { return (float)procSize / totalSize; }
751 
752       @Override
753       public void initialize(InputSplit split, TaskAttemptContext tac) { }
754 
755       @Override
756       public boolean nextKeyValue() {
757         if (index >= 0) {
758           procSize += files.get(index).getSecond();
759         }
760         return(++index < files.size());
761       }
762     }
763   }
764 
765   // ==========================================================================
766   //  Tool
767   // ==========================================================================
768 
769   /**
770    * Run Map-Reduce Job to perform the files copy.
771    */
772   private void runCopyJob(final Path inputRoot, final Path outputRoot,
773       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
774       final String filesUser, final String filesGroup, final int filesMode,
775       final int mappers, final int bandwidthMB)
776           throws IOException, InterruptedException, ClassNotFoundException {
777     Configuration conf = getConf();
778     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
779     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
780     if (mappers > 0) {
781       conf.setInt(CONF_NUM_SPLITS, mappers);
782       conf.setInt(MR_NUM_MAPS, mappers);
783     }
784     conf.setInt(CONF_FILES_MODE, filesMode);
785     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
786     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
787     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
788     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
789     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
790     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
791 
792     Job job = new Job(conf);
793     job.setJobName("ExportSnapshot-" + snapshotName);
794     job.setJarByClass(ExportSnapshot.class);
795     TableMapReduceUtil.addDependencyJars(job);
796     job.setMapperClass(ExportMapper.class);
797     job.setInputFormatClass(ExportSnapshotInputFormat.class);
798     job.setOutputFormatClass(NullOutputFormat.class);
799     job.setMapSpeculativeExecution(false);
800     job.setNumReduceTasks(0);
801 
802     // Acquire the delegation Tokens
803     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
804     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
805       new Path[] { inputRoot }, srcConf);
806     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
807     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
808         new Path[] { outputRoot }, destConf);
809 
810     // Run the MR Job
811     if (!job.waitForCompletion(true)) {
812       throw new ExportSnapshotException(job.getStatus().getFailureInfo());
813     }
814   }
815 
816   private void verifySnapshot(final Configuration baseConf,
817       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
818     // Update the conf with the current root dir, since may be a different cluster
819     Configuration conf = new Configuration(baseConf);
820     FSUtils.setRootDir(conf, rootDir);
821     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
822     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
823     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
824   }
825 
826   /**
827    * Set path ownership.
828    */
829   private void setOwner(final FileSystem fs, final Path path, final String user,
830       final String group, final boolean recursive) throws IOException {
831     if (user != null || group != null) {
832       if (recursive && fs.isDirectory(path)) {
833         for (FileStatus child : fs.listStatus(path)) {
834           setOwner(fs, child.getPath(), user, group, recursive);
835         }
836       }
837       fs.setOwner(path, user, group);
838     }
839   }
840 
841   /**
842    * Set path permission.
843    */
844   private void setPermission(final FileSystem fs, final Path path, final short filesMode,
845       final boolean recursive) throws IOException {
846     if (filesMode > 0) {
847       FsPermission perm = new FsPermission(filesMode);
848       if (recursive && fs.isDirectory(path)) {
849         for (FileStatus child : fs.listStatus(path)) {
850           setPermission(fs, child.getPath(), filesMode, recursive);
851         }
852       }
853       fs.setPermission(path, perm);
854     }
855   }
856 
857   /**
858    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
859    * @return 0 on success, and != 0 upon failure.
860    */
861   @Override
862   public int run(String[] args) throws IOException {
863     boolean verifyTarget = true;
864     boolean verifyChecksum = true;
865     String snapshotName = null;
866     String targetName = null;
867     boolean overwrite = false;
868     String filesGroup = null;
869     String filesUser = null;
870     Path outputRoot = null;
871     int bandwidthMB = Integer.MAX_VALUE;
872     int filesMode = 0;
873     int mappers = 0;
874 
875     Configuration conf = getConf();
876     Path inputRoot = FSUtils.getRootDir(conf);
877 
878     // Process command line args
879     for (int i = 0; i < args.length; i++) {
880       String cmd = args[i];
881       if (cmd.equals("-snapshot")) {
882         snapshotName = args[++i];
883       } else if (cmd.equals("-target")) {
884         targetName = args[++i];
885       } else if (cmd.equals("-copy-to")) {
886         outputRoot = new Path(args[++i]);
887       } else if (cmd.equals("-copy-from")) {
888         inputRoot = new Path(args[++i]);
889         FSUtils.setRootDir(conf, inputRoot);
890       } else if (cmd.equals("-no-checksum-verify")) {
891         verifyChecksum = false;
892       } else if (cmd.equals("-no-target-verify")) {
893         verifyTarget = false;
894       } else if (cmd.equals("-mappers")) {
895         mappers = Integer.parseInt(args[++i]);
896       } else if (cmd.equals("-chuser")) {
897         filesUser = args[++i];
898       } else if (cmd.equals("-chgroup")) {
899         filesGroup = args[++i];
900       } else if (cmd.equals("-bandwidth")) {
901         bandwidthMB = Integer.parseInt(args[++i]);
902       } else if (cmd.equals("-chmod")) {
903         filesMode = Integer.parseInt(args[++i], 8);
904       } else if (cmd.equals("-overwrite")) {
905         overwrite = true;
906       } else if (cmd.equals("-h") || cmd.equals("--help")) {
907         printUsageAndExit();
908       } else {
909         System.err.println("UNEXPECTED: " + cmd);
910         printUsageAndExit();
911       }
912     }
913 
914     // Check user options
915     if (snapshotName == null) {
916       System.err.println("Snapshot name not provided.");
917       printUsageAndExit();
918     }
919 
920     if (outputRoot == null) {
921       System.err.println("Destination file-system not provided.");
922       printUsageAndExit();
923     }
924 
925     if (targetName == null) {
926       targetName = snapshotName;
927     }
928 
929     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
930     srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
931     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
932     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
933     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
934     destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
935     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
936     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
937 
938     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
939 
940     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
941     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
942     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
943     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
944 
945     // Find the necessary directory which need to change owner and group
946     Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
947     if (outputFs.exists(needSetOwnerDir)) {
948       if (skipTmp) {
949         needSetOwnerDir = outputSnapshotDir;
950       } else {
951         needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot);
952         if (outputFs.exists(needSetOwnerDir)) {
953           needSetOwnerDir = snapshotTmpDir;
954         }
955       }
956     }
957 
958     // Check if the snapshot already exists
959     if (outputFs.exists(outputSnapshotDir)) {
960       if (overwrite) {
961         if (!outputFs.delete(outputSnapshotDir, true)) {
962           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
963           return 1;
964         }
965       } else {
966         System.err.println("The snapshot '" + targetName +
967           "' already exists in the destination: " + outputSnapshotDir);
968         return 1;
969       }
970     }
971 
972     if (!skipTmp) {
973       // Check if the snapshot already in-progress
974       if (outputFs.exists(snapshotTmpDir)) {
975         if (overwrite) {
976           if (!outputFs.delete(snapshotTmpDir, true)) {
977             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
978             return 1;
979           }
980         } else {
981           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
982           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
983           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
984           return 1;
985         }
986       }
987     }
988 
989     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
990     // The snapshot references must be copied before the hfiles otherwise the cleaner
991     // will remove them because they are unreferenced.
992     try {
993       LOG.info("Copy Snapshot Manifest");
994       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
995     } catch (IOException e) {
996       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
997         snapshotDir + " to=" + initialOutputSnapshotDir, e);
998     } finally {
999       if (filesUser != null || filesGroup != null) {
1000         LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1001             + filesUser)
1002             + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1003             + filesGroup));
1004         setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
1005       }
1006       if (filesMode > 0) {
1007         LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1008         setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
1009       }
1010     }
1011 
1012     // Write a new .snapshotinfo if the target name is different from the source name
1013     if (!targetName.equals(snapshotName)) {
1014       SnapshotDescription snapshotDesc =
1015         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1016           .toBuilder()
1017           .setName(targetName)
1018           .build();
1019       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1020     }
1021 
1022     // Step 2 - Start MR Job to copy files
1023     // The snapshot references must be copied before the files otherwise the files gets removed
1024     // by the HFileArchiver, since they have no references.
1025     try {
1026       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1027                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1028 
1029       LOG.info("Finalize the Snapshot Export");
1030       if (!skipTmp) {
1031         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1032         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1033           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1034             snapshotTmpDir + " to=" + outputSnapshotDir);
1035         }
1036       }
1037 
1038       // Step 4 - Verify snapshot integrity
1039       if (verifyTarget) {
1040         LOG.info("Verify snapshot integrity");
1041         verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1042       }
1043 
1044       LOG.info("Export Completed: " + targetName);
1045       return 0;
1046     } catch (Exception e) {
1047       LOG.error("Snapshot export failed", e);
1048       if (!skipTmp) {
1049         outputFs.delete(snapshotTmpDir, true);
1050       }
1051       outputFs.delete(outputSnapshotDir, true);
1052       return 1;
1053     } finally {
1054       IOUtils.closeStream(inputFs);
1055       IOUtils.closeStream(outputFs);
1056     }
1057   }
1058 
1059   // ExportSnapshot
1060   private void printUsageAndExit() {
1061     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1062     System.err.println(" where [options] are:");
1063     System.err.println("  -h|-help                Show this help and exit.");
1064     System.err.println("  -snapshot NAME          Snapshot to restore.");
1065     System.err.println("  -copy-to NAME           Remote destination hdfs://");
1066     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
1067     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
1068     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
1069         "exported snapshot.");
1070     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
1071     System.err.println("  -chuser USERNAME        Change the owner of the files " +
1072         "to the specified one.");
1073     System.err.println("  -chgroup GROUP          Change the group of the files to " +
1074         "the specified one.");
1075     System.err.println("  -chmod MODE             Change the permission of the files " +
1076         "to the specified one.");
1077     System.err.println("  -mappers                Number of mappers to use during the " +
1078         "copy (mapreduce.job.maps).");
1079     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1080     System.err.println();
1081     System.err.println("Examples:");
1082     System.err.println("  hbase " + getClass().getName() + " \\");
1083     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1084     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1085     System.err.println();
1086     System.err.println("  hbase " + getClass().getName() + " \\");
1087     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1088     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1089     System.exit(1);
1090   }
1091 
1092   /**
1093    * The guts of the {@link #main} method.
1094    * Call this method to avoid the {@link #main(String[])} System.exit.
1095    * @param args
1096    * @return errCode
1097    * @throws Exception
1098    */
1099   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1100     return ToolRunner.run(conf, new ExportSnapshot(), args);
1101   }
1102 
1103   public static void main(String[] args) throws Exception {
1104     System.exit(innerMain(HBaseConfiguration.create(), args));
1105   }
1106 }