1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapred;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.client.ScannerCallable;
32 import org.apache.hadoop.hbase.client.Table;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.filter.Filter;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.util.StringUtils;
39
40 import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
41
42
43
44
45 @InterfaceAudience.Public
46 @InterfaceStability.Stable
47 public class TableRecordReaderImpl {
48 private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
49
50 private byte [] startRow;
51 private byte [] endRow;
52 private byte [] lastSuccessfulRow;
53 private Filter trrRowFilter;
54 private ResultScanner scanner;
55 private Table htable;
56 private byte [][] trrInputColumns;
57 private long timestamp;
58 private int rowcount;
59 private boolean logScannerActivity = false;
60 private int logPerRowCount = 100;
61
62
63
64
65
66
67
68 public void restart(byte[] firstRow) throws IOException {
69 Scan currentScan;
70 if ((endRow != null) && (endRow.length > 0)) {
71 if (trrRowFilter != null) {
72 Scan scan = new Scan(firstRow, endRow);
73 TableInputFormat.addColumns(scan, trrInputColumns);
74 scan.setFilter(trrRowFilter);
75 scan.setCacheBlocks(false);
76 this.scanner = this.htable.getScanner(scan);
77 currentScan = scan;
78 } else {
79 LOG.debug("TIFB.restart, firstRow: " +
80 Bytes.toStringBinary(firstRow) + ", endRow: " +
81 Bytes.toStringBinary(endRow));
82 Scan scan = new Scan(firstRow, endRow);
83 TableInputFormat.addColumns(scan, trrInputColumns);
84 this.scanner = this.htable.getScanner(scan);
85 currentScan = scan;
86 }
87 } else {
88 LOG.debug("TIFB.restart, firstRow: " +
89 Bytes.toStringBinary(firstRow) + ", no endRow");
90
91 Scan scan = new Scan(firstRow);
92 TableInputFormat.addColumns(scan, trrInputColumns);
93 scan.setFilter(trrRowFilter);
94 this.scanner = this.htable.getScanner(scan);
95 currentScan = scan;
96 }
97 if (logScannerActivity) {
98 LOG.info("Current scan=" + currentScan.toString());
99 timestamp = System.currentTimeMillis();
100 rowcount = 0;
101 }
102 }
103
104
105
106
107
108
109 public void init() throws IOException {
110 restart(startRow);
111 }
112
113 byte[] getStartRow() {
114 return this.startRow;
115 }
116
117
118
119 public void setHTable(Table htable) {
120 Configuration conf = htable.getConfiguration();
121 logScannerActivity = conf.getBoolean(
122 ScannerCallable.LOG_SCANNER_ACTIVITY, false);
123 logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
124 this.htable = htable;
125 }
126
127
128
129
130 public void setInputColumns(final byte [][] inputColumns) {
131 this.trrInputColumns = inputColumns;
132 }
133
134
135
136
137 public void setStartRow(final byte [] startRow) {
138 this.startRow = startRow;
139 }
140
141
142
143
144
145 public void setEndRow(final byte [] endRow) {
146 this.endRow = endRow;
147 }
148
149
150
151
152 public void setRowFilter(Filter rowFilter) {
153 this.trrRowFilter = rowFilter;
154 }
155
156 public void close() {
157 if (this.scanner != null) {
158 this.scanner.close();
159 }
160 try {
161 this.htable.close();
162 } catch (IOException ioe) {
163 LOG.warn("Error closing table", ioe);
164 }
165 }
166
167
168
169
170
171
172 public ImmutableBytesWritable createKey() {
173 return new ImmutableBytesWritable();
174 }
175
176
177
178
179
180
181 public Result createValue() {
182 return new Result();
183 }
184
185 public long getPos() {
186
187
188 return 0;
189 }
190
191 public float getProgress() {
192
193 return 0;
194 }
195
196
197
198
199
200
201
202 public boolean next(ImmutableBytesWritable key, Result value)
203 throws IOException {
204 Result result;
205 try {
206 try {
207 result = this.scanner.next();
208 if (logScannerActivity) {
209 rowcount ++;
210 if (rowcount >= logPerRowCount) {
211 long now = System.currentTimeMillis();
212 LOG.info("Mapper took " + (now-timestamp)
213 + "ms to process " + rowcount + " rows");
214 timestamp = now;
215 rowcount = 0;
216 }
217 }
218 } catch (IOException e) {
219
220 if (e instanceof DoNotRetryIOException) {
221 throw e;
222 }
223
224
225 LOG.debug("recovered from " + StringUtils.stringifyException(e));
226 if (lastSuccessfulRow == null) {
227 LOG.warn("We are restarting the first next() invocation," +
228 " if your mapper has restarted a few other times like this" +
229 " then you should consider killing this job and investigate" +
230 " why it's taking so long.");
231 }
232 if (lastSuccessfulRow == null) {
233 restart(startRow);
234 } else {
235 restart(lastSuccessfulRow);
236 this.scanner.next();
237 }
238 result = this.scanner.next();
239 }
240
241 if (result != null && result.size() > 0) {
242 key.set(result.getRow());
243 lastSuccessfulRow = key.get();
244 value.copyFrom(result);
245 return true;
246 }
247 return false;
248 } catch (IOException ioe) {
249 if (logScannerActivity) {
250 long now = System.currentTimeMillis();
251 LOG.info("Mapper took " + (now-timestamp)
252 + "ms to process " + rowcount + " rows");
253 LOG.info(ioe);
254 String lastRow = lastSuccessfulRow == null ?
255 "null" : Bytes.toStringBinary(lastSuccessfulRow);
256 LOG.info("lastSuccessfulRow=" + lastRow);
257 }
258 throw ioe;
259 }
260 }
261 }