1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Method;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.client.ResultScanner;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.client.ScannerCallable;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.Counter;
39 import org.apache.hadoop.mapreduce.InputSplit;
40 import org.apache.hadoop.mapreduce.TaskAttemptContext;
41 import org.apache.hadoop.util.StringUtils;
42
43 import com.google.common.annotations.VisibleForTesting;
44
45
46
47
48
49 @InterfaceAudience.Public
50 @InterfaceStability.Stable
51 public class TableRecordReaderImpl {
52 public static final String LOG_PER_ROW_COUNT
53 = "hbase.mapreduce.log.scanner.rowcount";
54
55 private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
56
57
58 @VisibleForTesting
59 static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
60 private ResultScanner scanner = null;
61 private Scan scan = null;
62 private Scan currentScan = null;
63 private Table htable = null;
64 private byte[] lastSuccessfulRow = null;
65 private ImmutableBytesWritable key = null;
66 private Result value = null;
67 private TaskAttemptContext context = null;
68 private Method getCounter = null;
69 private long numRestarts = 0;
70 private long numStale = 0;
71 private long timestamp;
72 private int rowcount;
73 private boolean logScannerActivity = false;
74 private int logPerRowCount = 100;
75
76
77
78
79
80
81
82 public void restart(byte[] firstRow) throws IOException {
83 currentScan = new Scan(scan);
84 currentScan.setStartRow(firstRow);
85 currentScan.setScanMetricsEnabled(true);
86 if (this.scanner != null) {
87 if (logScannerActivity) {
88 LOG.info("Closing the previously opened scanner object.");
89 }
90 this.scanner.close();
91 }
92 this.scanner = this.htable.getScanner(currentScan);
93 if (logScannerActivity) {
94 LOG.info("Current scan=" + currentScan.toString());
95 timestamp = System.currentTimeMillis();
96 rowcount = 0;
97 }
98 }
99
100
101
102
103
104
105
106 protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
107 throws IOException {
108 Method m = null;
109 try {
110 m = context.getClass().getMethod("getCounter",
111 new Class [] {String.class, String.class});
112 } catch (SecurityException e) {
113 throw new IOException("Failed test for getCounter", e);
114 } catch (NoSuchMethodException e) {
115
116 }
117 return m;
118 }
119
120
121
122
123
124
125 public void setHTable(Table htable) {
126 Configuration conf = htable.getConfiguration();
127 logScannerActivity = conf.getBoolean(
128 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
129 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
130 this.htable = htable;
131 }
132
133
134
135
136
137
138 public void setScan(Scan scan) {
139 this.scan = scan;
140 }
141
142
143
144
145
146
147
148 public void initialize(InputSplit inputsplit,
149 TaskAttemptContext context) throws IOException,
150 InterruptedException {
151 if (context != null) {
152 this.context = context;
153 getCounter = retrieveGetCounterWithStringsParams(context);
154 }
155 restart(scan.getStartRow());
156 }
157
158
159
160
161
162
163 public void close() {
164 if (this.scanner != null) {
165 this.scanner.close();
166 }
167 try {
168 this.htable.close();
169 } catch (IOException ioe) {
170 LOG.warn("Error closing table", ioe);
171 }
172 }
173
174
175
176
177
178
179
180
181 public ImmutableBytesWritable getCurrentKey() throws IOException,
182 InterruptedException {
183 return key;
184 }
185
186
187
188
189
190
191
192
193 public Result getCurrentValue() throws IOException, InterruptedException {
194 return value;
195 }
196
197
198
199
200
201
202
203
204
205 public boolean nextKeyValue() throws IOException, InterruptedException {
206 if (key == null) key = new ImmutableBytesWritable();
207 if (value == null) value = new Result();
208 try {
209 try {
210 value = this.scanner.next();
211 if (value != null && value.isStale()) numStale++;
212 if (logScannerActivity) {
213 rowcount ++;
214 if (rowcount >= logPerRowCount) {
215 long now = System.currentTimeMillis();
216 LOG.info("Mapper took " + (now-timestamp)
217 + "ms to process " + rowcount + " rows");
218 timestamp = now;
219 rowcount = 0;
220 }
221 }
222 } catch (IOException e) {
223
224 if (e instanceof DoNotRetryIOException) {
225 throw e;
226 }
227
228
229 LOG.info("recovered from " + StringUtils.stringifyException(e));
230 if (lastSuccessfulRow == null) {
231 LOG.warn("We are restarting the first next() invocation," +
232 " if your mapper has restarted a few other times like this" +
233 " then you should consider killing this job and investigate" +
234 " why it's taking so long.");
235 }
236 if (lastSuccessfulRow == null) {
237 restart(scan.getStartRow());
238 } else {
239 restart(lastSuccessfulRow);
240 scanner.next();
241 }
242 value = scanner.next();
243 if (value != null && value.isStale()) numStale++;
244 numRestarts++;
245 }
246 if (value != null && value.size() > 0) {
247 key.set(value.getRow());
248 lastSuccessfulRow = key.get();
249 return true;
250 }
251
252 updateCounters();
253 return false;
254 } catch (IOException ioe) {
255 if (logScannerActivity) {
256 long now = System.currentTimeMillis();
257 LOG.info("Mapper took " + (now-timestamp)
258 + "ms to process " + rowcount + " rows");
259 LOG.info(ioe);
260 String lastRow = lastSuccessfulRow == null ?
261 "null" : Bytes.toStringBinary(lastSuccessfulRow);
262 LOG.info("lastSuccessfulRow=" + lastRow);
263 }
264 throw ioe;
265 }
266 }
267
268
269
270
271
272
273
274
275 private void updateCounters() throws IOException {
276 ScanMetrics scanMetrics = currentScan.getScanMetrics();
277 if (scanMetrics == null) {
278 return;
279 }
280
281 updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
282 }
283
284 protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
285 Method getCounter, TaskAttemptContext context, long numStale) {
286
287 if (getCounter == null) {
288 return;
289 }
290
291 try {
292 for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
293 Counter ct = (Counter)getCounter.invoke(context,
294 HBASE_COUNTER_GROUP_NAME, entry.getKey());
295
296 ct.increment(entry.getValue());
297 }
298 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
299 "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
300 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
301 "NUM_SCAN_RESULTS_STALE")).increment(numStale);
302 } catch (Exception e) {
303 LOG.debug("can't update counter." + StringUtils.stringifyException(e));
304 }
305 }
306
307
308
309
310
311
312 public float getProgress() {
313
314 return 0;
315 }
316
317 }