1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.lang.reflect.Method;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.io.Writable;
37 import org.apache.hadoop.mapreduce.InputFormat;
38 import org.apache.hadoop.mapreduce.InputSplit;
39 import org.apache.hadoop.mapreduce.Job;
40 import org.apache.hadoop.mapreduce.JobContext;
41 import org.apache.hadoop.mapreduce.RecordReader;
42 import org.apache.hadoop.mapreduce.TaskAttemptContext;
43
44 import com.google.common.annotations.VisibleForTesting;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @InterfaceAudience.Public
85 @InterfaceStability.Evolving
86 public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
87
88 public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
89 private TableSnapshotInputFormatImpl.InputSplit delegate;
90
91
92 public TableSnapshotRegionSplit() {
93 this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
94 }
95
96 public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
97 this.delegate = delegate;
98 }
99
100 public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
101 List<String> locations) {
102 this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
103 }
104
105 @Override
106 public long getLength() throws IOException, InterruptedException {
107 return delegate.getLength();
108 }
109
110 @Override
111 public String[] getLocations() throws IOException, InterruptedException {
112 return delegate.getLocations();
113 }
114
115 @Override
116 public void write(DataOutput out) throws IOException {
117 delegate.write(out);
118 }
119
120 @Override
121 public void readFields(DataInput in) throws IOException {
122 delegate.readFields(in);
123 }
124 }
125
126 @VisibleForTesting
127 static class TableSnapshotRegionRecordReader extends
128 RecordReader<ImmutableBytesWritable, Result> {
129 private TableSnapshotInputFormatImpl.RecordReader delegate =
130 new TableSnapshotInputFormatImpl.RecordReader();
131 private TaskAttemptContext context;
132 private Method getCounter;
133
134 @Override
135 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
136 InterruptedException {
137 this.context = context;
138 getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
139 delegate.initialize(
140 ((TableSnapshotRegionSplit) split).delegate,
141 context.getConfiguration());
142 }
143
144 @Override
145 public boolean nextKeyValue() throws IOException, InterruptedException {
146 boolean result = delegate.nextKeyValue();
147 if (result) {
148 ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
149 if (scanMetrics != null && context != null) {
150 TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
151 }
152 }
153 return result;
154 }
155
156 @Override
157 public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
158 return delegate.getCurrentKey();
159 }
160
161 @Override
162 public Result getCurrentValue() throws IOException, InterruptedException {
163 return delegate.getCurrentValue();
164 }
165
166 @Override
167 public float getProgress() throws IOException, InterruptedException {
168 return delegate.getProgress();
169 }
170
171 @Override
172 public void close() throws IOException {
173 delegate.close();
174 }
175 }
176
177 @Override
178 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
179 InputSplit split, TaskAttemptContext context) throws IOException {
180 return new TableSnapshotRegionRecordReader();
181 }
182
183 @Override
184 public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
185 List<InputSplit> results = new ArrayList<InputSplit>();
186 for (TableSnapshotInputFormatImpl.InputSplit split :
187 TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
188 results.add(new TableSnapshotRegionSplit(split));
189 }
190 return results;
191 }
192
193
194
195
196
197
198
199
200
201
202 public static void setInput(Job job, String snapshotName, Path restoreDir)
203 throws IOException {
204 TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
205 }
206 }