1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.LinkedList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.conf.Configured;
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.fs.FSDataOutputStream;
36 import org.apache.hadoop.fs.FileChecksum;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.FileUtil;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.fs.permission.FsPermission;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.exceptions.ExportSnapshotException;
45 import org.apache.hadoop.hbase.io.HFileLink;
46 import org.apache.hadoop.hbase.io.HLogLink;
47 import org.apache.hadoop.hbase.mapreduce.JobUtil;
48 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
49 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.hadoop.hbase.util.FSUtils;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.apache.hadoop.io.NullWritable;
54 import org.apache.hadoop.io.SequenceFile;
55 import org.apache.hadoop.io.Text;
56 import org.apache.hadoop.mapreduce.Job;
57 import org.apache.hadoop.mapreduce.Mapper;
58 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
59 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
60 import org.apache.hadoop.util.StringUtils;
61 import org.apache.hadoop.util.Tool;
62 import org.apache.hadoop.util.ToolRunner;
63
64
65
66
67
68
69
70
71 @InterfaceAudience.Public
72 @InterfaceStability.Evolving
73 public final class ExportSnapshot extends Configured implements Tool {
/** Logger shared by the export tool and its mapper. */
private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

// Configuration keys: written by runCopyJob() and read back by ExportMapper.setup().
/** Owner to set on the copied files; when unset the mapper falls back to the source file owner. */
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
/** Group to set on the copied files; when unset the mapper falls back to the source file group. */
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
/** Permission bits to set on the copied files; only values greater than 0 are applied. */
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
/** Whether an existing destination file is also compared by checksum before the copy is skipped. */
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
/** Root directory the files are copied to. */
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
/** Root directory the files are copied from. */
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";

/** Name prefix of the staging folder that holds the generated mapper input files. */
private static final String INPUT_FOLDER_PREFIX = "export-files.";


/**
 * Job counters updated by the mapper: sources that could not be opened, failed copies,
 * bytes expected from the source file lengths and bytes actually written.
 */
public enum Counter { MISSING_FILES, COPY_FAILED, BYTES_EXPECTED, BYTES_COPIED };
87
88 private static class ExportMapper extends Mapper<Text, NullWritable, NullWritable, NullWritable> {
/** Number of copied bytes after which the byte counter and the task status are refreshed. */
final static int REPORT_SIZE = 1 * 1024 * 1024;
/** Size in bytes of the buffer used to move data from the source to the destination. */
final static int BUFFER_SIZE = 64 * 1024;

// Export options, loaded from the job configuration in setup().
private boolean verifyChecksum;
private String filesGroup;
private String filesUser;
private short filesMode;

// Destination file system, its archive directory and its root directory.
private FileSystem outputFs;
private Path outputArchive;
private Path outputRoot;

// Source file system, its archive directory and its root directory.
private FileSystem inputFs;
private Path inputArchive;
private Path inputRoot;
104
105 @Override
106 public void setup(Context context) {
107 Configuration conf = context.getConfiguration();
108 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
109
110 filesGroup = conf.get(CONF_FILES_GROUP);
111 filesUser = conf.get(CONF_FILES_USER);
112 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
113 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
114 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
115
116 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
117 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
118
119 try {
120 inputFs = FileSystem.get(inputRoot.toUri(), conf);
121 } catch (IOException e) {
122 throw new RuntimeException("Could not get the input FileSystem with root=" + inputRoot, e);
123 }
124
125 try {
126 outputFs = FileSystem.get(outputRoot.toUri(), conf);
127 } catch (IOException e) {
128 throw new RuntimeException("Could not get the output FileSystem with root="+ outputRoot, e);
129 }
130 }
131
132 @Override
133 public void map(Text key, NullWritable value, Context context)
134 throws InterruptedException, IOException {
135 Path inputPath = new Path(key.toString());
136 Path outputPath = getOutputPath(inputPath);
137
138 LOG.info("copy file input=" + inputPath + " output=" + outputPath);
139 if (copyFile(context, inputPath, outputPath)) {
140 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
141 }
142 }
143
144
145
146
147
148
149 private Path getOutputPath(final Path inputPath) throws IOException {
150 Path path;
151 if (HFileLink.isHFileLink(inputPath) || StoreFileInfo.isReference(inputPath)) {
152 String family = inputPath.getParent().getName();
153 String table = HFileLink.getReferencedTableName(inputPath.getName());
154 String region = HFileLink.getReferencedRegionName(inputPath.getName());
155 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
156 path = new Path(table, new Path(region, new Path(family, hfile)));
157 } else if (isHLogLinkPath(inputPath)) {
158 String logName = inputPath.getName();
159 path = new Path(new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME), logName);
160 } else {
161 path = inputPath;
162 }
163 return new Path(outputArchive, path);
164 }
165
166 private boolean copyFile(final Context context, final Path inputPath, final Path outputPath)
167 throws IOException {
168 FSDataInputStream in = openSourceFile(inputPath);
169 if (in == null) {
170 context.getCounter(Counter.MISSING_FILES).increment(1);
171 return false;
172 }
173
174 try {
175
176 FileStatus inputStat = getFileStatus(inputFs, inputPath);
177 if (inputStat == null) return false;
178
179
180 if (outputFs.exists(outputPath)) {
181 FileStatus outputStat = outputFs.getFileStatus(outputPath);
182 if (sameFile(inputStat, outputStat)) {
183 LOG.info("Skip copy " + inputPath + " to " + outputPath + ", same file.");
184 return true;
185 }
186 }
187
188 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
189
190
191 outputFs.mkdirs(outputPath.getParent());
192 FSDataOutputStream out = outputFs.create(outputPath, true);
193 try {
194 if (!copyData(context, inputPath, in, outputPath, out, inputStat.getLen()))
195 return false;
196 } finally {
197 out.close();
198 }
199
200
201 return preserveAttributes(outputPath, inputStat);
202 } finally {
203 in.close();
204 }
205 }
206
207
208
209
210 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
211 FileStatus stat;
212 try {
213 stat = outputFs.getFileStatus(path);
214 } catch (IOException e) {
215 LOG.warn("Unable to get the status for file=" + path);
216 return false;
217 }
218
219 try {
220 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
221 outputFs.setPermission(path, new FsPermission(filesMode));
222 } else if (!stat.getPermission().equals(refStat.getPermission())) {
223 outputFs.setPermission(path, refStat.getPermission());
224 }
225 } catch (IOException e) {
226 LOG.error("Unable to set the permission for file=" + path, e);
227 return false;
228 }
229
230 try {
231 String user = (filesUser != null) ? filesUser : refStat.getOwner();
232 String group = (filesGroup != null) ? filesGroup : refStat.getGroup();
233 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
234 outputFs.setOwner(path, user, group);
235 }
236 } catch (IOException e) {
237 LOG.error("Unable to set the owner/group for file=" + path, e);
238 return false;
239 }
240
241 return true;
242 }
243
244 private boolean copyData(final Context context,
245 final Path inputPath, final FSDataInputStream in,
246 final Path outputPath, final FSDataOutputStream out,
247 final long inputFileSize) {
248 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
249 " (%.3f%%) from " + inputPath + " to " + outputPath;
250
251 try {
252 byte[] buffer = new byte[BUFFER_SIZE];
253 long totalBytesWritten = 0;
254 int reportBytes = 0;
255 int bytesRead;
256
257 while ((bytesRead = in.read(buffer)) > 0) {
258 out.write(buffer, 0, bytesRead);
259 totalBytesWritten += bytesRead;
260 reportBytes += bytesRead;
261
262 if (reportBytes >= REPORT_SIZE) {
263 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
264 context.setStatus(String.format(statusMessage,
265 StringUtils.humanReadableInt(totalBytesWritten),
266 reportBytes/(float)inputFileSize));
267 reportBytes = 0;
268 }
269 }
270
271 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
272 context.setStatus(String.format(statusMessage,
273 StringUtils.humanReadableInt(totalBytesWritten),
274 reportBytes/(float)inputFileSize));
275
276
277 if (totalBytesWritten != inputFileSize) {
278 LOG.error("number of bytes copied not matching copied=" + totalBytesWritten +
279 " expected=" + inputFileSize + " for file=" + inputPath);
280 context.getCounter(Counter.COPY_FAILED).increment(1);
281 return false;
282 }
283
284 return true;
285 } catch (IOException e) {
286 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
287 context.getCounter(Counter.COPY_FAILED).increment(1);
288 return false;
289 }
290 }
291
292 private FSDataInputStream openSourceFile(final Path path) {
293 try {
294 if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
295 return new HFileLink(inputRoot, inputArchive, path).open(inputFs);
296 } else if (isHLogLinkPath(path)) {
297 String serverName = path.getParent().getName();
298 String logName = path.getName();
299 return new HLogLink(inputRoot, serverName, logName).open(inputFs);
300 }
301 return inputFs.open(path);
302 } catch (IOException e) {
303 LOG.error("Unable to open source file=" + path, e);
304 return null;
305 }
306 }
307
308 private FileStatus getFileStatus(final FileSystem fs, final Path path) {
309 try {
310 if (HFileLink.isHFileLink(path) || StoreFileInfo.isReference(path)) {
311 HFileLink link = new HFileLink(inputRoot, inputArchive, path);
312 return link.getFileStatus(fs);
313 } else if (isHLogLinkPath(path)) {
314 String serverName = path.getParent().getName();
315 String logName = path.getName();
316 return new HLogLink(inputRoot, serverName, logName).getFileStatus(fs);
317 }
318 return fs.getFileStatus(path);
319 } catch (IOException e) {
320 LOG.warn("Unable to get the status for file=" + path);
321 return null;
322 }
323 }
324
325 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
326 try {
327 return fs.getFileChecksum(path);
328 } catch (IOException e) {
329 LOG.warn("Unable to get checksum for file=" + path, e);
330 return null;
331 }
332 }
333
334
335
336
337
338 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
339
340 if (inputStat.getLen() != outputStat.getLen()) return false;
341
342
343 if (!verifyChecksum) return true;
344
345
346 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
347 if (inChecksum == null) return false;
348
349 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
350 if (outChecksum == null) return false;
351
352 return inChecksum.equals(outChecksum);
353 }
354
355
356
357
358
359
360 private static boolean isHLogLinkPath(final Path path) {
361 return path.depth() == 2;
362 }
363 }
364
365
366
367
368
369 private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
370 throws IOException {
371 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
372
373 final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
374 final String table = snapshotDesc.getTable();
375 final Configuration conf = getConf();
376
377
378 SnapshotReferenceUtil.visitReferencedFiles(fs, snapshotDir,
379 new SnapshotReferenceUtil.FileVisitor() {
380 public void storeFile (final String region, final String family, final String hfile)
381 throws IOException {
382 Path path = new Path(family, HFileLink.createHFileLinkName(table, region, hfile));
383 long size = new HFileLink(conf, path).getFileStatus(fs).getLen();
384 files.add(new Pair<Path, Long>(path, size));
385 }
386
387 public void recoveredEdits (final String region, final String logfile)
388 throws IOException {
389
390 }
391
392 public void logFile (final String server, final String logfile)
393 throws IOException {
394 long size = new HLogLink(conf, server, logfile).getFileStatus(fs).getLen();
395 files.add(new Pair<Path, Long>(new Path(server, logfile), size));
396 }
397 });
398
399 return files;
400 }
401
402
403
404
405
406
407
408
409
410 static List<List<Path>> getBalancedSplits(final List<Pair<Path, Long>> files, int ngroups) {
411
412 Collections.sort(files, new Comparator<Pair<Path, Long>>() {
413 public int compare(Pair<Path, Long> a, Pair<Path, Long> b) {
414 long r = a.getSecond() - b.getSecond();
415 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
416 }
417 });
418
419
420 List<List<Path>> fileGroups = new LinkedList<List<Path>>();
421 long[] sizeGroups = new long[ngroups];
422 int hi = files.size() - 1;
423 int lo = 0;
424
425 List<Path> group;
426 int dir = 1;
427 int g = 0;
428
429 while (hi >= lo) {
430 if (g == fileGroups.size()) {
431 group = new LinkedList<Path>();
432 fileGroups.add(group);
433 } else {
434 group = fileGroups.get(g);
435 }
436
437 Pair<Path, Long> fileInfo = files.get(hi--);
438
439
440 sizeGroups[g] += fileInfo.getSecond();
441 group.add(fileInfo.getFirst());
442
443
444 g += dir;
445 if (g == ngroups) {
446 dir = -1;
447 g = ngroups - 1;
448 } else if (g < 0) {
449 dir = 1;
450 g = 0;
451 }
452 }
453
454 if (LOG.isDebugEnabled()) {
455 for (int i = 0; i < sizeGroups.length; ++i) {
456 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
457 }
458 }
459
460 return fileGroups;
461 }
462
463 private static Path getInputFolderPath(Configuration conf)
464 throws IOException, InterruptedException {
465 Path stagingDir = JobUtil.getStagingDir(conf);
466 return new Path(stagingDir, INPUT_FOLDER_PREFIX +
467 String.valueOf(EnvironmentEdgeManager.currentTimeMillis()));
468 }
469
470
471
472
473
474
475
476 private static Path[] createInputFiles(final Configuration conf,
477 final List<Pair<Path, Long>> snapshotFiles, int mappers)
478 throws IOException, InterruptedException {
479 Path inputFolderPath = getInputFolderPath(conf);
480 FileSystem fs = inputFolderPath.getFileSystem(conf);
481 LOG.debug("Input folder location: " + inputFolderPath);
482
483 List<List<Path>> splits = getBalancedSplits(snapshotFiles, mappers);
484 Path[] inputFiles = new Path[splits.size()];
485
486 Text key = new Text();
487 for (int i = 0; i < inputFiles.length; i++) {
488 List<Path> files = splits.get(i);
489 inputFiles[i] = new Path(inputFolderPath, String.format("export-%d.seq", i));
490 SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inputFiles[i],
491 Text.class, NullWritable.class);
492 LOG.debug("Input split: " + i);
493 try {
494 for (Path file: files) {
495 LOG.debug(file.toString());
496 key.set(file.toString());
497 writer.append(key, NullWritable.get());
498 }
499 } finally {
500 writer.close();
501 }
502 }
503
504 return inputFiles;
505 }
506
507
508
509
510 private boolean runCopyJob(final Path inputRoot, final Path outputRoot,
511 final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
512 final String filesUser, final String filesGroup, final int filesMode,
513 final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
514 Configuration conf = getConf();
515 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
516 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
517 conf.setInt(CONF_FILES_MODE, filesMode);
518 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
519 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
520 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
521 conf.setInt("mapreduce.job.maps", mappers);
522
523 Job job = new Job(conf);
524 job.setJobName("ExportSnapshot");
525 job.setJarByClass(ExportSnapshot.class);
526 job.setMapperClass(ExportMapper.class);
527 job.setInputFormatClass(SequenceFileInputFormat.class);
528 job.setOutputFormatClass(NullOutputFormat.class);
529 job.setMapSpeculativeExecution(false);
530 job.setNumReduceTasks(0);
531 for (Path path: createInputFiles(conf, snapshotFiles, mappers)) {
532 LOG.debug("Add Input Path=" + path);
533 SequenceFileInputFormat.addInputPath(job, path);
534 }
535
536 return job.waitForCompletion(true);
537 }
538
539
540
541
542
543 @Override
544 public int run(String[] args) throws Exception {
545 boolean verifyChecksum = true;
546 String snapshotName = null;
547 String filesGroup = null;
548 String filesUser = null;
549 Path outputRoot = null;
550 int filesMode = 0;
551 int mappers = getConf().getInt("mapreduce.job.maps", 1);
552
553
554 for (int i = 0; i < args.length; i++) {
555 String cmd = args[i];
556 try {
557 if (cmd.equals("-snapshot")) {
558 snapshotName = args[++i];
559 } else if (cmd.equals("-copy-to")) {
560 outputRoot = new Path(args[++i]);
561 } else if (cmd.equals("-no-checksum-verify")) {
562 verifyChecksum = false;
563 } else if (cmd.equals("-mappers")) {
564 mappers = Integer.parseInt(args[++i]);
565 } else if (cmd.equals("-chuser")) {
566 filesUser = args[++i];
567 } else if (cmd.equals("-chgroup")) {
568 filesGroup = args[++i];
569 } else if (cmd.equals("-chmod")) {
570 filesMode = Integer.parseInt(args[++i], 8);
571 } else if (cmd.equals("-h") || cmd.equals("--help")) {
572 printUsageAndExit();
573 } else {
574 System.err.println("UNEXPECTED: " + cmd);
575 printUsageAndExit();
576 }
577 } catch (Exception e) {
578 printUsageAndExit();
579 }
580 }
581
582
583 if (snapshotName == null) {
584 System.err.println("Snapshot name not provided.");
585 printUsageAndExit();
586 }
587
588 if (outputRoot == null) {
589 System.err.println("Destination file-system not provided.");
590 printUsageAndExit();
591 }
592
593 Configuration conf = getConf();
594 Path inputRoot = FSUtils.getRootDir(conf);
595 FileSystem inputFs = FileSystem.get(conf);
596 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
597
598 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
599 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshotName, outputRoot);
600 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, outputRoot);
601
602
603 if (outputFs.exists(outputSnapshotDir)) {
604 System.err.println("The snapshot '" + snapshotName +
605 "' already exists in the destination: " + outputSnapshotDir);
606 return 1;
607 }
608
609
610 if (outputFs.exists(snapshotTmpDir)) {
611 System.err.println("A snapshot with the same name '" + snapshotName + "' may be in-progress");
612 System.err.println("Please check " + snapshotTmpDir + ". If the snapshot has completed, ");
613 System.err.println("consider removing " + snapshotTmpDir + " before retrying export");
614 return 1;
615 }
616
617
618 final List<Pair<Path, Long>> files = getSnapshotFiles(inputFs, snapshotDir);
619
620
621
622
623 try {
624 FileUtil.copy(inputFs, snapshotDir, outputFs, snapshotTmpDir, false, false, conf);
625 } catch (IOException e) {
626 System.err.println("Failed to copy the snapshot directory: from=" + snapshotDir +
627 " to=" + snapshotTmpDir);
628 e.printStackTrace(System.err);
629 return 1;
630 }
631
632
633
634
635 try {
636 if (files.size() == 0) {
637 LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
638 } else {
639 if (!runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
640 filesUser, filesGroup, filesMode, mappers)) {
641 throw new ExportSnapshotException("Snapshot export failed!");
642 }
643 }
644
645
646 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
647 System.err.println("Snapshot export failed!");
648 System.err.println("Unable to rename snapshot directory from=" +
649 snapshotTmpDir + " to=" + outputSnapshotDir);
650 return 1;
651 }
652
653 return 0;
654 } catch (Exception e) {
655 System.err.println("Snapshot export failed!");
656 e.printStackTrace(System.err);
657 outputFs.delete(outputSnapshotDir, true);
658 return 1;
659 }
660 }
661
662
663 private void printUsageAndExit() {
664 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
665 System.err.println(" where [options] are:");
666 System.err.println(" -h|-help Show this help and exit.");
667 System.err.println(" -snapshot NAME Snapshot to restore.");
668 System.err.println(" -copy-to NAME Remote destination hdfs://");
669 System.err.println(" -no-checksum-verify Do not verify checksum.");
670 System.err.println(" -chuser USERNAME Change the owner of the files to the specified one.");
671 System.err.println(" -chgroup GROUP Change the group of the files to the specified one.");
672 System.err.println(" -chmod MODE Change the permission of the files to the specified one.");
673 System.err.println(" -mappers Number of mappers to use during the copy (mapreduce.job.maps).");
674 System.err.println();
675 System.err.println("Examples:");
676 System.err.println(" hbase " + getClass() + " \\");
677 System.err.println(" -snapshot MySnapshot -copy-to hdfs:///srv2:8082/hbase \\");
678 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
679 System.exit(1);
680 }
681
682
683
684
685
686
687
688
689 static int innerMain(final Configuration conf, final String [] args) throws Exception {
690 return ToolRunner.run(conf, new ExportSnapshot(), args);
691 }
692
693 public static void main(String[] args) throws Exception {
694 System.exit(innerMain(HBaseConfiguration.create(), args));
695 }
696 }