001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapred;
020
021import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.DoNotRetryIOException;
025import org.apache.hadoop.hbase.client.ConnectionConfiguration;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.ResultScanner;
028import org.apache.hadoop.hbase.client.Scan;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.filter.Filter;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.util.StringUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Iterate over an HBase table data, return (Text, RowResult) pairs
041 */
042@InterfaceAudience.Public
043public class TableRecordReaderImpl {
044  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);
045
046  private byte [] startRow;
047  private byte [] endRow;
048  private byte [] lastSuccessfulRow;
049  private Filter trrRowFilter;
050  private ResultScanner scanner;
051  private Table htable;
052  private byte [][] trrInputColumns;
053  private long timestamp;
054  private int rowcount;
055  private boolean logScannerActivity = false;
056  private int logPerRowCount = 100;
057
058  /**
059   * Restart from survivable exceptions by creating a new scanner.
060   */
061  public void restart(byte[] firstRow) throws IOException {
062    Scan currentScan;
063    if ((endRow != null) && (endRow.length > 0)) {
064      if (trrRowFilter != null) {
065        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
066        TableInputFormat.addColumns(scan, trrInputColumns);
067        scan.setFilter(trrRowFilter);
068        scan.setCacheBlocks(false);
069        this.scanner = this.htable.getScanner(scan);
070        currentScan = scan;
071      } else {
072        LOG.debug("TIFB.restart, firstRow: " +
073            Bytes.toStringBinary(firstRow) + ", endRow: " +
074            Bytes.toStringBinary(endRow));
075        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
076        TableInputFormat.addColumns(scan, trrInputColumns);
077        this.scanner = this.htable.getScanner(scan);
078        currentScan = scan;
079      }
080    } else {
081      LOG.debug("TIFB.restart, firstRow: " +
082          Bytes.toStringBinary(firstRow) + ", no endRow");
083
084      Scan scan = new Scan().withStartRow(firstRow);
085      TableInputFormat.addColumns(scan, trrInputColumns);
086      scan.setFilter(trrRowFilter);
087      this.scanner = this.htable.getScanner(scan);
088      currentScan = scan;
089    }
090    if (logScannerActivity) {
091      LOG.info("Current scan=" + currentScan.toString());
092      timestamp = System.currentTimeMillis();
093      rowcount = 0;
094    }
095  }
096
097  /**
098   * Build the scanner. Not done in constructor to allow for extension.
099   */
100  public void init() throws IOException {
101    restart(startRow);
102  }
103
104  byte[] getStartRow() {
105    return this.startRow;
106  }
107
108  /**
109   * @param htable the table to scan.
110   */
111  public void setHTable(Table htable) {
112    Configuration conf = htable.getConfiguration();
113    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
114    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
115    this.htable = htable;
116  }
117
118  /**
119   * @param inputColumns the columns to be placed in {@link Result}.
120   */
121  public void setInputColumns(final byte [][] inputColumns) {
122    this.trrInputColumns = inputColumns;
123  }
124
125  /**
126   * @param startRow the first row in the split
127   */
128  public void setStartRow(final byte [] startRow) {
129    this.startRow = startRow;
130  }
131
132  /**
133   *
134   * @param endRow the last row in the split
135   */
136  public void setEndRow(final byte [] endRow) {
137    this.endRow = endRow;
138  }
139
140  /**
141   * @param rowFilter the {@link Filter} to be used.
142   */
143  public void setRowFilter(Filter rowFilter) {
144    this.trrRowFilter = rowFilter;
145  }
146
147  public void close() {
148    if (this.scanner != null) {
149      this.scanner.close();
150    }
151    try {
152      this.htable.close();
153    } catch (IOException ioe) {
154      LOG.warn("Error closing table", ioe);
155    }
156  }
157
158  /**
159   * @return ImmutableBytesWritable
160   *
161   * @see org.apache.hadoop.mapred.RecordReader#createKey()
162   */
163  public ImmutableBytesWritable createKey() {
164    return new ImmutableBytesWritable();
165  }
166
167  /**
168   * @return RowResult
169   *
170   * @see org.apache.hadoop.mapred.RecordReader#createValue()
171   */
172  public Result createValue() {
173    return new Result();
174  }
175
176  public long getPos() {
177    // This should be the ordinal tuple in the range;
178    // not clear how to calculate...
179    return 0;
180  }
181
182  public float getProgress() {
183    // Depends on the total number of tuples and getPos
184    return 0;
185  }
186
187  /**
188   * @param key HStoreKey as input key.
189   * @param value MapWritable as input value
190   * @return true if there was more data
191   */
192  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
193    Result result;
194    try {
195      try {
196        result = this.scanner.next();
197        if (logScannerActivity) {
198          rowcount ++;
199          if (rowcount >= logPerRowCount) {
200            long now = System.currentTimeMillis();
201            LOG.info("Mapper took " + (now-timestamp)
202              + "ms to process " + rowcount + " rows");
203            timestamp = now;
204            rowcount = 0;
205          }
206        }
207      } catch (IOException e) {
208        // do not retry if the exception tells us not to do so
209        if (e instanceof DoNotRetryIOException) {
210          throw e;
211        }
212        // try to handle all other IOExceptions by restarting
213        // the scanner, if the second call fails, it will be rethrown
214        LOG.debug("recovered from " + StringUtils.stringifyException(e));
215        if (lastSuccessfulRow == null) {
216          LOG.warn("We are restarting the first next() invocation," +
217              " if your mapper has restarted a few other times like this" +
218              " then you should consider killing this job and investigate" +
219              " why it's taking so long.");
220        }
221        if (lastSuccessfulRow == null) {
222          restart(startRow);
223        } else {
224          restart(lastSuccessfulRow);
225          this.scanner.next();    // skip presumed already mapped row
226        }
227        result = this.scanner.next();
228      }
229
230      if (result != null && result.size() > 0) {
231        key.set(result.getRow());
232        lastSuccessfulRow = key.get();
233        value.copyFrom(result);
234        return true;
235      }
236      return false;
237    } catch (IOException ioe) {
238      if (logScannerActivity) {
239        long now = System.currentTimeMillis();
240        LOG.info("Mapper took " + (now-timestamp)
241          + "ms to process " + rowcount + " rows");
242        LOG.info(ioe.toString(), ioe);
243        String lastRow = lastSuccessfulRow == null ?
244          "null" : Bytes.toStringBinary(lastSuccessfulRow);
245        LOG.info("lastSuccessfulRow=" + lastRow);
246      }
247      throw ioe;
248    }
249  }
250}