1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.client.ScannerCallable;
32 import org.apache.hadoop.hbase.client.Table;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.filter.Filter;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.util.StringUtils;
39
40 import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
41
42
43
44
45 @InterfaceAudience.Public
46 @InterfaceStability.Stable
47 public class TableRecordReaderImpl {
48 static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
49
50 private byte [] startRow;
51 private byte [] endRow;
52 private byte [] lastSuccessfulRow;
53 private Filter trrRowFilter;
54 private ResultScanner scanner;
55 private Table htable;
56 private byte [][] trrInputColumns;
57 private long timestamp;
58 private int rowcount;
59 private boolean logScannerActivity = false;
60 private int logPerRowCount = 100;
61
62
63
64
65
66
67
68 public void restart(byte[] firstRow) throws IOException {
69 Scan currentScan;
70 if ((endRow != null) && (endRow.length > 0)) {
71 if (trrRowFilter != null) {
72 Scan scan = new Scan(firstRow, endRow);
73 TableInputFormat.addColumns(scan, trrInputColumns);
74 scan.setFilter(trrRowFilter);
75 scan.setCacheBlocks(false);
76 this.scanner = this.htable.getScanner(scan);
77 currentScan = scan;
78 } else {
79 LOG.debug("TIFB.restart, firstRow: " +
80 Bytes.toStringBinary(firstRow) + ", endRow: " +
81 Bytes.toStringBinary(endRow));
82 Scan scan = new Scan(firstRow, endRow);
83 TableInputFormat.addColumns(scan, trrInputColumns);
84 this.scanner = this.htable.getScanner(scan);
85 currentScan = scan;
86 }
87 } else {
88 LOG.debug("TIFB.restart, firstRow: " +
89 Bytes.toStringBinary(firstRow) + ", no endRow");
90
91 Scan scan = new Scan(firstRow);
92 TableInputFormat.addColumns(scan, trrInputColumns);
93 scan.setFilter(trrRowFilter);
94 this.scanner = this.htable.getScanner(scan);
95 currentScan = scan;
96 }
97 if (logScannerActivity) {
98 LOG.info("Current scan=" + currentScan.toString());
99 timestamp = System.currentTimeMillis();
100 rowcount = 0;
101 }
102 }
103
104
105
106
107
108
109 public void init() throws IOException {
110 restart(startRow);
111 }
112
113 byte[] getStartRow() {
114 return this.startRow;
115 }
116
117
118
119 public void setHTable(Table htable) {
120 Configuration conf = htable.getConfiguration();
121 logScannerActivity = conf.getBoolean(
122 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
123 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
124 this.htable = htable;
125 }
126
127
128
129
130 public void setInputColumns(final byte [][] inputColumns) {
131 this.trrInputColumns = inputColumns;
132 }
133
134
135
136
137 public void setStartRow(final byte [] startRow) {
138 this.startRow = startRow;
139 }
140
141
142
143
144
145 public void setEndRow(final byte [] endRow) {
146 this.endRow = endRow;
147 }
148
149
150
151
152 public void setRowFilter(Filter rowFilter) {
153 this.trrRowFilter = rowFilter;
154 }
155
156 public void close() {
157 this.scanner.close();
158 try {
159 this.htable.close();
160 } catch (IOException ioe) {
161 LOG.warn("Error closing table", ioe);
162 }
163 }
164
165
166
167
168
169
170 public ImmutableBytesWritable createKey() {
171 return new ImmutableBytesWritable();
172 }
173
174
175
176
177
178
179 public Result createValue() {
180 return new Result();
181 }
182
183 public long getPos() {
184
185
186 return 0;
187 }
188
189 public float getProgress() {
190
191 return 0;
192 }
193
194
195
196
197
198
199
200 public boolean next(ImmutableBytesWritable key, Result value)
201 throws IOException {
202 Result result;
203 try {
204 try {
205 result = this.scanner.next();
206 if (logScannerActivity) {
207 rowcount ++;
208 if (rowcount >= logPerRowCount) {
209 long now = System.currentTimeMillis();
210 LOG.info("Mapper took " + (now-timestamp)
211 + "ms to process " + rowcount + " rows");
212 timestamp = now;
213 rowcount = 0;
214 }
215 }
216 } catch (IOException e) {
217
218 if (e instanceof DoNotRetryIOException) {
219 throw e;
220 }
221
222
223 LOG.debug("recovered from " + StringUtils.stringifyException(e));
224 if (lastSuccessfulRow == null) {
225 LOG.warn("We are restarting the first next() invocation," +
226 " if your mapper has restarted a few other times like this" +
227 " then you should consider killing this job and investigate" +
228 " why it's taking so long.");
229 }
230 if (lastSuccessfulRow == null) {
231 restart(startRow);
232 } else {
233 restart(lastSuccessfulRow);
234 this.scanner.next();
235 }
236 result = this.scanner.next();
237 }
238
239 if (result != null && result.size() > 0) {
240 key.set(result.getRow());
241 lastSuccessfulRow = key.get();
242 value.copyFrom(result);
243 return true;
244 }
245 return false;
246 } catch (IOException ioe) {
247 if (logScannerActivity) {
248 long now = System.currentTimeMillis();
249 LOG.info("Mapper took " + (now-timestamp)
250 + "ms to process " + rowcount + " rows");
251 LOG.info(ioe);
252 String lastRow = lastSuccessfulRow == null ?
253 "null" : Bytes.toStringBinary(lastSuccessfulRow);
254 LOG.info("lastSuccessfulRow=" + lastRow);
255 }
256 throw ioe;
257 }
258 }
259 }