001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
021
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.DoNotRetryIOException;
025import org.apache.hadoop.hbase.client.ConnectionConfiguration;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.ResultScanner;
028import org.apache.hadoop.hbase.client.Scan;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.filter.Filter;
031import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
032import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.util.StringUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Iterate over an HBase table data, return (Text, RowResult) pairs
042 */
043@InterfaceAudience.Public
044public class TableRecordReaderImpl {
045  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);
046
047  private byte[] startRow;
048  private byte[] endRow;
049  private byte[] lastSuccessfulRow;
050  private Filter trrRowFilter;
051  private ResultScanner scanner;
052  private Table htable;
053  private byte[][] trrInputColumns;
054  private long timestamp;
055  private int rowcount;
056  private boolean logScannerActivity = false;
057  private int logPerRowCount = 100;
058
059  /**
060   * Restart from survivable exceptions by creating a new scanner.
061   */
062  public void restart(byte[] firstRow) throws IOException {
063    Scan currentScan;
064    if ((endRow != null) && (endRow.length > 0)) {
065      if (trrRowFilter != null) {
066        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
067        TableInputFormat.addColumns(scan, trrInputColumns);
068        scan.setFilter(trrRowFilter);
069        scan.setCacheBlocks(false);
070        this.scanner = this.htable.getScanner(scan);
071        currentScan = scan;
072      } else {
073        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: "
074          + Bytes.toStringBinary(endRow));
075        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
076        TableInputFormat.addColumns(scan, trrInputColumns);
077        this.scanner = this.htable.getScanner(scan);
078        currentScan = scan;
079      }
080    } else {
081      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");
082
083      Scan scan = new Scan().withStartRow(firstRow);
084      TableInputFormat.addColumns(scan, trrInputColumns);
085      scan.setFilter(trrRowFilter);
086      this.scanner = this.htable.getScanner(scan);
087      currentScan = scan;
088    }
089    if (logScannerActivity) {
090      LOG.info("Current scan=" + currentScan.toString());
091      timestamp = EnvironmentEdgeManager.currentTime();
092      rowcount = 0;
093    }
094  }
095
096  /**
097   * Build the scanner. Not done in constructor to allow for extension.
098   */
099  public void init() throws IOException {
100    restart(startRow);
101  }
102
103  byte[] getStartRow() {
104    return this.startRow;
105  }
106
107  /**
108   * @param htable the table to scan.
109   */
110  public void setHTable(Table htable) {
111    Configuration conf = htable.getConfiguration();
112    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
113    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
114    this.htable = htable;
115  }
116
117  /**
118   * @param inputColumns the columns to be placed in {@link Result}.
119   */
120  public void setInputColumns(final byte[][] inputColumns) {
121    this.trrInputColumns = inputColumns;
122  }
123
124  /**
125   * @param startRow the first row in the split
126   */
127  public void setStartRow(final byte[] startRow) {
128    this.startRow = startRow;
129  }
130
131  /**
132   * @param endRow the last row in the split
133   */
134  public void setEndRow(final byte[] endRow) {
135    this.endRow = endRow;
136  }
137
138  /**
139   * @param rowFilter the {@link Filter} to be used.
140   */
141  public void setRowFilter(Filter rowFilter) {
142    this.trrRowFilter = rowFilter;
143  }
144
145  public void close() {
146    if (this.scanner != null) {
147      this.scanner.close();
148    }
149    try {
150      this.htable.close();
151    } catch (IOException ioe) {
152      LOG.warn("Error closing table", ioe);
153    }
154  }
155
156  /**
157   * n *
158   * @see org.apache.hadoop.mapred.RecordReader#createKey()
159   */
160  public ImmutableBytesWritable createKey() {
161    return new ImmutableBytesWritable();
162  }
163
164  /**
165   * n *
166   * @see org.apache.hadoop.mapred.RecordReader#createValue()
167   */
168  public Result createValue() {
169    return new Result();
170  }
171
172  public long getPos() {
173    // This should be the ordinal tuple in the range;
174    // not clear how to calculate...
175    return 0;
176  }
177
178  public float getProgress() {
179    // Depends on the total number of tuples and getPos
180    return 0;
181  }
182
183  /**
184   * @param key   HStoreKey as input key.
185   * @param value MapWritable as input value
186   * @return true if there was more data
187   */
188  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
189    Result result;
190    try {
191      try {
192        result = this.scanner.next();
193        if (logScannerActivity) {
194          rowcount++;
195          if (rowcount >= logPerRowCount) {
196            long now = EnvironmentEdgeManager.currentTime();
197            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
198            timestamp = now;
199            rowcount = 0;
200          }
201        }
202      } catch (IOException e) {
203        // do not retry if the exception tells us not to do so
204        if (e instanceof DoNotRetryIOException) {
205          throw e;
206        }
207        // try to handle all other IOExceptions by restarting
208        // the scanner, if the second call fails, it will be rethrown
209        LOG.debug("recovered from " + StringUtils.stringifyException(e));
210        if (lastSuccessfulRow == null) {
211          LOG.warn("We are restarting the first next() invocation,"
212            + " if your mapper has restarted a few other times like this"
213            + " then you should consider killing this job and investigate"
214            + " why it's taking so long.");
215        }
216        if (lastSuccessfulRow == null) {
217          restart(startRow);
218        } else {
219          restart(lastSuccessfulRow);
220          this.scanner.next(); // skip presumed already mapped row
221        }
222        result = this.scanner.next();
223      }
224
225      if (result != null && result.size() > 0) {
226        key.set(result.getRow());
227        lastSuccessfulRow = key.get();
228        value.copyFrom(result);
229        return true;
230      }
231      return false;
232    } catch (IOException ioe) {
233      if (logScannerActivity) {
234        long now = EnvironmentEdgeManager.currentTime();
235        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
236        LOG.info(ioe.toString(), ioe);
237        String lastRow =
238          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
239        LOG.info("lastSuccessfulRow=" + lastRow);
240      }
241      throw ioe;
242    }
243  }
244}