1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.BufferedInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.conf.Configured;
40 import org.apache.hadoop.fs.FSDataInputStream;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileChecksum;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.FileUtil;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.permission.FsPermission;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.HBaseConfiguration;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.io.FileLink;
53 import org.apache.hadoop.hbase.io.HFileLink;
54 import org.apache.hadoop.hbase.io.WALLink;
55 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59 import org.apache.hadoop.hbase.util.FSUtils;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.apache.hadoop.io.BytesWritable;
62 import org.apache.hadoop.io.IOUtils;
63 import org.apache.hadoop.io.NullWritable;
64 import org.apache.hadoop.io.Writable;
65 import org.apache.hadoop.mapreduce.Job;
66 import org.apache.hadoop.mapreduce.JobContext;
67 import org.apache.hadoop.mapreduce.Mapper;
68 import org.apache.hadoop.mapreduce.InputFormat;
69 import org.apache.hadoop.mapreduce.InputSplit;
70 import org.apache.hadoop.mapreduce.RecordReader;
71 import org.apache.hadoop.mapreduce.TaskAttemptContext;
72 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73 import org.apache.hadoop.mapreduce.security.TokenCache;
74 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75 import org.apache.hadoop.util.StringUtils;
76 import org.apache.hadoop.util.Tool;
77 import org.apache.hadoop.util.ToolRunner;
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Public
87 @InterfaceStability.Evolving
88 public class ExportSnapshot extends Configured implements Tool {
89 public static final String NAME = "exportsnapshot";
90
91 public static final String CONF_SOURCE_PREFIX = NAME + ".from.";
92
93 public static final String CONF_DEST_PREFIX = NAME + ".to.";
94
95 private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
96
97 private static final String MR_NUM_MAPS = "mapreduce.job.maps";
98 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
99 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
100 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
101 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
102 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
103 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
104 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
105 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
106 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
107 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
108 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
109 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
110 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
111
112 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
113 static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
114
115 private static final String INPUT_FOLDER_PREFIX = "export-files.";
116
117
/** Job counters updated by the export mappers. */
public enum Counter {
  /** Source files that could not be found or opened. */
  MISSING_FILES,
  /** Files copied to the destination. */
  FILES_COPIED,
  /** Files not copied because an identical file already exists at the destination. */
  FILES_SKIPPED,
  /** Copies that ended with an I/O error. */
  COPY_FAILED,
  /** Total length of the source files that the mappers attempted to copy. */
  BYTES_EXPECTED,
  /** Total length of the skipped files. */
  BYTES_SKIPPED,
  /** Bytes actually written to the destination. */
  BYTES_COPIED
}
122
123 private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124 NullWritable, NullWritable> {
125 final static int REPORT_SIZE = 1 * 1024 * 1024;
126 final static int BUFFER_SIZE = 64 * 1024;
127
128 private boolean testFailures;
129 private Random random;
130
131 private boolean verifyChecksum;
132 private String filesGroup;
133 private String filesUser;
134 private short filesMode;
135 private int bufferSize;
136
137 private FileSystem outputFs;
138 private Path outputArchive;
139 private Path outputRoot;
140
141 private FileSystem inputFs;
142 private Path inputArchive;
143 private Path inputRoot;
144
145 @Override
146 public void setup(Context context) throws IOException {
147 Configuration conf = context.getConfiguration();
148 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
149 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
150
151 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
152
153 filesGroup = conf.get(CONF_FILES_GROUP);
154 filesUser = conf.get(CONF_FILES_USER);
155 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
156 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
157 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
158
159 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
160 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
161
162 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
163
164 try {
165 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166 inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
167 } catch (IOException e) {
168 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
169 }
170
171 try {
172 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
173 outputFs = FileSystem.get(outputRoot.toUri(), destConf);
174 } catch (IOException e) {
175 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
176 }
177
178
179 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
180 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
181 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
182
183 for (Counter c : Counter.values()) {
184 context.getCounter(c).increment(0);
185 }
186 }
187
188 @Override
189 protected void cleanup(Context context) {
190 IOUtils.closeStream(inputFs);
191 IOUtils.closeStream(outputFs);
192 }
193
194 @Override
195 public void map(BytesWritable key, NullWritable value, Context context)
196 throws InterruptedException, IOException {
197 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
198 Path outputPath = getOutputPath(inputInfo);
199
200 copyFile(context, inputInfo, outputPath);
201 }
202
203
204
205
206 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
207 Path path = null;
208 switch (inputInfo.getType()) {
209 case HFILE:
210 Path inputPath = new Path(inputInfo.getHfile());
211 String family = inputPath.getParent().getName();
212 TableName table =HFileLink.getReferencedTableName(inputPath.getName());
213 String region = HFileLink.getReferencedRegionName(inputPath.getName());
214 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
215 path = new Path(FSUtils.getTableDir(new Path("./"), table),
216 new Path(region, new Path(family, hfile)));
217 break;
218 case WAL:
219 Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
220 path = new Path(oldLogsDir, inputInfo.getWalName());
221 break;
222 default:
223 throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
224 }
225 return new Path(outputArchive, path);
226 }
227
228
229
230
231 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
232 throws IOException {
233 if (testFailures) {
234 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
235 if (random == null) {
236 random = new Random();
237 }
238
239
240
241
242 if (random.nextFloat() < 0.03) {
243 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
244 + " time=" + System.currentTimeMillis());
245 }
246 } else {
247 context.getCounter(Counter.COPY_FAILED).increment(1);
248 throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
249 }
250 }
251 }
252
253 private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
254 final Path outputPath) throws IOException {
255 injectTestFailure(context, inputInfo);
256
257
258 FileStatus inputStat = getSourceFileStatus(context, inputInfo);
259
260
261 if (outputFs.exists(outputPath)) {
262 FileStatus outputStat = outputFs.getFileStatus(outputPath);
263 if (outputStat != null && sameFile(inputStat, outputStat)) {
264 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
265 context.getCounter(Counter.FILES_SKIPPED).increment(1);
266 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
267 return;
268 }
269 }
270
271 InputStream in = openSourceFile(context, inputInfo);
272 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
273 if (Integer.MAX_VALUE != bandwidthMB) {
274 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
275 }
276
277 try {
278 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
279
280
281 createOutputPath(outputPath.getParent());
282 FSDataOutputStream out = outputFs.create(outputPath, true);
283 try {
284 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
285 } finally {
286 out.close();
287 }
288
289
290 if (!preserveAttributes(outputPath, inputStat)) {
291 LOG.warn("You may have to run manually chown on: " + outputPath);
292 }
293 } finally {
294 in.close();
295 }
296 }
297
298
299
300
301 private void createOutputPath(final Path path) throws IOException {
302 if (filesUser == null && filesGroup == null) {
303 outputFs.mkdirs(path);
304 } else {
305 Path parent = path.getParent();
306 if (!outputFs.exists(parent) && !parent.isRoot()) {
307 createOutputPath(parent);
308 }
309 outputFs.mkdirs(path);
310 if (filesUser != null || filesGroup != null) {
311
312 outputFs.setOwner(path, filesUser, filesGroup);
313 }
314 if (filesMode > 0) {
315 outputFs.setPermission(path, new FsPermission(filesMode));
316 }
317 }
318 }
319
320
321
322
323
324
325
326
327
328 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
329 FileStatus stat;
330 try {
331 stat = outputFs.getFileStatus(path);
332 } catch (IOException e) {
333 LOG.warn("Unable to get the status for file=" + path);
334 return false;
335 }
336
337 try {
338 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
339 outputFs.setPermission(path, new FsPermission(filesMode));
340 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
341 outputFs.setPermission(path, refStat.getPermission());
342 }
343 } catch (IOException e) {
344 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
345 return false;
346 }
347
348 boolean hasRefStat = (refStat != null);
349 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
350 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
351 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
352 try {
353 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
354 outputFs.setOwner(path, user, group);
355 }
356 } catch (IOException e) {
357 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
358 LOG.warn("The user/group may not exist on the destination cluster: user=" +
359 user + " group=" + group);
360 return false;
361 }
362 }
363
364 return true;
365 }
366
367 private boolean stringIsNotEmpty(final String str) {
368 return str != null && str.length() > 0;
369 }
370
371 private void copyData(final Context context,
372 final Path inputPath, final InputStream in,
373 final Path outputPath, final FSDataOutputStream out,
374 final long inputFileSize)
375 throws IOException {
376 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
377 " (%.1f%%)";
378
379 try {
380 byte[] buffer = new byte[bufferSize];
381 long totalBytesWritten = 0;
382 int reportBytes = 0;
383 int bytesRead;
384
385 long stime = System.currentTimeMillis();
386 while ((bytesRead = in.read(buffer)) > 0) {
387 out.write(buffer, 0, bytesRead);
388 totalBytesWritten += bytesRead;
389 reportBytes += bytesRead;
390
391 if (reportBytes >= REPORT_SIZE) {
392 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
393 context.setStatus(String.format(statusMessage,
394 StringUtils.humanReadableInt(totalBytesWritten),
395 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
396 " from " + inputPath + " to " + outputPath);
397 reportBytes = 0;
398 }
399 }
400 long etime = System.currentTimeMillis();
401
402 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
403 context.setStatus(String.format(statusMessage,
404 StringUtils.humanReadableInt(totalBytesWritten),
405 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
406 " from " + inputPath + " to " + outputPath);
407
408
409 if (totalBytesWritten != inputFileSize) {
410 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
411 " expected=" + inputFileSize + " for file=" + inputPath;
412 throw new IOException(msg);
413 }
414
415 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
416 LOG.info("size=" + totalBytesWritten +
417 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
418 " time=" + StringUtils.formatTimeDiff(etime, stime) +
419 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
420 context.getCounter(Counter.FILES_COPIED).increment(1);
421 } catch (IOException e) {
422 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
423 context.getCounter(Counter.COPY_FAILED).increment(1);
424 throw e;
425 }
426 }
427
428
429
430
431
432
433 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
434 throws IOException {
435 try {
436 Configuration conf = context.getConfiguration();
437 FileLink link = null;
438 switch (fileInfo.getType()) {
439 case HFILE:
440 Path inputPath = new Path(fileInfo.getHfile());
441 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
442 break;
443 case WAL:
444 String serverName = fileInfo.getWalServer();
445 String logName = fileInfo.getWalName();
446 link = new WALLink(inputRoot, serverName, logName);
447 break;
448 default:
449 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
450 }
451 return link.open(inputFs);
452 } catch (IOException e) {
453 context.getCounter(Counter.MISSING_FILES).increment(1);
454 LOG.error("Unable to open source file=" + fileInfo.toString(), e);
455 throw e;
456 }
457 }
458
459 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
460 throws IOException {
461 try {
462 Configuration conf = context.getConfiguration();
463 FileLink link = null;
464 switch (fileInfo.getType()) {
465 case HFILE:
466 Path inputPath = new Path(fileInfo.getHfile());
467 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
468 break;
469 case WAL:
470 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
471 break;
472 default:
473 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
474 }
475 return link.getFileStatus(inputFs);
476 } catch (FileNotFoundException e) {
477 context.getCounter(Counter.MISSING_FILES).increment(1);
478 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479 throw e;
480 } catch (IOException e) {
481 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
482 throw e;
483 }
484 }
485
486 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
487 try {
488 return fs.getFileChecksum(path);
489 } catch (IOException e) {
490 LOG.warn("Unable to get checksum for file=" + path, e);
491 return null;
492 }
493 }
494
495
496
497
498
499 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
500
501 if (inputStat.getLen() != outputStat.getLen()) return false;
502
503
504 if (!verifyChecksum) return true;
505
506
507 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
508 if (inChecksum == null) return false;
509
510 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
511 if (outChecksum == null) return false;
512
513 return inChecksum.equals(outChecksum);
514 }
515 }
516
517
518
519
520
521
522
523
524
525 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
526 final FileSystem fs, final Path snapshotDir) throws IOException {
527 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
528
529 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
530 final TableName table = TableName.valueOf(snapshotDesc.getTable());
531
532
533 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
534 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
535 new SnapshotReferenceUtil.SnapshotVisitor() {
536 @Override
537 public void storeFile(final HRegionInfo regionInfo, final String family,
538 final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
539 if (storeFile.hasReference()) {
540
541 } else {
542 String region = regionInfo.getEncodedName();
543 String hfile = storeFile.getName();
544 Path path = HFileLink.createPath(table, region, family, hfile);
545
546 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
547 .setType(SnapshotFileInfo.Type.HFILE)
548 .setHfile(path.toString())
549 .build();
550
551 long size;
552 if (storeFile.hasFileSize()) {
553 size = storeFile.getFileSize();
554 } else {
555 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
556 }
557 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
558 }
559 }
560
561 @Override
562 public void logFile (final String server, final String logfile)
563 throws IOException {
564 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
565 .setType(SnapshotFileInfo.Type.WAL)
566 .setWalServer(server)
567 .setWalName(logfile)
568 .build();
569
570 long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
571 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
572 }
573 });
574
575 return files;
576 }
577
578
579
580
581
582
583
584
585
586 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
587 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
588
589 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
590 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
591 long r = a.getSecond() - b.getSecond();
592 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
593 }
594 });
595
596
597 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
598 new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
599 long[] sizeGroups = new long[ngroups];
600 int hi = files.size() - 1;
601 int lo = 0;
602
603 List<Pair<SnapshotFileInfo, Long>> group;
604 int dir = 1;
605 int g = 0;
606
607 while (hi >= lo) {
608 if (g == fileGroups.size()) {
609 group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
610 fileGroups.add(group);
611 } else {
612 group = fileGroups.get(g);
613 }
614
615 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
616
617
618 sizeGroups[g] += fileInfo.getSecond();
619 group.add(fileInfo);
620
621
622 g += dir;
623 if (g == ngroups) {
624 dir = -1;
625 g = ngroups - 1;
626 } else if (g < 0) {
627 dir = 1;
628 g = 0;
629 }
630 }
631
632 if (LOG.isDebugEnabled()) {
633 for (int i = 0; i < sizeGroups.length; ++i) {
634 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
635 }
636 }
637
638 return fileGroups;
639 }
640
641 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
642 @Override
643 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
644 TaskAttemptContext tac) throws IOException, InterruptedException {
645 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
646 }
647
648 @Override
649 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
650 Configuration conf = context.getConfiguration();
651 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
652 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
653
654 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
655 int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
656 if (mappers == 0 && snapshotFiles.size() > 0) {
657 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
658 mappers = Math.min(mappers, snapshotFiles.size());
659 conf.setInt(CONF_NUM_SPLITS, mappers);
660 conf.setInt(MR_NUM_MAPS, mappers);
661 }
662
663 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
664 List<InputSplit> splits = new ArrayList(groups.size());
665 for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
666 splits.add(new ExportSnapshotInputSplit(files));
667 }
668 return splits;
669 }
670
671 private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
672 private List<Pair<BytesWritable, Long>> files;
673 private long length;
674
675 public ExportSnapshotInputSplit() {
676 this.files = null;
677 }
678
679 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
680 this.files = new ArrayList(snapshotFiles.size());
681 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
682 this.files.add(new Pair<BytesWritable, Long>(
683 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
684 this.length += fileInfo.getSecond();
685 }
686 }
687
688 private List<Pair<BytesWritable, Long>> getSplitKeys() {
689 return files;
690 }
691
692 @Override
693 public long getLength() throws IOException, InterruptedException {
694 return length;
695 }
696
697 @Override
698 public String[] getLocations() throws IOException, InterruptedException {
699 return new String[] {};
700 }
701
702 @Override
703 public void readFields(DataInput in) throws IOException {
704 int count = in.readInt();
705 files = new ArrayList<Pair<BytesWritable, Long>>(count);
706 length = 0;
707 for (int i = 0; i < count; ++i) {
708 BytesWritable fileInfo = new BytesWritable();
709 fileInfo.readFields(in);
710 long size = in.readLong();
711 files.add(new Pair<BytesWritable, Long>(fileInfo, size));
712 length += size;
713 }
714 }
715
716 @Override
717 public void write(DataOutput out) throws IOException {
718 out.writeInt(files.size());
719 for (final Pair<BytesWritable, Long> fileInfo: files) {
720 fileInfo.getFirst().write(out);
721 out.writeLong(fileInfo.getSecond());
722 }
723 }
724 }
725
726 private static class ExportSnapshotRecordReader
727 extends RecordReader<BytesWritable, NullWritable> {
728 private final List<Pair<BytesWritable, Long>> files;
729 private long totalSize = 0;
730 private long procSize = 0;
731 private int index = -1;
732
733 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
734 this.files = files;
735 for (Pair<BytesWritable, Long> fileInfo: files) {
736 totalSize += fileInfo.getSecond();
737 }
738 }
739
740 @Override
741 public void close() { }
742
743 @Override
744 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
745
746 @Override
747 public NullWritable getCurrentValue() { return NullWritable.get(); }
748
749 @Override
750 public float getProgress() { return (float)procSize / totalSize; }
751
752 @Override
753 public void initialize(InputSplit split, TaskAttemptContext tac) { }
754
755 @Override
756 public boolean nextKeyValue() {
757 if (index >= 0) {
758 procSize += files.get(index).getSecond();
759 }
760 return(++index < files.size());
761 }
762 }
763 }
764
765
766
767
768
769
770
771
772 private void runCopyJob(final Path inputRoot, final Path outputRoot,
773 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
774 final String filesUser, final String filesGroup, final int filesMode,
775 final int mappers, final int bandwidthMB)
776 throws IOException, InterruptedException, ClassNotFoundException {
777 Configuration conf = getConf();
778 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
779 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
780 if (mappers > 0) {
781 conf.setInt(CONF_NUM_SPLITS, mappers);
782 conf.setInt(MR_NUM_MAPS, mappers);
783 }
784 conf.setInt(CONF_FILES_MODE, filesMode);
785 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
786 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
787 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
788 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
789 conf.set(CONF_SNAPSHOT_NAME, snapshotName);
790 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
791
792 Job job = new Job(conf);
793 job.setJobName("ExportSnapshot-" + snapshotName);
794 job.setJarByClass(ExportSnapshot.class);
795 TableMapReduceUtil.addDependencyJars(job);
796 job.setMapperClass(ExportMapper.class);
797 job.setInputFormatClass(ExportSnapshotInputFormat.class);
798 job.setOutputFormatClass(NullOutputFormat.class);
799 job.setMapSpeculativeExecution(false);
800 job.setNumReduceTasks(0);
801
802
803 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
804 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
805 new Path[] { inputRoot }, srcConf);
806 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
807 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
808 new Path[] { outputRoot }, destConf);
809
810
811 if (!job.waitForCompletion(true)) {
812 throw new ExportSnapshotException(job.getStatus().getFailureInfo());
813 }
814 }
815
816 private void verifySnapshot(final Configuration baseConf,
817 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
818
819 Configuration conf = new Configuration(baseConf);
820 FSUtils.setRootDir(conf, rootDir);
821 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
822 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
823 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
824 }
825
826
827
828
829 private void setOwner(final FileSystem fs, final Path path, final String user,
830 final String group, final boolean recursive) throws IOException {
831 if (user != null || group != null) {
832 if (recursive && fs.isDirectory(path)) {
833 for (FileStatus child : fs.listStatus(path)) {
834 setOwner(fs, child.getPath(), user, group, recursive);
835 }
836 }
837 fs.setOwner(path, user, group);
838 }
839 }
840
841
842
843
844 private void setPermission(final FileSystem fs, final Path path, final short filesMode,
845 final boolean recursive) throws IOException {
846 if (filesMode > 0) {
847 FsPermission perm = new FsPermission(filesMode);
848 if (recursive && fs.isDirectory(path)) {
849 for (FileStatus child : fs.listStatus(path)) {
850 setPermission(fs, child.getPath(), filesMode, recursive);
851 }
852 }
853 fs.setPermission(path, perm);
854 }
855 }
856
857
858
859
860
861 @Override
862 public int run(String[] args) throws IOException {
863 boolean verifyTarget = true;
864 boolean verifyChecksum = true;
865 String snapshotName = null;
866 String targetName = null;
867 boolean overwrite = false;
868 String filesGroup = null;
869 String filesUser = null;
870 Path outputRoot = null;
871 int bandwidthMB = Integer.MAX_VALUE;
872 int filesMode = 0;
873 int mappers = 0;
874
875 Configuration conf = getConf();
876 Path inputRoot = FSUtils.getRootDir(conf);
877
878
879 for (int i = 0; i < args.length; i++) {
880 String cmd = args[i];
881 if (cmd.equals("-snapshot")) {
882 snapshotName = args[++i];
883 } else if (cmd.equals("-target")) {
884 targetName = args[++i];
885 } else if (cmd.equals("-copy-to")) {
886 outputRoot = new Path(args[++i]);
887 } else if (cmd.equals("-copy-from")) {
888 inputRoot = new Path(args[++i]);
889 FSUtils.setRootDir(conf, inputRoot);
890 } else if (cmd.equals("-no-checksum-verify")) {
891 verifyChecksum = false;
892 } else if (cmd.equals("-no-target-verify")) {
893 verifyTarget = false;
894 } else if (cmd.equals("-mappers")) {
895 mappers = Integer.parseInt(args[++i]);
896 } else if (cmd.equals("-chuser")) {
897 filesUser = args[++i];
898 } else if (cmd.equals("-chgroup")) {
899 filesGroup = args[++i];
900 } else if (cmd.equals("-bandwidth")) {
901 bandwidthMB = Integer.parseInt(args[++i]);
902 } else if (cmd.equals("-chmod")) {
903 filesMode = Integer.parseInt(args[++i], 8);
904 } else if (cmd.equals("-overwrite")) {
905 overwrite = true;
906 } else if (cmd.equals("-h") || cmd.equals("--help")) {
907 printUsageAndExit();
908 } else {
909 System.err.println("UNEXPECTED: " + cmd);
910 printUsageAndExit();
911 }
912 }
913
914
915 if (snapshotName == null) {
916 System.err.println("Snapshot name not provided.");
917 printUsageAndExit();
918 }
919
920 if (outputRoot == null) {
921 System.err.println("Destination file-system not provided.");
922 printUsageAndExit();
923 }
924
925 if (targetName == null) {
926 targetName = snapshotName;
927 }
928
929 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
930 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
931 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
932 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
933 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
934 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
935 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
936 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
937
938 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
939
940 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
941 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
942 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
943 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
944
945
946 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot);
947 if (outputFs.exists(needSetOwnerDir)) {
948 if (skipTmp) {
949 needSetOwnerDir = outputSnapshotDir;
950 } else {
951 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot);
952 if (outputFs.exists(needSetOwnerDir)) {
953 needSetOwnerDir = snapshotTmpDir;
954 }
955 }
956 }
957
958
959 if (outputFs.exists(outputSnapshotDir)) {
960 if (overwrite) {
961 if (!outputFs.delete(outputSnapshotDir, true)) {
962 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
963 return 1;
964 }
965 } else {
966 System.err.println("The snapshot '" + targetName +
967 "' already exists in the destination: " + outputSnapshotDir);
968 return 1;
969 }
970 }
971
972 if (!skipTmp) {
973
974 if (outputFs.exists(snapshotTmpDir)) {
975 if (overwrite) {
976 if (!outputFs.delete(snapshotTmpDir, true)) {
977 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
978 return 1;
979 }
980 } else {
981 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
982 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
983 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
984 return 1;
985 }
986 }
987 }
988
989
990
991
992 try {
993 LOG.info("Copy Snapshot Manifest");
994 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
995 } catch (IOException e) {
996 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
997 snapshotDir + " to=" + initialOutputSnapshotDir, e);
998 } finally {
999 if (filesUser != null || filesGroup != null) {
1000 LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to "
1001 + filesUser)
1002 + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
1003 + filesGroup));
1004 setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
1005 }
1006 if (filesMode > 0) {
1007 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
1008 setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
1009 }
1010 }
1011
1012
1013 if (!targetName.equals(snapshotName)) {
1014 SnapshotDescription snapshotDesc =
1015 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
1016 .toBuilder()
1017 .setName(targetName)
1018 .build();
1019 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1020 }
1021
1022
1023
1024
1025 try {
1026 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1027 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1028
1029 LOG.info("Finalize the Snapshot Export");
1030 if (!skipTmp) {
1031
1032 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1033 throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1034 snapshotTmpDir + " to=" + outputSnapshotDir);
1035 }
1036 }
1037
1038
1039 if (verifyTarget) {
1040 LOG.info("Verify snapshot integrity");
1041 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1042 }
1043
1044 LOG.info("Export Completed: " + targetName);
1045 return 0;
1046 } catch (Exception e) {
1047 LOG.error("Snapshot export failed", e);
1048 if (!skipTmp) {
1049 outputFs.delete(snapshotTmpDir, true);
1050 }
1051 outputFs.delete(outputSnapshotDir, true);
1052 return 1;
1053 } finally {
1054 IOUtils.closeStream(inputFs);
1055 IOUtils.closeStream(outputFs);
1056 }
1057 }
1058
1059
1060 private void printUsageAndExit() {
1061 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1062 System.err.println(" where [options] are:");
1063 System.err.println(" -h|-help Show this help and exit.");
1064 System.err.println(" -snapshot NAME Snapshot to restore.");
1065 System.err.println(" -copy-to NAME Remote destination hdfs://");
1066 System.err.println(" -copy-from NAME Input folder hdfs:// (default hbase.rootdir)");
1067 System.err.println(" -no-checksum-verify Do not verify checksum, use name+length only.");
1068 System.err.println(" -no-target-verify Do not verify the integrity of the \\" +
1069 "exported snapshot.");
1070 System.err.println(" -overwrite Rewrite the snapshot manifest if already exists");
1071 System.err.println(" -chuser USERNAME Change the owner of the files " +
1072 "to the specified one.");
1073 System.err.println(" -chgroup GROUP Change the group of the files to " +
1074 "the specified one.");
1075 System.err.println(" -chmod MODE Change the permission of the files " +
1076 "to the specified one.");
1077 System.err.println(" -mappers Number of mappers to use during the " +
1078 "copy (mapreduce.job.maps).");
1079 System.err.println(" -bandwidth Limit bandwidth to this value in MB/second.");
1080 System.err.println();
1081 System.err.println("Examples:");
1082 System.err.println(" hbase " + getClass().getName() + " \\");
1083 System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1084 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1085 System.err.println();
1086 System.err.println(" hbase " + getClass().getName() + " \\");
1087 System.err.println(" -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1088 System.err.println(" -copy-to hdfs://srv1:50070/hbase \\");
1089 System.exit(1);
1090 }
1091
1092
1093
1094
1095
1096
1097
1098
1099 static int innerMain(final Configuration conf, final String [] args) throws Exception {
1100 return ToolRunner.run(conf, new ExportSnapshot(), args);
1101 }
1102
1103 public static void main(String[] args) throws Exception {
1104 System.exit(innerMain(HBaseConfiguration.create(), args));
1105 }
1106 }