
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result) pairs.
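 *
 * <p>A minimal usage sketch. This is hypothetical driver code; in a real job the reader is
 * normally created and configured by the mapred {@code TableInputFormatBase} in this package,
 * and the {@code table}, {@code inputColumns}, {@code startRow} and {@code endRow} variables
 * below are assumed to come from that setup. It only illustrates the expected call order:
 * <pre>{@code
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table);                 // org.apache.hadoop.hbase.client.Table to scan
 * trr.setInputColumns(inputColumns);    // byte[][] of family or family:qualifier specs
 * trr.setStartRow(startRow);            // split boundaries
 * trr.setEndRow(endRow);
 * trr.init();                           // builds the scanner via restart(startRow)
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // process key/value
 * }
 * trr.close();
 * }</pre>
 */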
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  private byte [] startRow;
  private byte [] endRow;
  private byte [] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte [][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow the row to (re)start the scan from
   * @throws IOException if a new scanner cannot be opened on the table
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
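    // Three cases follow: a bounded scan (endRow set) with a row filter, a bounded scan
    // without one, and an unbounded scan from firstRow onward (filter applied if present).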
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " +
            Bytes.toStringBinary(firstRow) + ", endRow: " +
            Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " +
          Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in the constructor to allow for extension.
   *
   * @throws IOException if the scanner cannot be created
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * Sets the table to scan. Also reads the scanner-activity logging settings
   * ({@code ScannerCallable.LOG_SCANNER_ACTIVITY} and {@code LOG_PER_ROW_COUNT}) from the
   * table's configuration.
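   *
   * <p>A minimal, hedged sketch of how that logging could be enabled before the table is
   * handed to this reader; the two constants are the public configuration-key holders this
   * method reads, and the value {@code 1000} is only an illustrative choice:
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
   * // LOG_PER_ROW_COUNT is the org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl constant
   * conf.setInt(LOG_PER_ROW_COUNT, 1000);
   * }</pre>
   *
   * @param htable the {@link Table} to scan.
   */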
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte [][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte [] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte [] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    this.scanner.close();
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @return ImmutableBytesWritable
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return an empty Result
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * @param key the {@link ImmutableBytesWritable} to fill with the current row key
   * @param value the {@link Result} to fill with the current row's data
   * @return true if there was more data
   * @throws IOException if the scan fails and cannot be restarted
   */
  public boolean next(ImmutableBytesWritable key, Result value)
      throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // Try to handle all IOExceptions by restarting the scanner;
        // if the second call also fails, the exception is rethrown.
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation." +
              " If your mapper has restarted a few other times like this," +
              " then you should consider killing this job and investigating" +
              " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next();    // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}