/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

/**
 * Iterate over HBase table data, returning (ImmutableBytesWritable, Result)
 * pairs for the old-style {@code org.apache.hadoop.mapred} API.
 *
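 * <p>A hedged usage sketch (in normal operation the mapred
 * {@code TableInputFormat} constructs and configures this reader; the
 * {@code table} below is assumed to be an open
 * {@link org.apache.hadoop.hbase.client.Table}):
 *
 * <pre>
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table);
 * trr.setInputColumns(new byte[][] { Bytes.toBytes("cf:qual") });
 * trr.setStartRow(Bytes.toBytes("startRow"));
 * trr.setEndRow(Bytes.toBytes("endRow"));
 * trr.init();                              // builds the underlying scanner
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // process key/value
 * }
 * trr.close();
 * </pre>
 */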
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  private byte [] startRow;
  private byte [] endRow;
  private byte [] lastSuccessfulRow;  // last row handed to the mapper; used to resume after restarts
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte [][] trrInputColumns;
  private long timestamp;             // start of the current logging window, in ms
  private int rowcount;               // rows processed in the current logging window
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow the first row the new scanner should read from
   * @throws IOException if the new scanner cannot be created
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
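    // Three cases follow: a bounded scan with a row filter, a bounded scan
    // without one, and an unbounded scan that runs to the end of the table.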
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " +
            Bytes.toStringBinary(firstRow) + ", endRow: " +
            Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " +
          Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException if the scanner cannot be created
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * Sets the table to scan and reads the scanner-activity logging settings
   * from its {@link Configuration}.
   *
   * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
   *
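   * <p>A hedged sketch, assuming {@code conf} is the Configuration backing
   * the table:
   *
   * <pre>
   * conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
   * conf.setInt(LOG_PER_ROW_COUNT, 500); // statically imported above
   * </pre>
   */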
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte [][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte [] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte [] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @return a new, empty {@link ImmutableBytesWritable} to hold the row key
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return a new, empty {@link Result} to hold the row data
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * @param key the {@link ImmutableBytesWritable} to be set to the next row key
   * @param value the {@link Result} to be filled with the next row's data
   * @return true if there was more data
   * @throws IOException if scanning fails and cannot be recovered by a restart
   */
  public boolean next(ImmutableBytesWritable key, Result value)
      throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount ++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now-timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;" +
              " if your mapper has restarted a few other times like this" +
              " then you should consider killing this job and investigating" +
              " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next();    // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
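      // Log timing and last-row context before rethrowing so scanner
      // failures are easier to diagnose from the task logs.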
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now-timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}