1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import org.apache.hadoop.hbase.classification.InterfaceAudience;
22 import org.apache.hadoop.hbase.classification.InterfaceStability;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.HRegionInfo;
25 import org.apache.hadoop.hbase.HTableDescriptor;
26 import org.apache.hadoop.hbase.client.Result;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
29 import org.apache.hadoop.mapred.InputFormat;
30 import org.apache.hadoop.mapred.InputSplit;
31 import org.apache.hadoop.mapred.JobConf;
32 import org.apache.hadoop.mapred.RecordReader;
33 import org.apache.hadoop.mapred.Reporter;
34
35 import java.io.DataInput;
36 import java.io.DataOutput;
37 import java.io.IOException;
38 import java.util.List;
39
40
41
42
43
44
45
46 @InterfaceAudience.Public
47 @InterfaceStability.Evolving
48 public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
49
50 public static class TableSnapshotRegionSplit implements InputSplit {
51 private TableSnapshotInputFormatImpl.InputSplit delegate;
52
53
54 public TableSnapshotRegionSplit() {
55 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
56 }
57
58 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
59 this.delegate = delegate;
60 }
61
62 public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
63 List<String> locations) {
64 this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
65 }
66
67 @Override
68 public long getLength() throws IOException {
69 return delegate.getLength();
70 }
71
72 @Override
73 public String[] getLocations() throws IOException {
74 return delegate.getLocations();
75 }
76
77 @Override
78 public void write(DataOutput out) throws IOException {
79 delegate.write(out);
80 }
81
82 @Override
83 public void readFields(DataInput in) throws IOException {
84 delegate.readFields(in);
85 }
86 }
87
88 static class TableSnapshotRecordReader
89 implements RecordReader<ImmutableBytesWritable, Result> {
90
91 private TableSnapshotInputFormatImpl.RecordReader delegate;
92
93 public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
94 throws IOException {
95 delegate = new TableSnapshotInputFormatImpl.RecordReader();
96 delegate.initialize(split.delegate, job);
97 }
98
99 @Override
100 public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
101 if (!delegate.nextKeyValue()) {
102 return false;
103 }
104 ImmutableBytesWritable currentKey = delegate.getCurrentKey();
105 key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
106 value.copyFrom(delegate.getCurrentValue());
107 return true;
108 }
109
110 @Override
111 public ImmutableBytesWritable createKey() {
112 return new ImmutableBytesWritable();
113 }
114
115 @Override
116 public Result createValue() {
117 return new Result();
118 }
119
120 @Override
121 public long getPos() throws IOException {
122 return delegate.getPos();
123 }
124
125 @Override
126 public void close() throws IOException {
127 delegate.close();
128 }
129
130 @Override
131 public float getProgress() throws IOException {
132 return delegate.getProgress();
133 }
134 }
135
136 @Override
137 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
138 List<TableSnapshotInputFormatImpl.InputSplit> splits =
139 TableSnapshotInputFormatImpl.getSplits(job);
140 InputSplit[] results = new InputSplit[splits.size()];
141 for (int i = 0; i < splits.size(); i++) {
142 results[i] = new TableSnapshotRegionSplit(splits.get(i));
143 }
144 return results;
145 }
146
147 @Override
148 public RecordReader<ImmutableBytesWritable, Result>
149 getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
150 return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
151 }
152
153
154
155
156
157
158
159
160
161
162 public static void setInput(JobConf job, String snapshotName, Path restoreDir)
163 throws IOException {
164 TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
165 }
166 }