1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import java.io.IOException;
21 import java.lang.reflect.Method;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.client.ResultScanner;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.client.ScannerCallable;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.mapreduce.Counter;
39 import org.apache.hadoop.mapreduce.InputSplit;
40 import org.apache.hadoop.mapreduce.TaskAttemptContext;
41 import org.apache.hadoop.util.StringUtils;
42
43 import com.google.common.annotations.VisibleForTesting;
44
45
46
47
48
49 @InterfaceAudience.Public
50 @InterfaceStability.Stable
51 public class TableRecordReaderImpl {
/**
 * Configuration key read in {@code setHTable}; its value is the number of rows
 * handled between two scanner-activity log lines (default 100).
 */
public static final String LOG_PER_ROW_COUNT
  = "hbase.mapreduce.log.scanner.rowcount";

static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

// Counter group name used when publishing scan metrics through getCounter.
@VisibleForTesting
static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
// Scanner currently being read from; replaced each time restart() runs.
private ResultScanner scanner = null;
// Scan supplied through setScan(); restart() copies it rather than editing it.
private Scan scan = null;
// Copy of 'scan' built by restart(), with the requested start row and scan metrics enabled.
private Scan currentScan = null;
// Table the scanners are opened on; closed by close().
private Table htable = null;
// Row key of the last result returned to the caller; restart point after a retryable failure.
private byte[] lastSuccessfulRow = null;
// Current key, handed out by getCurrentKey().
private ImmutableBytesWritable key = null;
// Current value, handed out by getCurrentValue().
private Result value = null;
// Task context captured in initialize(); target of the reflective getCounter calls.
private TaskAttemptContext context = null;
// getCounter(String, String) looked up reflectively in initialize(); null when unavailable.
private Method getCounter = null;
// Number of times nextKeyValue() had to reopen the scanner after an IOException.
private long numRestarts = 0;
// Number of results for which Result.isStale() returned true.
private long numStale = 0;
// Wall-clock time (ms) of the last scanner-activity log line or restart.
private long timestamp;
// Rows read since 'timestamp' was last reset; only maintained when logScannerActivity is on.
private int rowcount;
// Whether scanner activity logging is enabled (ScannerCallable.LOG_SCANNER_ACTIVITY).
private boolean logScannerActivity = false;
// Row-count threshold between activity log lines (LOG_PER_ROW_COUNT).
private int logPerRowCount = 100;
75
76
77
78
79
80
81
82 public void restart(byte[] firstRow) throws IOException {
83 currentScan = new Scan(scan);
84 currentScan.setStartRow(firstRow);
85 currentScan.setScanMetricsEnabled(true);
86 if (this.scanner != null) {
87 if (logScannerActivity) {
88 LOG.info("Closing the previously opened scanner object.");
89 }
90 this.scanner.close();
91 }
92 this.scanner = this.htable.getScanner(currentScan);
93 if (logScannerActivity) {
94 LOG.info("Current scan=" + currentScan.toString());
95 timestamp = System.currentTimeMillis();
96 rowcount = 0;
97 }
98 }
99
100
101
102
103
104
105
106 protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
107 throws IOException {
108 Method m = null;
109 try {
110 m = context.getClass().getMethod("getCounter",
111 new Class [] {String.class, String.class});
112 } catch (SecurityException e) {
113 throw new IOException("Failed test for getCounter", e);
114 } catch (NoSuchMethodException e) {
115
116 }
117 return m;
118 }
119
120
121
122
123
124
125 public void setHTable(Table htable) {
126 Configuration conf = htable.getConfiguration();
127 logScannerActivity = conf.getBoolean(
128 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
129 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
130 this.htable = htable;
131 }
132
133
134
135
136
137
138 public void setScan(Scan scan) {
139 this.scan = scan;
140 }
141
142
143
144
145
146
147 public void initialize(InputSplit inputsplit,
148 TaskAttemptContext context) throws IOException,
149 InterruptedException {
150 if (context != null) {
151 this.context = context;
152 getCounter = retrieveGetCounterWithStringsParams(context);
153 }
154 restart(scan.getStartRow());
155 }
156
157
158
159
160
161
162 public void close() {
163 this.scanner.close();
164 try {
165 this.htable.close();
166 } catch (IOException ioe) {
167 LOG.warn("Error closing table", ioe);
168 }
169 }
170
171
172
173
174
175
176
177
178 public ImmutableBytesWritable getCurrentKey() throws IOException,
179 InterruptedException {
180 return key;
181 }
182
183
184
185
186
187
188
189
190 public Result getCurrentValue() throws IOException, InterruptedException {
191 return value;
192 }
193
194
195
196
197
198
199
200
201
202 public boolean nextKeyValue() throws IOException, InterruptedException {
203 if (key == null) key = new ImmutableBytesWritable();
204 if (value == null) value = new Result();
205 try {
206 try {
207 value = this.scanner.next();
208 if (value != null && value.isStale()) numStale++;
209 if (logScannerActivity) {
210 rowcount ++;
211 if (rowcount >= logPerRowCount) {
212 long now = System.currentTimeMillis();
213 LOG.info("Mapper took " + (now-timestamp)
214 + "ms to process " + rowcount + " rows");
215 timestamp = now;
216 rowcount = 0;
217 }
218 }
219 } catch (IOException e) {
220
221 if (e instanceof DoNotRetryIOException) {
222 throw e;
223 }
224
225
226 LOG.info("recovered from " + StringUtils.stringifyException(e));
227 if (lastSuccessfulRow == null) {
228 LOG.warn("We are restarting the first next() invocation," +
229 " if your mapper has restarted a few other times like this" +
230 " then you should consider killing this job and investigate" +
231 " why it's taking so long.");
232 }
233 if (lastSuccessfulRow == null) {
234 restart(scan.getStartRow());
235 } else {
236 restart(lastSuccessfulRow);
237 scanner.next();
238 }
239 value = scanner.next();
240 if (value != null && value.isStale()) numStale++;
241 numRestarts++;
242 }
243 if (value != null && value.size() > 0) {
244 key.set(value.getRow());
245 lastSuccessfulRow = key.get();
246 return true;
247 }
248
249 updateCounters();
250 return false;
251 } catch (IOException ioe) {
252 if (logScannerActivity) {
253 long now = System.currentTimeMillis();
254 LOG.info("Mapper took " + (now-timestamp)
255 + "ms to process " + rowcount + " rows");
256 LOG.info(ioe);
257 String lastRow = lastSuccessfulRow == null ?
258 "null" : Bytes.toStringBinary(lastSuccessfulRow);
259 LOG.info("lastSuccessfulRow=" + lastRow);
260 }
261 throw ioe;
262 }
263 }
264
265
266
267
268
269
270
271
272 private void updateCounters() throws IOException {
273 ScanMetrics scanMetrics = currentScan.getScanMetrics();
274 if (scanMetrics == null) {
275 return;
276 }
277
278 updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
279 }
280
281 protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
282 Method getCounter, TaskAttemptContext context, long numStale) {
283
284 if (getCounter == null) {
285 return;
286 }
287
288 try {
289 for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
290 Counter ct = (Counter)getCounter.invoke(context,
291 HBASE_COUNTER_GROUP_NAME, entry.getKey());
292
293 ct.increment(entry.getValue());
294 }
295 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
296 "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
297 ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
298 "NUM_SCAN_RESULTS_STALE")).increment(numStale);
299 } catch (Exception e) {
300 LOG.debug("can't update counter." + StringUtils.stringifyException(e));
301 }
302 }
303
304
305
306
307
308
309 public float getProgress() {
310
311 return 0;
312 }
313
314 }