001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.lang.reflect.Method;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.DoNotRetryIOException;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.client.ResultScanner;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.client.Table;
029import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
030import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.mapreduce.Counter;
034import org.apache.hadoop.mapreduce.InputSplit;
035import org.apache.hadoop.mapreduce.TaskAttemptContext;
036import org.apache.hadoop.util.StringUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) pairs.
043 */
044@InterfaceAudience.Public
045public class TableRecordReaderImpl {
046  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";
047
048  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);
049
050  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
051  @InterfaceAudience.Private
052  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
053
054  private ResultScanner scanner = null;
055  private Scan scan = null;
056  private Scan currentScan = null;
057  private Table htable = null;
058  private byte[] lastSuccessfulRow = null;
059  private ImmutableBytesWritable key = null;
060  private Result value = null;
061  private TaskAttemptContext context = null;
062  private long numRestarts = 0;
063  private long numStale = 0;
064  private long timestamp;
065  private int rowcount;
066  private boolean logScannerActivity = false;
067  private int logPerRowCount = 100;
068
069  /**
070   * Restart from survivable exceptions by creating a new scanner.
071   * @param firstRow The first row to start at.
072   * @throws IOException When restarting fails.
073   */
074  public void restart(byte[] firstRow) throws IOException {
075    // Update counter metrics based on current scan before reinitializing it
076    if (currentScan != null) {
077      updateCounters();
078    }
079    currentScan = new Scan(scan);
080    currentScan.withStartRow(firstRow);
081    currentScan.setScanMetricsEnabled(true);
082    if (this.scanner != null) {
083      if (logScannerActivity) {
084        LOG.info("Closing the previously opened scanner object.");
085      }
086      this.scanner.close();
087    }
088    this.scanner = this.htable.getScanner(currentScan);
089    if (logScannerActivity) {
090      LOG.info("Current scan=" + currentScan.toString());
091      timestamp = EnvironmentEdgeManager.currentTime();
092      rowcount = 0;
093    }
094  }
095
096  /**
097   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods Check if
098   * getCounter(String, String) method is available.
099   * @return The getCounter method or null if not available.
100   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
101   */
102  @Deprecated
103  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
104    throws IOException {
105    Method m = null;
106    try {
107      m = context.getClass().getMethod("getCounter", new Class[] { String.class, String.class });
108    } catch (SecurityException e) {
109      throw new IOException("Failed test for getCounter", e);
110    } catch (NoSuchMethodException e) {
111      // Ignore
112    }
113    return m;
114  }
115
116  /**
117   * Sets the HBase table.
118   * @param htable The {@link org.apache.hadoop.hbase.HTableDescriptor} to scan.
119   */
120  public void setHTable(Table htable) {
121    Configuration conf = htable.getConfiguration();
122    logScannerActivity = conf.getBoolean(
123      "hbase.client.log.scanner.activity" /* ScannerCallable.LOG_SCANNER_ACTIVITY */, false);
124    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
125    this.htable = htable;
126  }
127
128  /**
129   * Sets the scan defining the actual details like columns etc.
130   * @param scan The scan to set.
131   */
132  public void setScan(Scan scan) {
133    this.scan = scan;
134  }
135
136  /**
137   * Build the scanner. Not done in constructor to allow for extension.
138   */
139  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
140    throws IOException, InterruptedException {
141    if (context != null) {
142      this.context = context;
143    }
144    restart(scan.getStartRow());
145  }
146
147  /**
148   * Closes the split.
149   */
150  public void close() {
151    if (this.scanner != null) {
152      this.scanner.close();
153    }
154    try {
155      this.htable.close();
156    } catch (IOException ioe) {
157      LOG.warn("Error closing table", ioe);
158    }
159  }
160
161  /**
162   * Returns the current key.
163   * @return The current key.
164   * @throws InterruptedException When the job is aborted.
165   */
166  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
167    return key;
168  }
169
170  /**
171   * Returns the current value.
172   * @return The current value.
173   * @throws IOException          When the value is faulty.
174   * @throws InterruptedException When the job is aborted.
175   */
176  public Result getCurrentValue() throws IOException, InterruptedException {
177    return value;
178  }
179
180  /**
181   * Positions the record reader to the next record.
182   * @return <code>true</code> if there was another record.
183   * @throws IOException          When reading the record failed.
184   * @throws InterruptedException When the job was aborted.
185   */
186  public boolean nextKeyValue() throws IOException, InterruptedException {
187    if (key == null) {
188      key = new ImmutableBytesWritable();
189    }
190    if (value == null) {
191      value = new Result();
192    }
193    try {
194      try {
195        value = this.scanner.next();
196        if (value != null && value.isStale()) {
197          numStale++;
198        }
199        if (logScannerActivity) {
200          rowcount++;
201          if (rowcount >= logPerRowCount) {
202            long now = EnvironmentEdgeManager.currentTime();
203            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
204            timestamp = now;
205            rowcount = 0;
206          }
207        }
208      } catch (IOException e) {
209        // do not retry if the exception tells us not to do so
210        if (e instanceof DoNotRetryIOException) {
211          updateCounters();
212          throw e;
213        }
214        // try to handle all other IOExceptions by restarting
215        // the scanner, if the second call fails, it will be rethrown
216        LOG.info("recovered from " + StringUtils.stringifyException(e));
217        if (lastSuccessfulRow == null) {
218          LOG.warn("We are restarting the first next() invocation,"
219            + " if your mapper has restarted a few other times like this"
220            + " then you should consider killing this job and investigate"
221            + " why it's taking so long.");
222        }
223        if (lastSuccessfulRow == null) {
224          restart(scan.getStartRow());
225        } else {
226          restart(lastSuccessfulRow);
227          scanner.next(); // skip presumed already mapped row
228        }
229        value = scanner.next();
230        if (value != null && value.isStale()) {
231          numStale++;
232        }
233        numRestarts++;
234      }
235
236      if (value != null && value.size() > 0) {
237        key.set(value.getRow());
238        lastSuccessfulRow = key.get();
239        return true;
240      }
241
242      // Need handle cursor result
243      if (value != null && value.isCursor()) {
244        key.set(value.getCursor().getRow());
245        lastSuccessfulRow = key.get();
246        return true;
247      }
248
249      updateCounters();
250      return false;
251    } catch (IOException ioe) {
252      updateCounters();
253      if (logScannerActivity) {
254        long now = EnvironmentEdgeManager.currentTime();
255        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
256        LOG.info(ioe.toString(), ioe);
257        String lastRow =
258          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
259        LOG.info("lastSuccessfulRow=" + lastRow);
260      }
261      throw ioe;
262    }
263  }
264
265  /**
266   * If hbase runs on new version of mapreduce, RecordReader has access to counters thus can update
267   * counters based on scanMetrics. If hbase runs on old version of mapreduce, it won't be able to
268   * get access to counters and TableRecorderReader can't update counter values.
269   */
270  private void updateCounters() {
271    ScanMetrics scanMetrics = scanner.getScanMetrics();
272    if (scanMetrics == null) {
273      return;
274    }
275
276    updateCounters(scanMetrics, numRestarts, context, numStale);
277  }
278
279  /**
280   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0 Use
281   *             {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
282   */
283  @Deprecated
284  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
285    Method getCounter, TaskAttemptContext context, long numStale) {
286    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
287  }
288
289  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
290    TaskAttemptContext context, long numStale) {
291    // we can get access to counters only if hbase uses new mapreduce APIs
292    if (context == null) {
293      return;
294    }
295
296    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
297      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
298      if (counter != null) {
299        counter.increment(entry.getValue());
300      }
301    }
302    if (numScannerRestarts != 0L) {
303      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
304      if (counter != null) {
305        counter.increment(numScannerRestarts);
306      }
307    }
308    if (numStale != 0L) {
309      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
310      if (counter != null) {
311        counter.increment(numStale);
312      }
313    }
314  }
315
316  /**
317   * The current progress of the record reader through its data.
318   * @return A number between 0.0 and 1.0, the fraction of the data read.
319   */
320  public float getProgress() {
321    // Depends on the total number of tuples
322    return 0;
323  }
324
325}