001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapred;
020
021import java.io.IOException;
022import org.apache.yetus.audience.InterfaceAudience;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.ResultScanner;
028import org.apache.hadoop.hbase.client.Scan;
029import org.apache.hadoop.hbase.client.ScannerCallable;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.DoNotRetryIOException;
032import org.apache.hadoop.hbase.filter.Filter;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.util.StringUtils;
037
038import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
039
040/**
041 * Iterate over an HBase table data, return (Text, RowResult) pairs
042 */
043@InterfaceAudience.Public
044public class TableRecordReaderImpl {
045  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);
046
047  private byte [] startRow;
048  private byte [] endRow;
049  private byte [] lastSuccessfulRow;
050  private Filter trrRowFilter;
051  private ResultScanner scanner;
052  private Table htable;
053  private byte [][] trrInputColumns;
054  private long timestamp;
055  private int rowcount;
056  private boolean logScannerActivity = false;
057  private int logPerRowCount = 100;
058
059  /**
060   * Restart from survivable exceptions by creating a new scanner.
061   *
062   * @param firstRow
063   * @throws IOException
064   */
065  public void restart(byte[] firstRow) throws IOException {
066    Scan currentScan;
067    if ((endRow != null) && (endRow.length > 0)) {
068      if (trrRowFilter != null) {
069        Scan scan = new Scan(firstRow, endRow);
070        TableInputFormat.addColumns(scan, trrInputColumns);
071        scan.setFilter(trrRowFilter);
072        scan.setCacheBlocks(false);
073        this.scanner = this.htable.getScanner(scan);
074        currentScan = scan;
075      } else {
076        LOG.debug("TIFB.restart, firstRow: " +
077            Bytes.toStringBinary(firstRow) + ", endRow: " +
078            Bytes.toStringBinary(endRow));
079        Scan scan = new Scan(firstRow, endRow);
080        TableInputFormat.addColumns(scan, trrInputColumns);
081        this.scanner = this.htable.getScanner(scan);
082        currentScan = scan;
083      }
084    } else {
085      LOG.debug("TIFB.restart, firstRow: " +
086          Bytes.toStringBinary(firstRow) + ", no endRow");
087
088      Scan scan = new Scan(firstRow);
089      TableInputFormat.addColumns(scan, trrInputColumns);
090      scan.setFilter(trrRowFilter);
091      this.scanner = this.htable.getScanner(scan);
092      currentScan = scan;
093    }
094    if (logScannerActivity) {
095      LOG.info("Current scan=" + currentScan.toString());
096      timestamp = System.currentTimeMillis();
097      rowcount = 0;
098    }
099  }
100
101  /**
102   * Build the scanner. Not done in constructor to allow for extension.
103   *
104   * @throws IOException
105   */
106  public void init() throws IOException {
107    restart(startRow);
108  }
109
110  byte[] getStartRow() {
111    return this.startRow;
112  }
113  /**
114   * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
115   */
116  public void setHTable(Table htable) {
117    Configuration conf = htable.getConfiguration();
118    logScannerActivity = conf.getBoolean(
119      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
120    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
121    this.htable = htable;
122  }
123
124  /**
125   * @param inputColumns the columns to be placed in {@link Result}.
126   */
127  public void setInputColumns(final byte [][] inputColumns) {
128    this.trrInputColumns = inputColumns;
129  }
130
131  /**
132   * @param startRow the first row in the split
133   */
134  public void setStartRow(final byte [] startRow) {
135    this.startRow = startRow;
136  }
137
138  /**
139   *
140   * @param endRow the last row in the split
141   */
142  public void setEndRow(final byte [] endRow) {
143    this.endRow = endRow;
144  }
145
146  /**
147   * @param rowFilter the {@link Filter} to be used.
148   */
149  public void setRowFilter(Filter rowFilter) {
150    this.trrRowFilter = rowFilter;
151  }
152
153  public void close() {
154    if (this.scanner != null) {
155      this.scanner.close();
156    }
157    try {
158      this.htable.close();
159    } catch (IOException ioe) {
160      LOG.warn("Error closing table", ioe);
161    }
162  }
163
164  /**
165   * @return ImmutableBytesWritable
166   *
167   * @see org.apache.hadoop.mapred.RecordReader#createKey()
168   */
169  public ImmutableBytesWritable createKey() {
170    return new ImmutableBytesWritable();
171  }
172
173  /**
174   * @return RowResult
175   *
176   * @see org.apache.hadoop.mapred.RecordReader#createValue()
177   */
178  public Result createValue() {
179    return new Result();
180  }
181
182  public long getPos() {
183    // This should be the ordinal tuple in the range;
184    // not clear how to calculate...
185    return 0;
186  }
187
188  public float getProgress() {
189    // Depends on the total number of tuples and getPos
190    return 0;
191  }
192
193  /**
194   * @param key HStoreKey as input key.
195   * @param value MapWritable as input value
196   * @return true if there was more data
197   * @throws IOException
198   */
199  public boolean next(ImmutableBytesWritable key, Result value)
200  throws IOException {
201    Result result;
202    try {
203      try {
204        result = this.scanner.next();
205        if (logScannerActivity) {
206          rowcount ++;
207          if (rowcount >= logPerRowCount) {
208            long now = System.currentTimeMillis();
209            LOG.info("Mapper took " + (now-timestamp)
210              + "ms to process " + rowcount + " rows");
211            timestamp = now;
212            rowcount = 0;
213          }
214        }
215      } catch (IOException e) {
216        // do not retry if the exception tells us not to do so
217        if (e instanceof DoNotRetryIOException) {
218          throw e;
219        }
220        // try to handle all other IOExceptions by restarting
221        // the scanner, if the second call fails, it will be rethrown
222        LOG.debug("recovered from " + StringUtils.stringifyException(e));
223        if (lastSuccessfulRow == null) {
224          LOG.warn("We are restarting the first next() invocation," +
225              " if your mapper has restarted a few other times like this" +
226              " then you should consider killing this job and investigate" +
227              " why it's taking so long.");
228        }
229        if (lastSuccessfulRow == null) {
230          restart(startRow);
231        } else {
232          restart(lastSuccessfulRow);
233          this.scanner.next();    // skip presumed already mapped row
234        }
235        result = this.scanner.next();
236      }
237
238      if (result != null && result.size() > 0) {
239        key.set(result.getRow());
240        lastSuccessfulRow = key.get();
241        value.copyFrom(result);
242        return true;
243      }
244      return false;
245    } catch (IOException ioe) {
246      if (logScannerActivity) {
247        long now = System.currentTimeMillis();
248        LOG.info("Mapper took " + (now-timestamp)
249          + "ms to process " + rowcount + " rows");
250        LOG.info(ioe.toString(), ioe);
251        String lastRow = lastSuccessfulRow == null ?
252          "null" : Bytes.toStringBinary(lastSuccessfulRow);
253        LOG.info("lastSuccessfulRow=" + lastRow);
254      }
255      throw ioe;
256    }
257  }
258}