View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import java.io.BufferedInputStream;
22  import java.io.FileNotFoundException;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Random;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.hbase.classification.InterfaceStability;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.conf.Configured;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.fs.FSDataOutputStream;
42  import org.apache.hadoop.fs.FileChecksum;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.FileUtil;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.permission.FsPermission;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.HBaseConfiguration;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.io.FileLink;
53  import org.apache.hadoop.hbase.io.HFileLink;
54  import org.apache.hadoop.hbase.io.WALLink;
55  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59  import org.apache.hadoop.hbase.util.FSUtils;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.io.BytesWritable;
62  import org.apache.hadoop.io.IOUtils;
63  import org.apache.hadoop.io.NullWritable;
64  import org.apache.hadoop.io.Writable;
65  import org.apache.hadoop.mapreduce.Job;
66  import org.apache.hadoop.mapreduce.JobContext;
67  import org.apache.hadoop.mapreduce.Mapper;
68  import org.apache.hadoop.mapreduce.InputFormat;
69  import org.apache.hadoop.mapreduce.InputSplit;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.TaskAttemptContext;
72  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73  import org.apache.hadoop.mapreduce.security.TokenCache;
74  import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75  import org.apache.hadoop.util.StringUtils;
76  import org.apache.hadoop.util.Tool;
77  import org.apache.hadoop.util.ToolRunner;
78  
79  /**
80   * Export the specified snapshot to a given FileSystem.
81   *
82   * The .snapshot/name folder is copied to the destination cluster
83   * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location.
84   * When everything is done, the second cluster can restore the snapshot.
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ExportSnapshot extends Configured implements Tool {
  /** Base name used to build the per-filesystem configuration prefixes below. */
  public static final String NAME = "exportsnapshot";
  /** Configuration prefix for overrides for the source filesystem */
  public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
  /** Configuration prefix for overrides for the destination filesystem */
  public static final String CONF_DEST_PREFIX = NAME + ".to.";

  private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

  // Set by the input format, together with CONF_NUM_SPLITS, once the mapper count is computed.
  private static final String MR_NUM_MAPS = "mapreduce.job.maps";
  // Number of input splits; when 0 the input format derives it from the number of files.
  private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
  private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
  // Snapshot directory the input format reads to list the files to copy.
  private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
  // Owner, group and permission bits the mapper applies to the copied files/directories.
  private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
  private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
  private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
  // Whether an existing destination file must also match by checksum (mapper default: true).
  private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
  // Root directories of the destination and of the source, as read by the mapper.
  private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
  private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
  // Size in bytes of the copy buffer allocated by the mapper.
  private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
  // Number of files per mapper used when the split count is not given (default: 10).
  private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
  // Per-mapper read throttle in MB/s (default: 100; Integer.MAX_VALUE disables throttling).
  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
  protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

  // Test hooks read by ExportMapper.injectTestFailure().
  static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
  static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

  private static final String INPUT_FOLDER_PREFIX = "export-files.";
