1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import org.apache.hadoop.fs.Path;
23 import org.apache.hadoop.hbase.HRegionInfo;
24 import org.apache.hadoop.hbase.HTableDescriptor;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.client.Scan;
29 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.mapreduce.InputFormat;
33 import org.apache.hadoop.mapreduce.InputSplit;
34 import org.apache.hadoop.mapreduce.Job;
35 import org.apache.hadoop.mapreduce.JobContext;
36 import org.apache.hadoop.mapreduce.RecordReader;
37 import org.apache.hadoop.mapreduce.TaskAttemptContext;
38
39 import java.io.DataInput;
40 import java.io.DataOutput;
41 import java.io.IOException;
42 import java.lang.reflect.Method;
43 import java.util.ArrayList;
44 import java.util.List;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @InterfaceAudience.Public
85 @InterfaceStability.Evolving
86 public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
87
88 public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
89 private TableSnapshotInputFormatImpl.InputSplit delegate;
90
91
92 public TableSnapshotRegionSplit() {
93 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
94 }
95
96 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
97 this.delegate = delegate;
98 }
99
100 public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
101 List<String> locations, Scan scan, Path restoreDir) {
102 this.delegate =
103 new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
104 }
105
106 @Override
107 public long getLength() throws IOException, InterruptedException {
108 return delegate.getLength();
109 }
110
111 @Override
112 public String[] getLocations() throws IOException, InterruptedException {
113 return delegate.getLocations();
114 }
115
116 @Override
117 public void write(DataOutput out) throws IOException {
118 delegate.write(out);
119 }
120
121 @Override
122 public void readFields(DataInput in) throws IOException {
123 delegate.readFields(in);
124 }
125 }
126
127 @VisibleForTesting
128 static class TableSnapshotRegionRecordReader extends
129 RecordReader<ImmutableBytesWritable, Result> {
130 private TableSnapshotInputFormatImpl.RecordReader delegate =
131 new TableSnapshotInputFormatImpl.RecordReader();
132 private TaskAttemptContext context;
133 private Method getCounter;
134
135 @Override
136 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
137 InterruptedException {
138 this.context = context;
139 getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
140 delegate.initialize(
141 ((TableSnapshotRegionSplit) split).delegate,
142 context.getConfiguration());
143 }
144
145 @Override
146 public boolean nextKeyValue() throws IOException, InterruptedException {
147 boolean result = delegate.nextKeyValue();
148 if (result) {
149 ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
150 if (scanMetrics != null && context != null) {
151 TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
152 }
153 }
154 return result;
155 }
156
157 @Override
158 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
159 return delegate.getCurrentKey();
160 }
161
162 @Override
163 public Result getCurrentValue() throws IOException, InterruptedException {
164 return delegate.getCurrentValue();
165 }
166
167 @Override
168 public float getProgress() throws IOException, InterruptedException {
169 return delegate.getProgress();
170 }
171
172 @Override
173 public void close() throws IOException {
174 delegate.close();
175 }
176 }
177
178 @Override
179 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
180 InputSplit split, TaskAttemptContext context) throws IOException {
181 return new TableSnapshotRegionRecordReader();
182 }
183
184 @Override
185 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
186 List<InputSplit> results = new ArrayList<InputSplit>();
187 for (TableSnapshotInputFormatImpl.InputSplit split :
188 TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
189 results.add(new TableSnapshotRegionSplit(split));
190 }
191 return results;
192 }
193
194
195
196
197
198
199
200
201
202
203 public static void setInput(Job job, String snapshotName, Path restoreDir)
204 throws IOException {
205 TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
206 }
207 }