1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.BufferedInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.conf.Configured;
40 import org.apache.hadoop.fs.FSDataInputStream;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileChecksum;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.FileUtil;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.permission.FsPermission;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.HBaseConfiguration;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.io.FileLink;
53 import org.apache.hadoop.hbase.io.HFileLink;
54 import org.apache.hadoop.hbase.io.WALLink;
55 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59 import org.apache.hadoop.hbase.util.FSUtils;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.apache.hadoop.io.BytesWritable;
62 import org.apache.hadoop.io.IOUtils;
63 import org.apache.hadoop.io.NullWritable;
64 import org.apache.hadoop.io.Writable;
65 import org.apache.hadoop.mapreduce.Job;
66 import org.apache.hadoop.mapreduce.JobContext;
67 import org.apache.hadoop.mapreduce.Mapper;
68 import org.apache.hadoop.mapreduce.InputFormat;
69 import org.apache.hadoop.mapreduce.InputSplit;
70 import org.apache.hadoop.mapreduce.RecordReader;
71 import org.apache.hadoop.mapreduce.TaskAttemptContext;
72 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73 import org.apache.hadoop.mapreduce.security.TokenCache;
74 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75 import org.apache.hadoop.util.StringUtils;
76 import org.apache.hadoop.util.Tool;
77 import org.apache.hadoop.util.ToolRunner;
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Public
87 @InterfaceStability.Evolving
88 public class ExportSnapshot extends Configured implements Tool {
89 public static final String NAME = "exportsnapshot";
90
91 private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);
92
93 private static final String MR_NUM_MAPS = "mapreduce.job.maps";
94 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
95 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
96 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
97 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
98 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
99 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
100 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
101 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
102 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
103 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
104 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
105 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
106 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
107
108 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
109 static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
110
111 private static final String INPUT_FOLDER_PREFIX = "export-files.";
112
113
/** Job counters updated by the export mappers while copying files. */
public enum Counter {
  MISSING_FILES,
  FILES_COPIED,
  FILES_SKIPPED,
  COPY_FAILED,
  BYTES_EXPECTED,
  BYTES_SKIPPED,
  BYTES_COPIED
}
118
119 private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
120 NullWritable, NullWritable> {
121 final static int REPORT_SIZE = 1 * 1024 * 1024;
122 final static int BUFFER_SIZE = 64 * 1024;
123
124 private boolean testFailures;
125 private Random random;
126
127 private boolean verifyChecksum;
128 private String filesGroup;
129 private String filesUser;
130 private short filesMode;
131 private int bufferSize;
132
133 private FileSystem outputFs;
134 private Path outputArchive;
135 private Path outputRoot;
136
137 private FileSystem inputFs;
138 private Path inputArchive;
139 private Path inputRoot;
140
141 @Override
142 public void setup(Context context) throws IOException {
143 Configuration conf = context.getConfiguration();
144 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
145
146 filesGroup = conf.get(CONF_FILES_GROUP);
147 filesUser = conf.get(CONF_FILES_USER);
148 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
149 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
150 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
151
152 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
153 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
154
155 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
156
157 try {
158 conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
159 inputFs = FileSystem.get(inputRoot.toUri(), conf);
160 } catch (IOException e) {
161 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
162 }
163
164 try {
165 conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166 outputFs = FileSystem.get(outputRoot.toUri(), conf);
167 } catch (IOException e) {
168 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
169 }
170
171
172 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
173 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
174 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
175
176 for (Counter c : Counter.values()) {
177 context.getCounter(c).increment(0);
178 }
179 }
180
181 @Override
182 protected void cleanup(Context context) {
183 IOUtils.closeStream(inputFs);
184 IOUtils.closeStream(outputFs);
185 }
186
187 @Override
188 public void map(BytesWritable key, NullWritable value, Context context)
189 throws InterruptedException, IOException {
190 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
191 Path outputPath = getOutputPath(inputInfo);
192
193 copyFile(context, inputInfo, outputPath);
194 }
195
196
197
198
199 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
200 Path path = null;
201 switch (inputInfo.getType()) {
202 case HFILE:
203 Path inputPath = new Path(inputInfo.getHfile());
204 String family = inputPath.getParent().getName();
205 TableName table =HFileLink.getReferencedTableName(inputPath.getName());
206 String region = HFileLink.getReferencedRegionName(inputPath.getName());
207 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
208 path = new Path(FSUtils.getTableDir(new Path("./"), table),
209 new Path(region, new Path(family, hfile)));
210 break;
211 case WAL:
212 Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
213 path = new Path(oldLogsDir, inputInfo.getWalName());
214 break;
215 default:
216 throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
217 }
218 return new Path(outputArchive, path);
219 }
220
221
222
223
224 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
225 throws IOException {
226 if (testFailures) {
227 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
228 if (random == null) {
229 random = new Random();
230 }
231
232
233
234
235 if (random.nextFloat() < 0.03) {
236 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
237 + " time=" + System.currentTimeMillis());
238 }
239 } else {
240 context.getCounter(Counter.COPY_FAILED).increment(1);
241 throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
242 }
243 }
244 }
245
246 private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
247 final Path outputPath) throws IOException {
248 injectTestFailure(context, inputInfo);
249
250
251 FileStatus inputStat = getSourceFileStatus(context, inputInfo);
252
253
254 if (outputFs.exists(outputPath)) {
255 FileStatus outputStat = outputFs.getFileStatus(outputPath);
256 if (outputStat != null && sameFile(inputStat, outputStat)) {
257 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
258 context.getCounter(Counter.FILES_SKIPPED).increment(1);
259 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
260 return;
261 }
262 }
263
264 InputStream in = openSourceFile(context, inputInfo);
265 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
266 if (Integer.MAX_VALUE != bandwidthMB) {
267 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
268 }
269
270 try {
271 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
272
273
274 createOutputPath(outputPath.getParent());
275 FSDataOutputStream out = outputFs.create(outputPath, true);
276 try {
277 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
278 } finally {
279 out.close();
280 }
281
282
283 if (!preserveAttributes(outputPath, inputStat)) {
284 LOG.warn("You may have to run manually chown on: " + outputPath);
285 }
286 } finally {
287 in.close();
288 }
289 }
290
291
292
293
294 private void createOutputPath(final Path path) throws IOException {
295 if (filesUser == null && filesGroup == null) {
296 outputFs.mkdirs(path);
297 } else {
298 Path parent = path.getParent();
299 if (!outputFs.exists(parent) && !parent.isRoot()) {
300 createOutputPath(parent);
301 }
302 outputFs.mkdirs(path);
303 if (filesUser != null || filesGroup != null) {
304
305 outputFs.setOwner(path, filesUser, filesGroup);
306 }
307 if (filesMode > 0) {
308 outputFs.setPermission(path, new FsPermission(filesMode));
309 }
310 }
311 }
312
313
314
315
316
317
318
319
320
321 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
322 FileStatus stat;
323 try {
324 stat = outputFs.getFileStatus(path);
325 } catch (IOException e) {
326 LOG.warn("Unable to get the status for file=" + path);
327 return false;
328 }
329
330 try {
331 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
332 outputFs.setPermission(path, new FsPermission(filesMode));
333 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
334 outputFs.setPermission(path, refStat.getPermission());
335 }
336 } catch (IOException e) {
337 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
338 return false;
339 }
340
341 boolean hasRefStat = (refStat != null);
342 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
343 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
344 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
345 try {
346 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
347 outputFs.setOwner(path, user, group);
348 }
349 } catch (IOException e) {
350 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
351 LOG.warn("The user/group may not exist on the destination cluster: user=" +
352 user + " group=" + group);
353 return false;
354 }
355 }
356
357 return true;
358 }
359
360 private boolean stringIsNotEmpty(final String str) {
361 return str != null && str.length() > 0;
362 }
363
364 private void copyData(final Context context,
365 final Path inputPath, final InputStream in,
366 final Path outputPath, final FSDataOutputStream out,
367 final long inputFileSize)
368 throws IOException {
369 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
370 " (%.1f%%)";
371
372 try {
373 byte[] buffer = new byte[bufferSize];
374 long totalBytesWritten = 0;
375 int reportBytes = 0;
376 int bytesRead;
377
378 long stime = System.currentTimeMillis();
379 while ((bytesRead = in.read(buffer)) > 0) {
380 out.write(buffer, 0, bytesRead);
381 totalBytesWritten += bytesRead;
382 reportBytes += bytesRead;
383
384 if (reportBytes >= REPORT_SIZE) {
385 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
386 context.setStatus(String.format(statusMessage,
387 StringUtils.humanReadableInt(totalBytesWritten),
388 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
389 " from " + inputPath + " to " + outputPath);
390 reportBytes = 0;
391 }
392 }
393 long etime = System.currentTimeMillis();
394
395 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
396 context.setStatus(String.format(statusMessage,
397 StringUtils.humanReadableInt(totalBytesWritten),
398 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
399 " from " + inputPath + " to " + outputPath);
400
401
402 if (totalBytesWritten != inputFileSize) {
403 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
404 " expected=" + inputFileSize + " for file=" + inputPath;
405 throw new IOException(msg);
406 }
407
408 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
409 LOG.info("size=" + totalBytesWritten +
410 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
411 " time=" + StringUtils.formatTimeDiff(etime, stime) +
412 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
413 context.getCounter(Counter.FILES_COPIED).increment(1);
414 } catch (IOException e) {
415 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
416 context.getCounter(Counter.COPY_FAILED).increment(1);
417 throw e;
418 }
419 }
420
421
422
423
424
425
426 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
427 throws IOException {
428 try {
429 Configuration conf = context.getConfiguration();
430 FileLink link = null;
431 switch (fileInfo.getType()) {
432 case HFILE:
433 Path inputPath = new Path(fileInfo.getHfile());
434 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
435 break;
436 case WAL:
437 String serverName = fileInfo.getWalServer();
438 String logName = fileInfo.getWalName();
439 link = new WALLink(inputRoot, serverName, logName);
440 break;
441 default:
442 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
443 }
444 return link.open(inputFs);
445 } catch (IOException e) {
446 context.getCounter(Counter.MISSING_FILES).increment(1);
447 LOG.error("Unable to open source file=" + fileInfo.toString(), e);
448 throw e;
449 }
450 }
451
452 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
453 throws IOException {
454 try {
455 Configuration conf = context.getConfiguration();
456 FileLink link = null;
457 switch (fileInfo.getType()) {
458 case HFILE:
459 Path inputPath = new Path(fileInfo.getHfile());
460 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
461 break;
462 case WAL:
463 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
464 break;
465 default:
466 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
467 }
468 return link.getFileStatus(inputFs);
469 } catch (FileNotFoundException e) {
470 context.getCounter(Counter.MISSING_FILES).increment(1);
471 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
472 throw e;
473 } catch (IOException e) {
474 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
475 throw e;
476 }
477 }
478
479 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
480 try {
481 return fs.getFileChecksum(path);
482 } catch (IOException e) {
483 LOG.warn("Unable to get checksum for file=" + path, e);
484 return null;
485 }
486 }
487
488
489
490
491
492 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
493
494 if (inputStat.getLen() != outputStat.getLen()) return false;
495
496
497 if (!verifyChecksum) return true;
498
499
500 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
501 if (inChecksum == null) return false;
502
503 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
504 if (outChecksum == null) return false;
505
506 return inChecksum.equals(outChecksum);
507 }
508 }
509
510
511
512
513
514
515
516
517
518 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
519 final FileSystem fs, final Path snapshotDir) throws IOException {
520 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
521
522 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
523 final TableName table = TableName.valueOf(snapshotDesc.getTable());
524
525
526 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
527 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
528 new SnapshotReferenceUtil.SnapshotVisitor() {
529 @Override
530 public void storeFile(final HRegionInfo regionInfo, final String family,
531 final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
532 if (storeFile.hasReference()) {
533
534 } else {
535 String region = regionInfo.getEncodedName();
536 String hfile = storeFile.getName();
537 Path path = HFileLink.createPath(table, region, family, hfile);
538
539 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
540 .setType(SnapshotFileInfo.Type.HFILE)
541 .setHfile(path.toString())
542 .build();
543
544 long size;
545 if (storeFile.hasFileSize()) {
546 size = storeFile.getFileSize();
547 } else {
548 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
549 }
550 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
551 }
552 }
553
554 @Override
555 public void logFile (final String server, final String logfile)
556 throws IOException {
557 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
558 .setType(SnapshotFileInfo.Type.WAL)
559 .setWalServer(server)
560 .setWalName(logfile)
561 .build();
562
563 long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
564 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
565 }
566 });
567
568 return files;
569 }
570
571
572
573
574
575
576
577
578
579 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
580 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
581
582 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
583 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
584 long r = a.getSecond() - b.getSecond();
585 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
586 }
587 });
588
589
590 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
591 new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
592 long[] sizeGroups = new long[ngroups];
593 int hi = files.size() - 1;
594 int lo = 0;
595
596 List<Pair<SnapshotFileInfo, Long>> group;
597 int dir = 1;
598 int g = 0;
599
600 while (hi >= lo) {
601 if (g == fileGroups.size()) {
602 group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
603 fileGroups.add(group);
604 } else {
605 group = fileGroups.get(g);
606 }
607
608 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
609
610
611 sizeGroups[g] += fileInfo.getSecond();
612 group.add(fileInfo);
613
614
615 g += dir;
616 if (g == ngroups) {
617 dir = -1;
618 g = ngroups - 1;
619 } else if (g < 0) {
620 dir = 1;
621 g = 0;
622 }
623 }
624
625 if (LOG.isDebugEnabled()) {
626 for (int i = 0; i < sizeGroups.length; ++i) {
627 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
628 }
629 }
630
631 return fileGroups;
632 }
633
634 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
635 @Override
636 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
637 TaskAttemptContext tac) throws IOException, InterruptedException {
638 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
639 }
640
641 @Override
642 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
643 Configuration conf = context.getConfiguration();
644 String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
645 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
646 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
647
648 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
649 int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
650 if (mappers == 0 && snapshotFiles.size() > 0) {
651 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
652 mappers = Math.min(mappers, snapshotFiles.size());
653 conf.setInt(CONF_NUM_SPLITS, mappers);
654 conf.setInt(MR_NUM_MAPS, mappers);
655 }
656
657 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
658 List<InputSplit> splits = new ArrayList(groups.size());
659 for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
660 splits.add(new ExportSnapshotInputSplit(files));
661 }
662 return splits;
663 }
664
665 private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
666 private List<Pair<BytesWritable, Long>> files;
667 private long length;
668
669 public ExportSnapshotInputSplit() {
670 this.files = null;
671 }
672
673 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
674 this.files = new ArrayList(snapshotFiles.size());
675 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
676 this.files.add(new Pair<BytesWritable, Long>(
677 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
678 this.length += fileInfo.getSecond();
679 }
680 }
681
682 private List<Pair<BytesWritable, Long>> getSplitKeys() {
683 return files;
684 }
685
686 @Override
687 public long getLength() throws IOException, InterruptedException {
688 return length;
689 }
690
691 @Override
692 public String[] getLocations() throws IOException, InterruptedException {
693 return new String[] {};
694 }
695
696 @Override
697 public void readFields(DataInput in) throws IOException {
698 int count = in.readInt();
699 files = new ArrayList<Pair<BytesWritable, Long>>(count);
700 length = 0;
701 for (int i = 0; i < count; ++i) {
702 BytesWritable fileInfo = new BytesWritable();
703 fileInfo.readFields(in);
704 long size = in.readLong();
705 files.add(new Pair<BytesWritable, Long>(fileInfo, size));
706 length += size;
707 }
708 }
709
710 @Override
711 public void write(DataOutput out) throws IOException {
712 out.writeInt(files.size());
713 for (final Pair<BytesWritable, Long> fileInfo: files) {
714 fileInfo.getFirst().write(out);
715 out.writeLong(fileInfo.getSecond());
716 }
717 }
718 }
719
720 private static class ExportSnapshotRecordReader
721 extends RecordReader<BytesWritable, NullWritable> {
722 private final List<Pair<BytesWritable, Long>> files;
723 private long totalSize = 0;
724 private long procSize = 0;
725 private int index = -1;
726
727 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
728 this.files = files;
729 for (Pair<BytesWritable, Long> fileInfo: files) {
730 totalSize += fileInfo.getSecond();
731 }
732 }
733
734 @Override
735 public void close() { }
736
737 @Override
738 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
739
740 @Override
741 public NullWritable getCurrentValue() { return NullWritable.get(); }
742
743 @Override
744 public float getProgress() { return (float)procSize / totalSize; }
745
746 @Override
747 public void initialize(InputSplit split, TaskAttemptContext tac) { }
748
749 @Override
750 public boolean nextKeyValue() {
751 if (index >= 0) {
752 procSize += files.get(index).getSecond();
753 }
754 return(++index < files.size());
755 }
756 }
757 }
758
759
760
761
762
763
764
765
766 private void runCopyJob(final Path inputRoot, final Path outputRoot,
767 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
768 final String filesUser, final String filesGroup, final int filesMode,
769 final int mappers, final int bandwidthMB)
770 throws IOException, InterruptedException, ClassNotFoundException {
771 Configuration conf = getConf();
772 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
773 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
774 if (mappers > 0) {
775 conf.setInt(CONF_NUM_SPLITS, mappers);
776 conf.setInt(MR_NUM_MAPS, mappers);
777 }
778 conf.setInt(CONF_FILES_MODE, filesMode);
779 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
780 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
781 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
782 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
783 conf.set(CONF_SNAPSHOT_NAME, snapshotName);
784 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
785
786 Job job = new Job(conf);
787 job.setJobName("ExportSnapshot-" + snapshotName);
788 job.setJarByClass(ExportSnapshot.class);
789 TableMapReduceUtil.addDependencyJars(job);
790 job.setMapperClass(ExportMapper.class);
791 job.setInputFormatClass(ExportSnapshotInputFormat.class);
792 job.setOutputFormatClass(NullOutputFormat.class);
793 job.setMapSpeculativeExecution(false);
794 job.setNumReduceTasks(0);
795
796
797 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
798 new Path[] { inputRoot, outputRoot }, conf);
799
800
801 if (!job.waitForCompletion(true)) {
802
803
804 throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
805 }
806 }
807
808 private void verifySnapshot(final Configuration baseConf,
809 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
810
811 Configuration conf = new Configuration(baseConf);
812 FSUtils.setRootDir(conf, rootDir);
813 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
814 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
815 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
816 }
817
818
819
820
821 private void setOwner(final FileSystem fs, final Path path, final String user,
822 final String group, final boolean recursive) throws IOException {
823 if (user != null || group != null) {
824 if (recursive && fs.isDirectory(path)) {
825 for (FileStatus child : fs.listStatus(path)) {
826 setOwner(fs, child.getPath(), user, group, recursive);
827 }
828 }
829 fs.setOwner(path, user, group);
830 }
831 }
832
833
834
835
836 private void setPermission(final FileSystem fs, final Path path, final short filesMode,
837 final boolean recursive) throws IOException {
838 if (filesMode > 0) {
839 FsPermission perm = new FsPermission(filesMode);
840 if (recursive && fs.isDirectory(path)) {
841 for (FileStatus child : fs.listStatus(path)) {
842 setPermission(fs, child.getPath(), filesMode, recursive);
843 }
844 }
845 fs.setPermission(path, perm);
846 }
847 }
848
849
850
851
852
853 @Override
854 public int run(String[] args) throws IOException {
855 boolean verifyTarget = true;
856 boolean verifyChecksum = true;
857 String snapshotName = null;
858 String targetName = null;
859 boolean overwrite = false;
860 String filesGroup = null;
861 String filesUser = null;
862 Path outputRoot = null;
863 int bandwidthMB = Integer.MAX_VALUE;
864 int filesMode = 0;
865 int mappers = 0;
866
867 Configuration conf = getConf();
868 Path inputRoot = FSUtils.getRootDir(conf);
869
870
871 for (int i = 0; i < args.length; i++) {
872 String cmd = args[i];
873 if (cmd.equals("-snapshot")) {
874 snapshotName = args[++i];
875 } else if (cmd.equals("-target")) {
876 targetName = args[++i];
877 } else if (cmd.equals("-copy-to")) {
878 outputRoot = new Path(args[++i]);
879 } else if (cmd.equals("-copy-from")) {
880 inputRoot = new Path(args[++i]);
881 FSUtils.setRootDir(conf, inputRoot);
882 } else if (cmd.equals("-no-checksum-verify")) {
883 verifyChecksum = false;
884 } else if (cmd.equals("-no-target-verify")) {
885 verifyTarget = false;
886 } else if (cmd.equals("-mappers")) {
887 mappers = Integer.parseInt(args[++i]);
888 } else if (cmd.equals("-chuser")) {
889 filesUser = args[++i];
890 } else if (cmd.equals("-chgroup")) {
891 filesGroup = args[++i];
892 } else if (cmd.equals("-bandwidth")) {
893 bandwidthMB = Integer.parseInt(args[++i]);
894 } else if (cmd.equals("-chmod")) {
895 filesMode = Integer.parseInt(args[++i], 8);
896 } else if (cmd.equals("-overwrite")) {
897 overwrite = true;
898 } else if (cmd.equals("-h") || cmd.equals("--help")) {
899 printUsageAndExit();
900 } else {
901 System.err.println("UNEXPECTED: " + cmd);
902 printUsageAndExit();
903 }
904 }
905
906
907 if (snapshotName == null) {
908 System.err.println("Snapshot name not provided.");
909 printUsageAndExit();
910 }
911
912 if (outputRoot == null) {
913 System.err.println("Destination file-system not provided.");
914 printUsageAndExit();
915 }
916
917 if (targetName == null) {
918 targetName = snapshotName;
919 }
920
921 conf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
922 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), conf);
923 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
924 conf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
925 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), conf);
926 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
927
928 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
929
930 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
931 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
932 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
933 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
934
935
936 if (outputFs.exists(outputSnapshotDir)) {
937 if (overwrite) {
938 if (!outputFs.delete(outputSnapshotDir, true)) {
939 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
940 return 1;
941 }
942 } else {
943 System.err.println("The snapshot '" + targetName +
944 "' already exists in the destination: " + outputSnapshotDir);
945 return 1;
946 }
947 }
948
949 if (!skipTmp) {
950
951 if (outputFs.exists(snapshotTmpDir)) {
952 if (overwrite) {
953 if (!outputFs.delete(snapshotTmpDir, true)) {
954 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
955 return 1;
956 }
957 } else {
958 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
959 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
960 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
961 return 1;
962 }
963 }
964 }
965
966
967
968
969 try {
970 LOG.info("Copy Snapshot Manifest");
971 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
972 if (filesUser != null || filesGroup != null) {
973 setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
974 }
975 if (filesMode > 0) {
976 setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
977 }
978 } catch (IOException e) {
979 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
980 snapshotDir + " to=" + initialOutputSnapshotDir, e);
981 }
982
983
984 if (!targetName.equals(snapshotName)) {
985 SnapshotDescription snapshotDesc =
986 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
987 .toBuilder()
988 .setName(targetName)
989 .build();
990 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
991 }
992
993
994
995
996 try {
997 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
998 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
999
1000 LOG.info("Finalize the Snapshot Export");
1001 if (!skipTmp) {
1002
1003 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1004 throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1005 snapshotTmpDir + " to=" + outputSnapshotDir);
1006 }
1007 }
1008
1009
1010 if (verifyTarget) {
1011 LOG.info("Verify snapshot integrity");
1012 verifySnapshot(conf, outputFs, outputRoot, outputSnapshotDir);
1013 }
1014
1015 LOG.info("Export Completed: " + targetName);
1016 return 0;
1017 } catch (Exception e) {
1018 LOG.error("Snapshot export failed", e);
1019 if (!skipTmp) {
1020 outputFs.delete(snapshotTmpDir, true);
1021 }
1022 outputFs.delete(outputSnapshotDir, true);
1023 return 1;
1024 } finally {
1025 IOUtils.closeStream(inputFs);
1026 IOUtils.closeStream(outputFs);
1027 }
1028 }
1029
1030
1031 private void printUsageAndExit() {
1032 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1033 System.err.println(" where [options] are:");
1034 System.err.println(" -h|-help Show this help and exit.");
1035 System.err.println(" -snapshot NAME Snapshot to restore.");
1036 System.err.println(" -copy-to NAME Remote destination hdfs://");
1037 System.err.println(" -copy-from NAME Input folder hdfs:// (default hbase.rootdir)");
1038 System.err.println(" -no-checksum-verify Do not verify checksum, use name+length only.");
1039 System.err.println(" -no-target-verify Do not verify the integrity of the \\" +
1040 "exported snapshot.");
1041 System.err.println(" -overwrite Rewrite the snapshot manifest if already exists");
1042 System.err.println(" -chuser USERNAME Change the owner of the files " +
1043 "to the specified one.");
1044 System.err.println(" -chgroup GROUP Change the group of the files to " +
1045 "the specified one.");
1046 System.err.println(" -chmod MODE Change the permission of the files " +
1047 "to the specified one.");
1048 System.err.println(" -mappers Number of mappers to use during the " +
1049 "copy (mapreduce.job.maps).");
1050 System.err.println(" -bandwidth Limit bandwidth to this value in MB/second.");
1051 System.err.println();
1052 System.err.println("Examples:");
1053 System.err.println(" hbase " + getClass().getName() + " \\");
1054 System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1055 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1056 System.err.println();
1057 System.err.println(" hbase " + getClass().getName() + " \\");
1058 System.err.println(" -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1059 System.err.println(" -copy-to hdfs://srv1:50070/hbase \\");
1060 System.exit(1);
1061 }
1062
1063
1064
1065
1066
1067
1068
1069
1070 static int innerMain(final Configuration conf, final String [] args) throws Exception {
1071 return ToolRunner.run(conf, new ExportSnapshot(), args);
1072 }
1073
1074 public static void main(String[] args) throws Exception {
1075 System.exit(innerMain(HBaseConfiguration.create(), args));
1076 }
1077 }