116 
117   // Export Map-Reduce Counters, to keep track of the progress
118   public enum Counter {
119     MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED,
120     BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED
121   }
122 
123   private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124                                                    NullWritable, NullWritable> {
    /** Number of copied bytes after which copyData() updates the counter and the task status. */
    final static int REPORT_SIZE = 1 * 1024 * 1024;
    /** Lower bound used by setup() when deriving the default copy buffer size. */
    final static int BUFFER_SIZE = 64 * 1024;

    // Test-only failure injection (see injectTestFailure()); 'random' is created lazily.
    private boolean testFailures;
    private Random random;

    // Copy options read from the job configuration in setup().
    private boolean verifyChecksum;
    private String filesGroup;
    private String filesUser;
    private short filesMode;
    private int bufferSize;

    // Destination side: filesystem, archive directory and root directory.
    private FileSystem outputFs;
    private Path outputArchive;
    private Path outputRoot;

    // Source side: filesystem, archive directory and root directory.
    private FileSystem inputFs;
    private Path inputArchive;
    private Path inputRoot;
144 
145     @Override
146     public void setup(Context context) throws IOException {
147       Configuration conf = context.getConfiguration();
148       Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
149       Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
150 
151       verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
152 
153       filesGroup = conf.get(CONF_FILES_GROUP);
154       filesUser = conf.get(CONF_FILES_USER);
155       filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
156       outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
157       inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
158 
159       inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
160       outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
161 
162       testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
163 
164       try {
165         srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166         inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
167       } catch (IOException e) {
168         throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
169       }
170 
171       try {
172         destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
173         outputFs = FileSystem.get(outputRoot.toUri(), destConf);
174       } catch (IOException e) {
175         throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
176       }
177 
178       // Use the default block size of the outputFs if bigger
179       int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
180       bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
181       LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
182 
183       for (Counter c : Counter.values()) {
184         context.getCounter(c).increment(0);
185       }
186     }
187 
188     @Override
189     protected void cleanup(Context context) {
190       IOUtils.closeStream(inputFs);
191       IOUtils.closeStream(outputFs);
192     }
193 
194     @Override
195     public void map(BytesWritable key, NullWritable value, Context context)
196         throws InterruptedException, IOException {
197       SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
198       Path outputPath = getOutputPath(inputInfo);
199 
200       copyFile(context, inputInfo, outputPath);
201     }
202 
203     /**
204      * Returns the location where the inputPath will be copied.
205      */
206     private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
207       Path path = null;
208       switch (inputInfo.getType()) {
209         case HFILE:
210           Path inputPath = new Path(inputInfo.getHfile());
211           String family = inputPath.getParent().getName();
212           TableName table =HFileLink.getReferencedTableName(inputPath.getName());
213           String region = HFileLink.getReferencedRegionName(inputPath.getName());
214           String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
215           path = new Path(FSUtils.getTableDir(new Path("./"), table),
216               new Path(region, new Path(family, hfile)));
217           break;
218         case WAL:
219           Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
220           path = new Path(oldLogsDir, inputInfo.getWalName());
221           break;
222         default:
223           throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
224       }
225       return new Path(outputArchive, path);
226     }
227 
228     /*
229      * Used by TestExportSnapshot to simulate a failure
230      */
231     private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
232         throws IOException {
233       if (testFailures) {
234         if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
235           if (random == null) {
236             random = new Random();
237           }
238 
239           // FLAKY-TEST-WARN: lower is better, we can get some runs without the
240           // retry, but at least we reduce the number of test failures due to
241           // this test exception from the same map task.
242           if (random.nextFloat() < 0.03) {
243             throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
244                                   + " time=" + System.currentTimeMillis());
245           }
246         } else {
247           context.getCounter(Counter.COPY_FAILED).increment(1);
248           throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
249         }
250       }
251     }
252 
253     private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
254         final Path outputPath) throws IOException {
255       injectTestFailure(context, inputInfo);
256 
257       // Get the file information
258       FileStatus inputStat = getSourceFileStatus(context, inputInfo);
259 
260       // Verify if the output file exists and is the same that we want to copy
261       if (outputFs.exists(outputPath)) {
262         FileStatus outputStat = outputFs.getFileStatus(outputPath);
263         if (outputStat != null && sameFile(inputStat, outputStat)) {
264           LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
265           context.getCounter(Counter.FILES_SKIPPED).increment(1);
266           context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
267           return;
268         }
269       }
270 
271       InputStream in = openSourceFile(context, inputInfo);
272       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
273       if (Integer.MAX_VALUE != bandwidthMB) {
274         in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
275       }
276 
277       try {
278         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
279 
280         // Ensure that the output folder is there and copy the file
281         createOutputPath(outputPath.getParent());
282         FSDataOutputStream out = outputFs.create(outputPath, true);
283         try {
284           copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
285         } finally {
286           out.close();
287         }
288 
289         // Try to Preserve attributes
290         if (!preserveAttributes(outputPath, inputStat)) {
291           LOG.warn("You may have to run manually chown on: " + outputPath);
292         }
293       } finally {
294         in.close();
295       }
296     }
297 
298     /**
299      * Create the output folder and optionally set ownership.
300      */
301     private void createOutputPath(final Path path) throws IOException {
302       if (filesUser == null && filesGroup == null) {
303         outputFs.mkdirs(path);
304       } else {
305         Path parent = path.getParent();
306         if (!outputFs.exists(parent) && !parent.isRoot()) {
307           createOutputPath(parent);
308         }
309         outputFs.mkdirs(path);
310         if (filesUser != null || filesGroup != null) {
311           // override the owner when non-null user/group is specified
312           outputFs.setOwner(path, filesUser, filesGroup);
313         }
314         if (filesMode > 0) {
315           outputFs.setPermission(path, new FsPermission(filesMode));
316         }
317       }
318     }
319 
320     /**
321      * Try to Preserve the files attribute selected by the user copying them from the source file
322      * This is only required when you are exporting as a different user than "hbase" or on a system
323      * that doesn't have the "hbase" user.
324      *
325      * This is not considered a blocking failure since the user can force a chmod with the user
326      * that knows is available on the system.
327      */
328     private boolean preserveAttributes(final Path path, final FileStatus refStat) {
329       FileStatus stat;
330       try {
331         stat = outputFs.getFileStatus(path);
332       } catch (IOException e) {
333         LOG.warn("Unable to get the status for file=" + path);
334         return false;
335       }
336 
337       try {
338         if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
339           outputFs.setPermission(path, new FsPermission(filesMode));
340         } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
341           outputFs.setPermission(path, refStat.getPermission());
342         }
343       } catch (IOException e) {
344         LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
345         return false;
346       }
347 
348       boolean hasRefStat = (refStat != null);
349       String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
350       String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
351       if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
352         try {
353           if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
354             outputFs.setOwner(path, user, group);
355           }
356         } catch (IOException e) {
357           LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
358           LOG.warn("The user/group may not exist on the destination cluster: user=" +
359                    user + " group=" + group);
360           return false;
361         }
362       }
363 
364       return true;
365     }
366 
367     private boolean stringIsNotEmpty(final String str) {
368       return str != null && str.length() > 0;
369     }
370 
371     private void copyData(final Context context,
372         final Path inputPath, final InputStream in,
373         final Path outputPath, final FSDataOutputStream out,
374         final long inputFileSize)
375         throws IOException {
376       final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
377                                    " (%.1f%%)";
378 
379       try {
380         byte[] buffer = new byte[bufferSize];
381         long totalBytesWritten = 0;
382         int reportBytes = 0;
383         int bytesRead;
384 
385         long stime = System.currentTimeMillis();
386         while ((bytesRead = in.read(buffer)) > 0) {
387           out.write(buffer, 0, bytesRead);
388           totalBytesWritten += bytesRead;
389           reportBytes += bytesRead;
390 
391           if (reportBytes >= REPORT_SIZE) {
392             context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
393             context.setStatus(String.format(statusMessage,
394                               StringUtils.humanReadableInt(totalBytesWritten),
395                               (totalBytesWritten/(float)inputFileSize) * 100.0f) +
396                               " from " + inputPath + " to " + outputPath);
397             reportBytes = 0;
398           }
399         }
400         long etime = System.currentTimeMillis();
401 
402         context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
403         context.setStatus(String.format(statusMessage,
404                           StringUtils.humanReadableInt(totalBytesWritten),
405                           (totalBytesWritten/(float)inputFileSize) * 100.0f) +
406                           " from " + inputPath + " to " + outputPath);
407 
408         // Verify that the written size match
409         if (totalBytesWritten != inputFileSize) {
410           String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
411                        " expected=" + inputFileSize + " for file=" + inputPath;
412           throw new IOException(msg);
413         }
414 
415         LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
416         LOG.info("size=" + totalBytesWritten +
417             " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
418             " time=" + StringUtils.formatTimeDiff(etime, stime) +
419             String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
420         context.getCounter(Counter.FILES_COPIED).increment(1);
421       } catch (IOException e) {
422         LOG.error("Error copying " + inputPath + " to " + outputPath, e);
423         context.getCounter(Counter.COPY_FAILED).increment(1);
424         throw e;
425       }
426     }
427 
428     /**
429      * Try to open the "source" file.
430      * Throws an IOException if the communication with the inputFs fail or
431      * if the file is not found.
432      */
433     private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
434             throws IOException {
435       try {
436         Configuration conf = context.getConfiguration();
437         FileLink link = null;
438         switch (fileInfo.getType()) {
439           case HFILE:
440             Path inputPath = new Path(fileInfo.getHfile());
441             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
442             break;
443           case WAL:
444             String serverName = fileInfo.getWalServer();
445             String logName = fileInfo.getWalName();
446             link = new WALLink(inputRoot, serverName, logName);
447             break;
448           default:
449             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
450         }
451         return link.open(inputFs);
452       } catch (IOException e) {
453         context.getCounter(Counter.MISSING_FILES).increment(1);
454         LOG.error("Unable to open source file=" + fileInfo.toString(), e);
455         throw e;
456       }
457     }
458 
459     private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
460         throws IOException {
461       try {
462         Configuration conf = context.getConfiguration();
463         FileLink link = null;
464         switch (fileInfo.getType()) {
465           case HFILE:
466             Path inputPath = new Path(fileInfo.getHfile());
467             link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
468             break;
469           case WAL:
470             link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
471             break;
472           default:
473             throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
474         }
475         return link.getFileStatus(inputFs);
476       } catch (FileNotFoundException e) {
477         context.getCounter(Counter.MISSING_FILES).increment(1);
478         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479         throw e;
480       } catch (IOException e) {
481         LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
482         throw e;
483       }
484     }
485 
486     private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
487       try {
488         return fs.getFileChecksum(path);
489       } catch (IOException e) {
490         LOG.warn("Unable to get checksum for file=" + path, e);
491         return null;
492       }
493     }
494 
495     /**
496      * Check if the two files are equal by looking at the file length,
497      * and at the checksum (if user has specified the verifyChecksum flag).
498      */
499     private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
500       // Not matching length
501       if (inputStat.getLen() != outputStat.getLen()) return false;
502 
503       // Mark files as equals, since user asked for no checksum verification
504       if (!verifyChecksum) return true;
505 
506       // If checksums are not available, files are not the same.
507       FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
508       if (inChecksum == null) return false;
509 
510       FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
511       if (outChecksum == null) return false;
512 
513       return inChecksum.equals(outChecksum);
514     }
515   }
516 
517   // ==========================================================================
518   //  Input Format
519   // ==========================================================================
520 
521   /**
522    * Extract the list of files (HFiles/WALs) to copy using Map-Reduce.
523    * @return list of files referenced by the snapshot (pair of path and size)
524    */
525   private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
526       final FileSystem fs, final Path snapshotDir) throws IOException {
527     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
528 
529     final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
530     final TableName table = TableName.valueOf(snapshotDesc.getTable());
531 
532     // Get snapshot files
533     LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
534     SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
535       new SnapshotReferenceUtil.SnapshotVisitor() {
536         @Override
537         public void storeFile(final HRegionInfo regionInfo, final String family,
538             final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
539           if (storeFile.hasReference()) {
540             // copied as part of the manifest
541           } else {
542             String region = regionInfo.getEncodedName();
543             String hfile = storeFile.getName();
544             Path path = HFileLink.createPath(table, region, family, hfile);
545 
546             SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
547               .setType(SnapshotFileInfo.Type.HFILE)
548               .setHfile(path.toString())
549               .build();
550 
551             long size;
552             if (storeFile.hasFileSize()) {
553               size = storeFile.getFileSize();
554             } else {
555               size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
556             }
557             files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
558           }
559         }
560 
561         @Override
562         public void logFile (final String server, final String logfile)
563             throws IOException {
564           SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
565             .setType(SnapshotFileInfo.Type.WAL)
566             .setWalServer(server)
567             .setWalName(logfile)
568             .build();
569 
570           long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
571           files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
572         }
573     });
574 
575     return files;
576   }
577 
578   /**
579    * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible.
580    * The groups created will have similar amounts of bytes.
581    * <p>
582    * The algorithm used is pretty straightforward; the file list is sorted by size,
583    * and then each group fetch the bigger file available, iterating through groups
584    * alternating the direction.
585    */
586   static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
587       final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
588     // Sort files by size, from small to big
589     Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
590       public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
591         long r = a.getSecond() - b.getSecond();
592         return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
593       }
594     });
595 
596     // create balanced groups
597     List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
598       new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
599     long[] sizeGroups = new long[ngroups];
600     int hi = files.size() - 1;
601     int lo = 0;
602 
603     List<Pair<SnapshotFileInfo, Long>> group;
604     int dir = 1;
605     int g = 0;
606 
607     while (hi >= lo) {
608       if (g == fileGroups.size()) {
609         group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
610         fileGroups.add(group);
611       } else {
612         group = fileGroups.get(g);
613       }
614 
615       Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
616 
617       // add the hi one
618       sizeGroups[g] += fileInfo.getSecond();
619       group.add(fileInfo);
620 
621       // change direction when at the end or the beginning
622       g += dir;
623       if (g == ngroups) {
624         dir = -1;
625         g = ngroups - 1;
626       } else if (g < 0) {
627         dir = 1;
628         g = 0;
629       }
630     }
631 
632     if (LOG.isDebugEnabled()) {
633       for (int i = 0; i < sizeGroups.length; ++i) {
634         LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
635       }
636     }
637 
638     return fileGroups;
639   }
640 
641   private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
642     @Override
643     public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
644         TaskAttemptContext tac) throws IOException, InterruptedException {
645       return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
646     }
647 
648     @Override
649     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
650       Configuration conf = context.getConfiguration();
651       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
652       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
653 
654       List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
655       int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
656       if (mappers == 0 && snapshotFiles.size() > 0) {
657         mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
658         mappers = Math.min(mappers, snapshotFiles.size());
659         conf.setInt(CONF_NUM_SPLITS, mappers);
660         conf.setInt(MR_NUM_MAPS, mappers);
661       }
662 
663       List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
664       List<InputSplit> splits = new ArrayList(groups.size());
665       for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
666         splits.add(new ExportSnapshotInputSplit(files));
667       }
668       return splits;
669     }
670 
671     private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
672       private List<Pair<BytesWritable, Long>> files;
673       private long length;
674 
675       public ExportSnapshotInputSplit() {
676         this.files = null;
677       }
678 
679       public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
680         this.files = new ArrayList(snapshotFiles.size());
681         for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
682           this.files.add(new Pair<BytesWritable, Long>(
683             new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
684           this.length += fileInfo.getSecond();
685         }
686       }
687 
688       private List<Pair<BytesWritable, Long>> getSplitKeys() {
689         return files;
690       }
691 
692       @Override
693       public long getLength() throws IOException, InterruptedException {
694         return length;
695       }
696 
697       @Override
698       public String[] getLocations() throws IOException, InterruptedException {
699         return new String[] {};
700       }
701 
702       @Override
703       public void readFields(DataInput in) throws IOException {
704         int count = in.readInt();
705         files = new ArrayList<Pair<BytesWritable, Long>>(count);
706         length = 0;
707         for (int i = 0; i < count; ++i) {
708           BytesWritable fileInfo = new BytesWritable();
709           fileInfo.readFields(in);
710           long size = in.readLong();
711           files.add(new Pair<BytesWritable, Long>(fileInfo, size));
712           length += size;
713         }
714       }
715 
716       @Override
717       public void write(DataOutput out) throws IOException {
718         out.writeInt(files.size());
719         for (final Pair<BytesWritable, Long> fileInfo: files) {
720           fileInfo.getFirst().write(out);
721           out.writeLong(fileInfo.getSecond());
722         }
723       }
724     }
725 
726     private static class ExportSnapshotRecordReader
727         extends RecordReader<BytesWritable, NullWritable> {
728       private final List<Pair<BytesWritable, Long>> files;
729       private long totalSize = 0;
730       private long procSize = 0;
731       private int index = -1;
732 
733       ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
734         this.files = files;
735         for (Pair<BytesWritable, Long> fileInfo: files) {
736           totalSize += fileInfo.getSecond();
737         }
738       }
739 
740       @Override
741       public void close() { }
742 
743       @Override
744       public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
745 
746       @Override
747       public NullWritable getCurrentValue() { return NullWritable.get(); }
748 
749       @Override
750       public float getProgress() { return (float)procSize / totalSize; }
751 
752       @Override
753       public void initialize(InputSplit split, TaskAttemptContext tac) { }
754 
755       @Override
756       public boolean nextKeyValue() {
757         if (index >= 0) {
758           procSize += files.get(index).getSecond();
759         }
760         return(++index < files.size());
761       }
762     }
763   }
764 
765   // ==========================================================================
766   //  Tool
767   // ==========================================================================
768 
769   /**
770    * Run Map-Reduce Job to perform the files copy.
771    */
772   private void runCopyJob(final Path inputRoot, final Path outputRoot,
773       final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
774       final String filesUser, final String filesGroup, final int filesMode,
775       final int mappers, final int bandwidthMB)
776           throws IOException, InterruptedException, ClassNotFoundException {
777     Configuration conf = getConf();
778     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
779     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
780     if (mappers > 0) {
781       conf.setInt(CONF_NUM_SPLITS, mappers);
782       conf.setInt(MR_NUM_MAPS, mappers);
783     }
784     conf.setInt(CONF_FILES_MODE, filesMode);
785     conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
786     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
787     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
788     conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
789     conf.set(CONF_SNAPSHOT_NAME, snapshotName);
790     conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
791 
792     Job job = new Job(conf);
793     job.setJobName("ExportSnapshot-" + snapshotName);
794     job.setJarByClass(ExportSnapshot.class);
795     TableMapReduceUtil.addDependencyJars(job);
796     job.setMapperClass(ExportMapper.class);
797     job.setInputFormatClass(ExportSnapshotInputFormat.class);
798     job.setOutputFormatClass(NullOutputFormat.class);
799     job.setMapSpeculativeExecution(false);
800     job.setNumReduceTasks(0);
801 
802     // Acquire the delegation Tokens
803     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
804     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
805       new Path[] { inputRoot }, srcConf);
806     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
807     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
808         new Path[] { outputRoot }, destConf);
809 
810     // Run the MR Job
811     if (!job.waitForCompletion(true)) {
812       // TODO: Replace the fixed string with job.getStatus().getFailureInfo()
813       // when it will be available on all the supported versions.
814       throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
815     }
816   }
817 
818   private void verifySnapshot(final Configuration baseConf,
819       final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
820     // Update the conf with the current root dir, since may be a different cluster
821     Configuration conf = new Configuration(baseConf);
822     FSUtils.setRootDir(conf, rootDir);
823     FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
824     SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
825     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
826   }
827 
828   /**
829    * Set path ownership.
830    */
831   private void setOwner(final FileSystem fs, final Path path, final String user,
832       final String group, final boolean recursive) throws IOException {
833     if (user != null || group != null) {
834       if (recursive && fs.isDirectory(path)) {
835         for (FileStatus child : fs.listStatus(path)) {
836           setOwner(fs, child.getPath(), user, group, recursive);
837         }
838       }
839       fs.setOwner(path, user, group);
840     }
841   }
842 
843   /**
844    * Set path permission.
845    */
846   private void setPermission(final FileSystem fs, final Path path, final short filesMode,
847       final boolean recursive) throws IOException {
848     if (filesMode > 0) {
849       FsPermission perm = new FsPermission(filesMode);
850       if (recursive && fs.isDirectory(path)) {
851         for (FileStatus child : fs.listStatus(path)) {
852           setPermission(fs, child.getPath(), filesMode, recursive);
853         }
854       }
855       fs.setPermission(path, perm);
856     }
857   }
858 
859   /**
860    * Execute the export snapshot by copying the snapshot metadata, hfiles and wals.
861    * @return 0 on success, and != 0 upon failure.
862    */
863   @Override
864   public int run(String[] args) throws IOException {
865     boolean verifyTarget = true;
866     boolean verifyChecksum = true;
867     String snapshotName = null;
868     String targetName = null;
869     boolean overwrite = false;
870     String filesGroup = null;
871     String filesUser = null;
872     Path outputRoot = null;
873     int bandwidthMB = Integer.MAX_VALUE;
874     int filesMode = 0;
875     int mappers = 0;
876 
877     Configuration conf = getConf();
878     Path inputRoot = FSUtils.getRootDir(conf);
879 
880     // Process command line args
881     for (int i = 0; i < args.length; i++) {
882       String cmd = args[i];
883       if (cmd.equals("-snapshot")) {
884         snapshotName = args[++i];
885       } else if (cmd.equals("-target")) {
886         targetName = args[++i];
887       } else if (cmd.equals("-copy-to")) {
888         outputRoot = new Path(args[++i]);
889       } else if (cmd.equals("-copy-from")) {
890         inputRoot = new Path(args[++i]);
891         FSUtils.setRootDir(conf, inputRoot);
892       } else if (cmd.equals("-no-checksum-verify")) {
893         verifyChecksum = false;
894       } else if (cmd.equals("-no-target-verify")) {
895         verifyTarget = false;
896       } else if (cmd.equals("-mappers")) {
897         mappers = Integer.parseInt(args[++i]);
898       } else if (cmd.equals("-chuser")) {
899         filesUser = args[++i];
900       } else if (cmd.equals("-chgroup")) {
901         filesGroup = args[++i];
902       } else if (cmd.equals("-bandwidth")) {
903         bandwidthMB = Integer.parseInt(args[++i]);
904       } else if (cmd.equals("-chmod")) {
905         filesMode = Integer.parseInt(args[++i], 8);
906       } else if (cmd.equals("-overwrite")) {
907         overwrite = true;
908       } else if (cmd.equals("-h") || cmd.equals("--help")) {
909         printUsageAndExit();
910       } else {
911         System.err.println("UNEXPECTED: " + cmd);
912         printUsageAndExit();
913       }
914     }
915 
916     // Check user options
917     if (snapshotName == null) {
918       System.err.println("Snapshot name not provided.");
919       printUsageAndExit();
920     }
921 
922     if (outputRoot == null) {
923       System.err.println("Destination file-system not provided.");
924       printUsageAndExit();
925     }
926 
927     if (targetName == null) {
928       targetName = snapshotName;
929     }
930 
931     Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
932     srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
933     FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
934     LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
935     Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
936     destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
937     FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
938     LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
939 
940     boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
941 
942     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
943     Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
944     Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
945     Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
946 
947     // Find the necessary directory which need to change owner and group
948     Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
949     if (outputFs.exists(needSetOwnerDir)) {
950       if (skipTmp) {
951         needSetOwnerDir = outputSnapshotDir;
952       } else {
953         needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot);
954         if (outputFs.exists(needSetOwnerDir)) {
955           needSetOwnerDir = snapshotTmpDir;
956         }
957       }
958     }
959 
960     // Check if the snapshot already exists
961     if (outputFs.exists(outputSnapshotDir)) {
962       if (overwrite) {
963         if (!outputFs.delete(outputSnapshotDir, true)) {
964           System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
965           return 1;
966         }
967       } else {
968         System.err.println("The snapshot '" + targetName +
969           "' already exists in the destination: " + outputSnapshotDir);
970         return 1;
971       }
972     }
973 
974     if (!skipTmp) {
975       // Check if the snapshot already in-progress
976       if (outputFs.exists(snapshotTmpDir)) {
977         if (overwrite) {
978           if (!outputFs.delete(snapshotTmpDir, true)) {
979             System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
980             return 1;
981           }
982         } else {
983           System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
984           System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
985           System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
986           return 1;
987         }
988       }
989     }
990 
991     // Step 1 - Copy fs1:/.snapshot/<snapshot> to  fs2:/.snapshot/.tmp/<snapshot>
992     // The snapshot references must be copied before the hfiles otherwise the cleaner
993     // will remove them because they are unreferenced.
994     try {
995       LOG.info("Copy Snapshot Manifest");
996       FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
997     } catch (IOException e) {
998       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
999         snapshotDir + " to=" + initialOutputSnapshotDir, e);
1000     } finally {
1001       if (filesUser != null || filesGroup != null) {
1002         LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1003             + filesUser)
1004             + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1005             + filesGroup));
1006         setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
1007       }
1008       if (filesMode > 0) {
1009         LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1010         setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
1011       }
1012     }
1013 
1014     // Write a new .snapshotinfo if the target name is different from the source name
1015     if (!targetName.equals(snapshotName)) {
1016       SnapshotDescription snapshotDesc =
1017         SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1018           .toBuilder()
1019           .setName(targetName)
1020           .build();
1021       SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1022     }
1023 
1024     // Step 2 - Start MR Job to copy files
1025     // The snapshot references must be copied before the files otherwise the files gets removed
1026     // by the HFileArchiver, since they have no references.
1027     try {
1028       runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1029                  filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1030 
1031       LOG.info("Finalize the Snapshot Export");
1032       if (!skipTmp) {
1033         // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>
1034         if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1035           throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1036             snapshotTmpDir + " to=" + outputSnapshotDir);
1037         }
1038       }
1039 
1040       // Step 4 - Verify snapshot integrity
1041       if (verifyTarget) {
1042         LOG.info("Verify snapshot integrity");
1043         verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1044       }
1045 
1046       LOG.info("Export Completed: " + targetName);
1047       return 0;
1048     } catch (Exception e) {
1049       LOG.error("Snapshot export failed", e);
1050       if (!skipTmp) {
1051         outputFs.delete(snapshotTmpDir, true);
1052       }
1053       outputFs.delete(outputSnapshotDir, true);
1054       return 1;
1055     } finally {
1056       IOUtils.closeStream(inputFs);
1057       IOUtils.closeStream(outputFs);
1058     }
1059   }
1060 
1061   // ExportSnapshot
1062   private void printUsageAndExit() {
1063     System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1064     System.err.println(" where [options] are:");
1065     System.err.println("  -h|-help                Show this help and exit.");
1066     System.err.println("  -snapshot NAME          Snapshot to restore.");
1067     System.err.println("  -copy-to NAME           Remote destination hdfs://");
1068     System.err.println("  -copy-from NAME         Input folder hdfs:// (default hbase.rootdir)");
1069     System.err.println("  -no-checksum-verify     Do not verify checksum, use name+length only.");
1070     System.err.println("  -no-target-verify       Do not verify the integrity of the \\" +
1071         "exported snapshot.");
1072     System.err.println("  -overwrite              Rewrite the snapshot manifest if already exists");
1073     System.err.println("  -chuser USERNAME        Change the owner of the files " +
1074         "to the specified one.");
1075     System.err.println("  -chgroup GROUP          Change the group of the files to " +
1076         "the specified one.");
1077     System.err.println("  -chmod MODE             Change the permission of the files " +
1078         "to the specified one.");
1079     System.err.println("  -mappers                Number of mappers to use during the " +
1080         "copy (mapreduce.job.maps).");
1081     System.err.println("  -bandwidth              Limit bandwidth to this value in MB/second.");
1082     System.err.println();
1083     System.err.println("Examples:");
1084     System.err.println("  hbase " + getClass().getName() + " \\");
1085     System.err.println("    -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1086     System.err.println("    -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1087     System.err.println();
1088     System.err.println("  hbase " + getClass().getName() + " \\");
1089     System.err.println("    -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1090     System.err.println("    -copy-to hdfs://srv1:50070/hbase \\");
1091     System.exit(1);
1092   }
1093 
1094   /**
1095    * The guts of the {@link #main} method.
1096    * Call this method to avoid the {@link #main(String[])} System.exit.
1097    * @param args
1098    * @return errCode
1099    * @throws Exception
1100    */
1101   static int innerMain(final Configuration conf, final String [] args) throws Exception {
1102     return ToolRunner.run(conf, new ExportSnapshot(), args);
1103   }
1104 
1105   public static void main(String[] args) throws Exception {
1106     System.exit(innerMain(HBaseConfiguration.create(), args));
1107   }
1108 }