/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result) pairs.
 *
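 * <p>A minimal usage sketch, illustrative only: the {@code Connection}, table
 * name, column, and row keys here are hypothetical, and MapReduce jobs normally
 * reach this class through the mapred {@code TableRecordReader} wrapper rather
 * than directly.
 *
 * <pre>
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(connection.getTable(TableName.valueOf("example")));
 * trr.setInputColumns(new byte[][] { Bytes.toBytes("cf:qual") });
 * trr.setStartRow(Bytes.toBytes("row-000"));   // first row of the split
 * trr.setEndRow(Bytes.toBytes("row-999"));     // last row of the split
 * trr.init();                                  // opens the scanner
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // process key/value here
 * }
 * trr.close();
 * </pre>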
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  private byte [] startRow;
  private byte [] endRow;
  // Restart point after a recoverable scanner failure; see next().
  private byte [] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte [][] trrInputColumns;
  // Used only when scanner activity logging is enabled.
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow the first row the new scanner should start from
   * @throws IOException if the new scanner cannot be opened
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        // A MapReduce scan reads each block once, so caching the blocks would
        // only evict more useful entries from the region server's block cache.
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " +
            Bytes.toStringBinary(firstRow) + ", endRow: " +
            Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " +
          Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException if the initial scanner cannot be opened
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link org.apache.hadoop.hbase.client.Table} to scan.
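   *
   * <p>Scanner activity logging is configured here from the table's
   * {@link Configuration}. A minimal sketch of enabling it, assuming
   * {@code conf} is the configuration the table was created with (500 is an
   * arbitrary example interval; the key is the static-imported
   * {@code LOG_PER_ROW_COUNT}):
   *
   * <pre>
   * conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
   * conf.setInt(LOG_PER_ROW_COUNT, 500); // log timing every 500 rows
   * </pre>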
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte [][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte [] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte [] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    // Guard against close() being called before init() has opened a scanner.
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @return a new, empty {@link ImmutableBytesWritable} to hold the row key
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return a new, empty {@link Result} to hold the row's cells
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal of the tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * @param key the key of the next row is written into this argument
   * @param value the cells of the next row are copied into this argument
   * @return true if there was more data
   * @throws IOException if scanning fails and the scanner cannot be restarted
   */
  public boolean next(ImmutableBytesWritable key, Result value)
  throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting the scanner;
        // if the second call fails too, the exception is rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;" +
              " if your mapper has restarted a few other times like this," +
              " then you should consider killing this job and investigating" +
              " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next();    // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}