/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

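/*
 * A minimal usage sketch. Normally the public TableRecordReader wrapper drives this
 * class via TableInputFormat; the table, split, and context objects below are assumed
 * to be supplied by the caller.
 *
 *   TableRecordReaderImpl reader = new TableRecordReaderImpl();
 *   reader.setHTable(table);           // an open org.apache.hadoop.hbase.client.Table
 *   reader.setScan(new Scan());        // columns, row ranges, filters, etc.
 *   reader.initialize(split, context); // builds the scanner
 *   while (reader.nextKeyValue()) {
 *     ImmutableBytesWritable rowKey = reader.getCurrentKey();
 *     Result row = reader.getCurrentValue();
 *     // process the row
 *   }
 *   reader.close();
 */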
/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT
      = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @VisibleForTesting
  static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow  The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

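  // restart(...) is invoked both from initialize(...), to build the initial scanner,
  // and from the recovery path in nextKeyValue(...) after a non-fatal IOException.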
  /**
   * In the new mapreduce API, TaskAttemptContext has two getCounter methods.
   * Check whether the getCounter(String, String) method is available.
   * @return The getCounter method, or null if it is not available.
   * @throws IOException When the reflective lookup fails.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter",
        new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore: the method is simply absent on older mapreduce APIs.
    }
    return m;
  }

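  /*
   * A minimal sketch of how the reflected Method is used (mirroring updateCounters
   * below). The counter name "ROWS_SCANNED" is illustrative; real names come from
   * ScanMetrics.getMetricsMap().
   *
   *   Method m = retrieveGetCounterWithStringsParams(context);
   *   if (m != null) { // null when the API lacks getCounter(String, String)
   *     Counter c = (Counter) m.invoke(context, HBASE_COUNTER_GROUP_NAME, "ROWS_SCANNED");
   *     c.increment(1);
   *   }
   */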
  /**
   * Sets the HBase table.
   *
   * @param htable  The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

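  /*
   * A hedged configuration sketch: turning on scanner-activity logging and lowering the
   * per-row logging interval before the Table is handed to this reader. The value 500
   * is illustrative, not a recommendation.
   *
   *   Configuration conf = ...; // the configuration backing the Table
   *   conf.setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
   *   conf.setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 500);
   */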
  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in the constructor to allow for extension.
   *
   * @throws IOException When building the scanner fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException When the key is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails too, it is rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;" +
              " if your mapper has restarted a few times like this," +
              " consider killing this job and investigating" +
              " why it is taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Handle a cursor result: it carries a row key marking scanner progress, but no cells.
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If HBase runs on the new version of mapreduce, the RecordReader has access to
   * counters and can update them based on scanMetrics.
   * If HBase runs on an old version of mapreduce, it cannot access the counters, and
   * TableRecordReader cannot update counter values.
   * @throws IOException When the scan metrics cannot be obtained.
   */
  private void updateCounters() throws IOException {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses the new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      LOG.debug("Can't update counter. " + StringUtils.stringifyException(e));
    }
  }

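  /*
   * A hedged sketch of reading these counters back once the job has finished
   * (Job#getCounters and Counters#findCounter are standard mapreduce APIs; the
   * job variable is assumed to be the completed org.apache.hadoop.mapreduce.Job):
   *
   *   Counters counters = job.getCounters();
   *   long restarts = counters
   *       .findCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS").getValue();
   */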
  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Progress cannot be estimated without knowing the total number of rows,
    // so always report 0.
    return 0;
  }

}