001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapred;
020
021import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.DoNotRetryIOException;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.client.ResultScanner;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.filter.Filter;
030import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
031import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.util.StringUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Iterate over an HBase table data, return (Text, RowResult) pairs
040 */
041@InterfaceAudience.Public
042public class TableRecordReaderImpl {
043  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);
044
045  private byte [] startRow;
046  private byte [] endRow;
047  private byte [] lastSuccessfulRow;
048  private Filter trrRowFilter;
049  private ResultScanner scanner;
050  private Table htable;
051  private byte [][] trrInputColumns;
052  private long timestamp;
053  private int rowcount;
054  private boolean logScannerActivity = false;
055  private int logPerRowCount = 100;
056
057  /**
058   * Restart from survivable exceptions by creating a new scanner.
059   */
060  public void restart(byte[] firstRow) throws IOException {
061    Scan currentScan;
062    if ((endRow != null) && (endRow.length > 0)) {
063      if (trrRowFilter != null) {
064        Scan scan = new Scan(firstRow, endRow);
065        TableInputFormat.addColumns(scan, trrInputColumns);
066        scan.setFilter(trrRowFilter);
067        scan.setCacheBlocks(false);
068        this.scanner = this.htable.getScanner(scan);
069        currentScan = scan;
070      } else {
071        LOG.debug("TIFB.restart, firstRow: " +
072            Bytes.toStringBinary(firstRow) + ", endRow: " +
073            Bytes.toStringBinary(endRow));
074        Scan scan = new Scan(firstRow, endRow);
075        TableInputFormat.addColumns(scan, trrInputColumns);
076        this.scanner = this.htable.getScanner(scan);
077        currentScan = scan;
078      }
079    } else {
080      LOG.debug("TIFB.restart, firstRow: " +
081          Bytes.toStringBinary(firstRow) + ", no endRow");
082
083      Scan scan = new Scan(firstRow);
084      TableInputFormat.addColumns(scan, trrInputColumns);
085      scan.setFilter(trrRowFilter);
086      this.scanner = this.htable.getScanner(scan);
087      currentScan = scan;
088    }
089    if (logScannerActivity) {
090      LOG.info("Current scan=" + currentScan.toString());
091      timestamp = System.currentTimeMillis();
092      rowcount = 0;
093    }
094  }
095
096  /**
097   * Build the scanner. Not done in constructor to allow for extension.
098   */
099  public void init() throws IOException {
100    restart(startRow);
101  }
102
103  byte[] getStartRow() {
104    return this.startRow;
105  }
106  /**
107   * @param htable the {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
108   */
109  public void setHTable(Table htable) {
110    Configuration conf = htable.getConfiguration();
111    logScannerActivity = conf.getBoolean(
112      "hbase.client.log.scanner.activity" /*ScannerCallable.LOG_SCANNER_ACTIVITY*/, false);
113    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
114    this.htable = htable;
115  }
116
117  /**
118   * @param inputColumns the columns to be placed in {@link Result}.
119   */
120  public void setInputColumns(final byte [][] inputColumns) {
121    this.trrInputColumns = inputColumns;
122  }
123
124  /**
125   * @param startRow the first row in the split
126   */
127  public void setStartRow(final byte [] startRow) {
128    this.startRow = startRow;
129  }
130
131  /**
132   *
133   * @param endRow the last row in the split
134   */
135  public void setEndRow(final byte [] endRow) {
136    this.endRow = endRow;
137  }
138
139  /**
140   * @param rowFilter the {@link Filter} to be used.
141   */
142  public void setRowFilter(Filter rowFilter) {
143    this.trrRowFilter = rowFilter;
144  }
145
146  public void close() {
147    if (this.scanner != null) {
148      this.scanner.close();
149    }
150    try {
151      this.htable.close();
152    } catch (IOException ioe) {
153      LOG.warn("Error closing table", ioe);
154    }
155  }
156
157  /**
158   * @return ImmutableBytesWritable
159   *
160   * @see org.apache.hadoop.mapred.RecordReader#createKey()
161   */
162  public ImmutableBytesWritable createKey() {
163    return new ImmutableBytesWritable();
164  }
165
166  /**
167   * @return RowResult
168   *
169   * @see org.apache.hadoop.mapred.RecordReader#createValue()
170   */
171  public Result createValue() {
172    return new Result();
173  }
174
175  public long getPos() {
176    // This should be the ordinal tuple in the range;
177    // not clear how to calculate...
178    return 0;
179  }
180
181  public float getProgress() {
182    // Depends on the total number of tuples and getPos
183    return 0;
184  }
185
186  /**
187   * @param key HStoreKey as input key.
188   * @param value MapWritable as input value
189   * @return true if there was more data
190   */
191  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
192    Result result;
193    try {
194      try {
195        result = this.scanner.next();
196        if (logScannerActivity) {
197          rowcount ++;
198          if (rowcount >= logPerRowCount) {
199            long now = System.currentTimeMillis();
200            LOG.info("Mapper took " + (now-timestamp)
201              + "ms to process " + rowcount + " rows");
202            timestamp = now;
203            rowcount = 0;
204          }
205        }
206      } catch (IOException e) {
207        // do not retry if the exception tells us not to do so
208        if (e instanceof DoNotRetryIOException) {
209          throw e;
210        }
211        // try to handle all other IOExceptions by restarting
212        // the scanner, if the second call fails, it will be rethrown
213        LOG.debug("recovered from " + StringUtils.stringifyException(e));
214        if (lastSuccessfulRow == null) {
215          LOG.warn("We are restarting the first next() invocation," +
216              " if your mapper has restarted a few other times like this" +
217              " then you should consider killing this job and investigate" +
218              " why it's taking so long.");
219        }
220        if (lastSuccessfulRow == null) {
221          restart(startRow);
222        } else {
223          restart(lastSuccessfulRow);
224          this.scanner.next();    // skip presumed already mapped row
225        }
226        result = this.scanner.next();
227      }
228
229      if (result != null && result.size() > 0) {
230        key.set(result.getRow());
231        lastSuccessfulRow = key.get();
232        value.copyFrom(result);
233        return true;
234      }
235      return false;
236    } catch (IOException ioe) {
237      if (logScannerActivity) {
238        long now = System.currentTimeMillis();
239        LOG.info("Mapper took " + (now-timestamp)
240          + "ms to process " + rowcount + " rows");
241        LOG.info(ioe.toString(), ioe);
242        String lastRow = lastSuccessfulRow == null ?
243          "null" : Bytes.toStringBinary(lastSuccessfulRow);
244        LOG.info("lastSuccessfulRow=" + lastRow);
245      }
246      throw ioe;
247    }
248  }
249}