View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.client.ResultScanner;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.apache.hadoop.hbase.client.ScannerCallable;
32  import org.apache.hadoop.hbase.client.Table;
33  import org.apache.hadoop.hbase.filter.Filter;
34  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
35  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.util.StringUtils;
38  
39  import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
40  
41  /**
42   * Iterate over an HBase table data, return (Text, RowResult) pairs
43   */
44  @InterfaceAudience.Public
45  @InterfaceStability.Stable
46  public class TableRecordReaderImpl {
47    static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
48  
49    private byte [] startRow;
50    private byte [] endRow;
51    private byte [] lastSuccessfulRow;
52    private Filter trrRowFilter;
53    private ResultScanner scanner;
54    private Table htable;
55    private byte [][] trrInputColumns;
56    private long timestamp;
57    private int rowcount;
58    private boolean logScannerActivity = false;
59    private int logPerRowCount = 100;
60  
61    /**
62     * Restart from survivable exceptions by creating a new scanner.
63     *
64     * @param firstRow
65     * @throws IOException
66     */
67    public void restart(byte[] firstRow) throws IOException {
68      Scan currentScan;
69      if ((endRow != null) && (endRow.length > 0)) {
70        if (trrRowFilter != null) {
71          Scan scan = new Scan(firstRow, endRow);
72          TableInputFormat.addColumns(scan, trrInputColumns);
73          scan.setFilter(trrRowFilter);
74          scan.setCacheBlocks(false);
75          this.scanner = this.htable.getScanner(scan);
76          currentScan = scan;
77        } else {
78          LOG.debug("TIFB.restart, firstRow: " +
79              Bytes.toStringBinary(firstRow) + ", endRow: " +
80              Bytes.toStringBinary(endRow));
81          Scan scan = new Scan(firstRow, endRow);
82          TableInputFormat.addColumns(scan, trrInputColumns);
83          this.scanner = this.htable.getScanner(scan);
84          currentScan = scan;
85        }
86      } else {
87        LOG.debug("TIFB.restart, firstRow: " +
88            Bytes.toStringBinary(firstRow) + ", no endRow");
89  
90        Scan scan = new Scan(firstRow);
91        TableInputFormat.addColumns(scan, trrInputColumns);
92        scan.setFilter(trrRowFilter);
93        this.scanner = this.htable.getScanner(scan);
94        currentScan = scan;
95      }
96      if (logScannerActivity) {
97        LOG.info("Current scan=" + currentScan.toString());
98        timestamp = System.currentTimeMillis();
99        rowcount = 0;
100     }
101   }
102 
103   /**
104    * Build the scanner. Not done in constructor to allow for extension.
105    *
106    * @throws IOException
107    */
108   public void init() throws IOException {
109     restart(startRow);
110   }
111 
112   byte[] getStartRow() {
113     return this.startRow;
114   }
115   /**
116    * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
117    */
118   public void setHTable(Table htable) {
119     Configuration conf = htable.getConfiguration();
120     logScannerActivity = conf.getBoolean(
121       ScannerCallable.LOG_SCANNER_ACTIVITY, false);
122     logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
123     this.htable = htable;
124   }
125 
126   /**
127    * @param inputColumns the columns to be placed in {@link Result}.
128    */
129   public void setInputColumns(final byte [][] inputColumns) {
130     this.trrInputColumns = inputColumns;
131   }
132 
133   /**
134    * @param startRow the first row in the split
135    */
136   public void setStartRow(final byte [] startRow) {
137     this.startRow = startRow;
138   }
139 
140   /**
141    *
142    * @param endRow the last row in the split
143    */
144   public void setEndRow(final byte [] endRow) {
145     this.endRow = endRow;
146   }
147 
148   /**
149    * @param rowFilter the {@link Filter} to be used.
150    */
151   public void setRowFilter(Filter rowFilter) {
152     this.trrRowFilter = rowFilter;
153   }
154 
155   public void close() {
156     this.scanner.close();
157     try {
158       this.htable.close();
159     } catch (IOException ioe) {
160       LOG.warn("Error closing table", ioe);
161     }
162   }
163 
164   /**
165    * @return ImmutableBytesWritable
166    *
167    * @see org.apache.hadoop.mapred.RecordReader#createKey()
168    */
169   public ImmutableBytesWritable createKey() {
170     return new ImmutableBytesWritable();
171   }
172 
173   /**
174    * @return RowResult
175    *
176    * @see org.apache.hadoop.mapred.RecordReader#createValue()
177    */
178   public Result createValue() {
179     return new Result();
180   }
181 
182   public long getPos() {
183     // This should be the ordinal tuple in the range;
184     // not clear how to calculate...
185     return 0;
186   }
187 
188   public float getProgress() {
189     // Depends on the total number of tuples and getPos
190     return 0;
191   }
192 
193   /**
194    * @param key HStoreKey as input key.
195    * @param value MapWritable as input value
196    * @return true if there was more data
197    * @throws IOException
198    */
199   public boolean next(ImmutableBytesWritable key, Result value)
200   throws IOException {
201     Result result;
202     try {
203       try {
204         result = this.scanner.next();
205         if (logScannerActivity) {
206           rowcount ++;
207           if (rowcount >= logPerRowCount) {
208             long now = System.currentTimeMillis();
209             LOG.info("Mapper took " + (now-timestamp)
210               + "ms to process " + rowcount + " rows");
211             timestamp = now;
212             rowcount = 0;
213           }
214         }
215       } catch (IOException e) {
216         // try to handle all IOExceptions by restarting
217         // the scanner, if the second call fails, it will be rethrown
218         LOG.debug("recovered from " + StringUtils.stringifyException(e));
219         if (lastSuccessfulRow == null) {
220           LOG.warn("We are restarting the first next() invocation," +
221               " if your mapper has restarted a few other times like this" +
222               " then you should consider killing this job and investigate" +
223               " why it's taking so long.");
224         }
225         if (lastSuccessfulRow == null) {
226           restart(startRow);
227         } else {
228           restart(lastSuccessfulRow);
229           this.scanner.next();    // skip presumed already mapped row
230         }
231         result = this.scanner.next();
232       }
233 
234       if (result != null && result.size() > 0) {
235         key.set(result.getRow());
236         lastSuccessfulRow = key.get();
237         value.copyFrom(result);
238         return true;
239       }
240       return false;
241     } catch (IOException ioe) {
242       if (logScannerActivity) {
243         long now = System.currentTimeMillis();
244         LOG.info("Mapper took " + (now-timestamp)
245           + "ms to process " + rowcount + " rows");
246         LOG.info(ioe);
247         String lastRow = lastSuccessfulRow == null ?
248           "null" : Bytes.toStringBinary(lastSuccessfulRow);
249         LOG.info("lastSuccessfulRow=" + lastRow);
250       }
251       throw ioe;
252     }
253   }
254 }