1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import org.apache.hadoop.fs.Path;
22 import org.apache.hadoop.hbase.HRegionInfo;
23 import org.apache.hadoop.hbase.HTableDescriptor;
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.hbase.client.Result;
27 import org.apache.hadoop.hbase.client.Scan;
28 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29 import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
30 import org.apache.hadoop.mapred.InputFormat;
31 import org.apache.hadoop.mapred.InputSplit;
32 import org.apache.hadoop.mapred.JobConf;
33 import org.apache.hadoop.mapred.RecordReader;
34 import org.apache.hadoop.mapred.Reporter;
35
36 import java.io.DataInput;
37 import java.io.DataOutput;
38 import java.io.IOException;
39 import java.util.List;
40
41
42
43
44
45
46
47 @InterfaceAudience.Public
48 @InterfaceStability.Evolving
49 public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
50
51 public static class TableSnapshotRegionSplit implements InputSplit {
52 private TableSnapshotInputFormatImpl.InputSplit delegate;
53
54
55 public TableSnapshotRegionSplit() {
56 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
57 }
58
59 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
60 this.delegate = delegate;
61 }
62
63 public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
64 List<String> locations, Scan scan, Path restoreDir) {
65 this.delegate =
66 new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
67 }
68
69 @Override
70 public long getLength() throws IOException {
71 return delegate.getLength();
72 }
73
74 @Override
75 public String[] getLocations() throws IOException {
76 return delegate.getLocations();
77 }
78
79 @Override
80 public void write(DataOutput out) throws IOException {
81 delegate.write(out);
82 }
83
84 @Override
85 public void readFields(DataInput in) throws IOException {
86 delegate.readFields(in);
87 }
88 }
89
90 static class TableSnapshotRecordReader
91 implements RecordReader<ImmutableBytesWritable, Result> {
92
93 private TableSnapshotInputFormatImpl.RecordReader delegate;
94
95 public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
96 throws IOException {
97 delegate = new TableSnapshotInputFormatImpl.RecordReader();
98 delegate.initialize(split.delegate, job);
99 }
100
101 @Override
102 public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
103 if (!delegate.nextKeyValue()) {
104 return false;
105 }
106 ImmutableBytesWritable currentKey = delegate.getCurrentKey();
107 key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
108 value.copyFrom(delegate.getCurrentValue());
109 return true;
110 }
111
112 @Override
113 public ImmutableBytesWritable createKey() {
114 return new ImmutableBytesWritable();
115 }
116
117 @Override
118 public Result createValue() {
119 return new Result();
120 }
121
122 @Override
123 public long getPos() throws IOException {
124 return delegate.getPos();
125 }
126
127 @Override
128 public void close() throws IOException {
129 delegate.close();
130 }
131
132 @Override
133 public float getProgress() throws IOException {
134 return delegate.getProgress();
135 }
136 }
137
138 @Override
139 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
140 List<TableSnapshotInputFormatImpl.InputSplit> splits =
141 TableSnapshotInputFormatImpl.getSplits(job);
142 InputSplit[] results = new InputSplit[splits.size()];
143 for (int i = 0; i < splits.size(); i++) {
144 results[i] = new TableSnapshotRegionSplit(splits.get(i));
145 }
146 return results;
147 }
148
149 @Override
150 public RecordReader<ImmutableBytesWritable, Result>
151 getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
152 return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
153 }
154
155
156
157
158
159
160
161
162
163
164 public static void setInput(JobConf job, String snapshotName, Path restoreDir)
165 throws IOException {
166 TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
167 }
168 }