/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result) pairs.
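 * <p>
 * A minimal usage sketch; {@code table}, {@code split} and {@code context} are assumed to be
 * supplied by the caller (for example, by an enclosing input format) and are illustrative only:
 *
 * <pre>
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table); // an open org.apache.hadoop.hbase.client.Table
 * trr.setScan(new Scan());
 * trr.initialize(split, context);
 * while (trr.nextKeyValue()) {
 *   ImmutableBytesWritable key = trr.getCurrentKey();
 *   Result row = trr.getCurrentValue();
 *   // process the row...
 * }
 * trr.close();
 * </pre>
 */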
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @InterfaceAudience.Private
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private ResultScanner scanner = null;
  private Scan scan = null;
  // copy of the scan currently in use; re-created by restart()
  private Scan currentScan = null;
  private Table htable = null;
  // last row successfully returned; lets a restarted scanner resume where it left off
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private long numRestarts = 0;
  private long numStale = 0;
  // timestamp and rowcount drive the periodic scanner-activity log messages
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   * <p>
   * Note that the restarted scanner re-reads {@code firstRow} itself (the start row is
   * inclusive); {@link #nextKeyValue()} accounts for this by skipping one result when it resumes
   * from {@code lastSuccessfulRow}.
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan={}", currentScan);
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods. Check whether the
   * getCounter(String, String) method is available.
   * @return The getCounter method, or null if not available.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   */
  @Deprecated
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
    throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore: the method simply is not available in this mapreduce version
    }
    return m;
  }

  /**
   * Sets the HBase table.
   * @param htable The table to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan that defines the actual scan details, such as which columns to read; see the
   * sketch after this method.
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }
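
  /*
   * A hedged example of a typical scan handed to setScan(Scan); the column family and qualifier
   * names ("cf", "q") and the caching value are illustrative only, not defaults of this class:
   *
   *   Scan scan = new Scan();
   *   scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
   *   scan.setCaching(500); // rows fetched per RPC; tune to the mapper's row size
   *   reader.setScan(scan);
   */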

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
    if (context != null) {
      this.context = context;
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   * @return The current key.
   * @throws IOException When the key could not be produced.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          updateCounters();
          throw e;
        }
        // try to handle all other IOExceptions by restarting the scanner;
        // if the retry fails as well, the exception propagates out of this method
        LOG.info("recovered from {}", StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation."
            + " If your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigating"
            + " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need to handle a cursor result as well: it carries a row position but no cells
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow={}", lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If hbase runs on the new version of mapreduce, the RecordReader has access to counters and
   * can update them based on scanMetrics. If hbase runs on an old version of mapreduce, it
   * cannot access the counters and TableRecordReader can't update counter values.
   */
  private void updateCounters() {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, context, numStale);
  }

  /**
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0. Use
   *   {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
   */
  @Deprecated
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (context == null) {
      return;
    }

    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
      if (counter != null) {
        counter.increment(entry.getValue());
      }
    }
    if (numScannerRestarts != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
      if (counter != null) {
        counter.increment(numScannerRestarts);
      }
    }
    if (numStale != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
      if (counter != null) {
        counter.increment(numStale);
      }
    }
  }
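
  /*
   * A hedged sketch of how a client could read the counters published above once the mapreduce
   * job completes; "job" is an org.apache.hadoop.mapreduce.Job and is illustrative only:
   *
   *   long restarts = job.getCounters()
   *     .findCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS").getValue();
   *   long stale = job.getCounters()
   *     .findCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE").getValue();
   */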

  /**
   * The current progress of the record reader through its data.
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // The total number of rows in the split is unknown, so no meaningful progress can be computed
    return 0;
  }

}