/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
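 * <p>
 * A minimal usage sketch (illustrative only; in practice this class is driven by the mapreduce
 * framework, and the table, scan, split, and context come from the job setup):
 *
 * <pre>
 * TableRecordReaderImpl reader = new TableRecordReaderImpl();
 * reader.setHTable(connection.getTable(TableName.valueOf("my_table")));
 * reader.setScan(new Scan().addFamily(Bytes.toBytes("cf")));
 * reader.initialize(split, context);
 * try {
 *   while (reader.nextKeyValue()) {
 *     ImmutableBytesWritable rowKey = reader.getCurrentKey();
 *     Result row = reader.getCurrentValue();
 *     // process the row here
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>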
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
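  /**
   * Configuration key for the number of rows to process between log lines when scanner activity
   * logging ({@link ConnectionConfiguration#LOG_SCANNER_ACTIVITY}) is enabled; defaults to 100.
   */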
  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @InterfaceAudience.Private
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
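   * <p>
   * A sketch of how {@link #nextKeyValue()} below uses this for recovery (mirrors the code in
   * that method):
   *
   * <pre>
   * restart(lastSuccessfulRow);
   * scanner.next(); // skip the row that was already handed to the mapper
   * </pre>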
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce API, TaskAttemptContext has two getCounter methods. Check if the
   * getCounter(String, String) method is available.
   * @return The getCounter method, or null if not available.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   */
  @Deprecated
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
    throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  /**
   * Sets the HBase table.
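   * <p>
   * Scanner activity logging is configured from the table's configuration; a minimal sketch of
   * enabling it (the values are illustrative):
   *
   * <pre>
   * conf.setBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, true);
   * conf.setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 1000); // log every 1000 rows
   * </pre>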
   * @param htable The table to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan that defines the details of the read, such as which columns to fetch.
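   * <p>
   * A sketch of a typical scan for a full-table mapreduce read (family name and caching value are
   * illustrative):
   *
   * <pre>
   * Scan scan = new Scan();
   * scan.addFamily(Bytes.toBytes("cf")); // restrict to one column family
   * scan.setCaching(500); // rows fetched per RPC round trip
   * scan.setCacheBlocks(false); // avoid churning the region server block cache
   * </pre>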
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
    if (context != null) {
      this.context = context;
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split, releasing the scanner and the underlying table.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   * @return The current key.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   * @return The current value.
   * @throws IOException          When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record. Recoverable IOExceptions are handled
   * internally by restarting the scanner from just after the last successfully read row.
   * @return <code>true</code> if there was another record.
   * @throws IOException          When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          updateCounters();
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.info("recovering from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation. If your mapper has restarted"
            + " a few other times like this, then you should consider killing this job and"
            + " investigating why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Handle cursor results: a scan with setNeedCursorResult(true) may return a cursor that
      // marks scan progress instead of row data.
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If HBase runs on a new version of mapreduce, the RecordReader has access to counters and can
   * update them based on the scan metrics. On an old version of mapreduce, counters are not
   * accessible and TableRecordReader cannot update counter values.
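   * <p>
   * A sketch of reading these counters back from a finished job (counter names are the scan
   * metric keys plus the two extra counters incremented below):
   *
   * <pre>
   * Counters counters = job.getCounters();
   * long restarts =
   *   counters.findCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS").getValue();
   * </pre>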
   */
  private void updateCounters() {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, context, numStale);
  }

  /**
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0. Use
   *             {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
   */
  @Deprecated
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (context == null) {
      return;
    }

    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
      if (counter != null) {
        counter.increment(entry.getValue());
      }
    }
    if (numScannerRestarts != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
      if (counter != null) {
        counter.increment(numScannerRestarts);
      }
    }
    if (numStale != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
      if (counter != null) {
        counter.increment(numStale);
      }
    }
  }

  /**
   * The current progress of the record reader through its data.
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Progress cannot be estimated: it depends on the total number of rows, which is unknown here.
    return 0;
  }

}