View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.classification.InterfaceStability;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.client.Result;
30  import org.apache.hadoop.hbase.client.ResultScanner;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.client.ScannerCallable;
33  import org.apache.hadoop.hbase.filter.Filter;
34  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
35  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.util.StringUtils;
38  
39  import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
40  
41  /**
42   * Iterate over an HBase table data, return (Text, RowResult) pairs
43   */
44  @Deprecated
45  @InterfaceAudience.Public
46  @InterfaceStability.Stable
47  public class TableRecordReaderImpl {
48    static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
49  
50    private byte [] startRow;
51    private byte [] endRow;
52    private byte [] lastSuccessfulRow;
53    private Filter trrRowFilter;
54    private ResultScanner scanner;
55    private HTable htable;
56    private byte [][] trrInputColumns;
57    private long timestamp;
58    private int rowcount;
59    private boolean logScannerActivity = false;
60    private int logPerRowCount = 100;
61  
62    /**
63     * Restart from survivable exceptions by creating a new scanner.
64     *
65     * @param firstRow
66     * @throws IOException
67     */
68    public void restart(byte[] firstRow) throws IOException {
69      Scan currentScan;
70      if ((endRow != null) && (endRow.length > 0)) {
71        if (trrRowFilter != null) {
72          Scan scan = new Scan(firstRow, endRow);
73          TableInputFormat.addColumns(scan, trrInputColumns);
74          scan.setFilter(trrRowFilter);
75          scan.setCacheBlocks(false);
76          this.scanner = this.htable.getScanner(scan);
77          currentScan = scan;
78        } else {
79          LOG.debug("TIFB.restart, firstRow: " +
80              Bytes.toStringBinary(firstRow) + ", endRow: " +
81              Bytes.toStringBinary(endRow));
82          Scan scan = new Scan(firstRow, endRow);
83          TableInputFormat.addColumns(scan, trrInputColumns);
84          this.scanner = this.htable.getScanner(scan);
85          currentScan = scan;
86        }
87      } else {
88        LOG.debug("TIFB.restart, firstRow: " +
89            Bytes.toStringBinary(firstRow) + ", no endRow");
90  
91        Scan scan = new Scan(firstRow);
92        TableInputFormat.addColumns(scan, trrInputColumns);
93        scan.setFilter(trrRowFilter);
94        this.scanner = this.htable.getScanner(scan);
95        currentScan = scan;
96      }
97      if (logScannerActivity) {
98        LOG.info("Current scan=" + currentScan.toString());
99        timestamp = System.currentTimeMillis();
100       rowcount = 0;
101     }
102   }
103 
104   /**
105    * Build the scanner. Not done in constructor to allow for extension.
106    *
107    * @throws IOException
108    */
109   public void init() throws IOException {
110     restart(startRow);
111   }
112 
113   byte[] getStartRow() {
114     return this.startRow;
115   }
116   /**
117    * @param htable the {@link HTable} to scan.
118    */
119   public void setHTable(HTable htable) {
120     Configuration conf = htable.getConfiguration();
121     logScannerActivity = conf.getBoolean(
122       ScannerCallable.LOG_SCANNER_ACTIVITY, false);
123     logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
124     this.htable = htable;
125   }
126 
127   /**
128    * @param inputColumns the columns to be placed in {@link Result}.
129    */
130   public void setInputColumns(final byte [][] inputColumns) {
131     this.trrInputColumns = inputColumns;
132   }
133 
134   /**
135    * @param startRow the first row in the split
136    */
137   public void setStartRow(final byte [] startRow) {
138     this.startRow = startRow;
139   }
140 
141   /**
142    *
143    * @param endRow the last row in the split
144    */
145   public void setEndRow(final byte [] endRow) {
146     this.endRow = endRow;
147   }
148 
149   /**
150    * @param rowFilter the {@link Filter} to be used.
151    */
152   public void setRowFilter(Filter rowFilter) {
153     this.trrRowFilter = rowFilter;
154   }
155 
156   public void close() {
157     this.scanner.close();
158   }
159 
160   /**
161    * @return ImmutableBytesWritable
162    *
163    * @see org.apache.hadoop.mapred.RecordReader#createKey()
164    */
165   public ImmutableBytesWritable createKey() {
166     return new ImmutableBytesWritable();
167   }
168 
169   /**
170    * @return RowResult
171    *
172    * @see org.apache.hadoop.mapred.RecordReader#createValue()
173    */
174   public Result createValue() {
175     return new Result();
176   }
177 
178   public long getPos() {
179     // This should be the ordinal tuple in the range;
180     // not clear how to calculate...
181     return 0;
182   }
183 
184   public float getProgress() {
185     // Depends on the total number of tuples and getPos
186     return 0;
187   }
188 
189   /**
190    * @param key HStoreKey as input key.
191    * @param value MapWritable as input value
192    * @return true if there was more data
193    * @throws IOException
194    */
195   public boolean next(ImmutableBytesWritable key, Result value)
196   throws IOException {
197     Result result;
198     try {
199       try {
200         result = this.scanner.next();
201         if (logScannerActivity) {
202           rowcount ++;
203           if (rowcount >= logPerRowCount) {
204             long now = System.currentTimeMillis();
205             LOG.info("Mapper took " + (now-timestamp)
206               + "ms to process " + rowcount + " rows");
207             timestamp = now;
208             rowcount = 0;
209           }
210         }
211       } catch (IOException e) {
212         // try to handle all IOExceptions by restarting
213         // the scanner, if the second call fails, it will be rethrown
214         LOG.debug("recovered from " + StringUtils.stringifyException(e));
215         if (lastSuccessfulRow == null) {
216           LOG.warn("We are restarting the first next() invocation," +
217               " if your mapper has restarted a few other times like this" +
218               " then you should consider killing this job and investigate" +
219               " why it's taking so long.");
220         }
221         if (lastSuccessfulRow == null) {
222           restart(startRow);
223         } else {
224           restart(lastSuccessfulRow);
225           this.scanner.next();    // skip presumed already mapped row
226         }
227         result = this.scanner.next();
228       }
229 
230       if (result != null && result.size() > 0) {
231         key.set(result.getRow());
232         lastSuccessfulRow = key.get();
233         value.copyFrom(result);
234         return true;
235       }
236       return false;
237     } catch (IOException ioe) {
238       if (logScannerActivity) {
239         long now = System.currentTimeMillis();
240         LOG.info("Mapper took " + (now-timestamp)
241           + "ms to process " + rowcount + " rows");
242         LOG.info(ioe);
243         String lastRow = lastSuccessfulRow == null ?
244           "null" : Bytes.toStringBinary(lastSuccessfulRow);
245         LOG.info("lastSuccessfulRow=" + lastRow);
246       }
247       throw ioe;
248     }
249   }
250 }