/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;

import com.google.common.annotations.VisibleForTesting;
/**
 * Iterates over the data in an HBase table, returning
 * (ImmutableBytesWritable, Result) pairs.
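 *
 * <p>A minimal usage sketch. The {@code connection}, {@code split} and
 * {@code context} objects and the table name {@code "mytable"} are
 * illustrative placeholders, not part of this class; in practice
 * {@link TableRecordReader} drives these calls for you.
 *
 * <pre>{@code
 * TableRecordReaderImpl reader = new TableRecordReaderImpl();
 * reader.setHTable(connection.getTable(TableName.valueOf("mytable")));
 * reader.setScan(new Scan());
 * reader.initialize(split, context);
 * while (reader.nextKeyValue()) {
 *   ImmutableBytesWritable rowKey = reader.getCurrentKey();
 *   Result row = reader.getCurrentValue();
 *   // process the row...
 * }
 * reader.close();
 * }</pre>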
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT
    = "hbase.mapreduce.log.scanner.rowcount";

  private static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @VisibleForTesting
  static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow  The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
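    // Clone the configured scan so per-restart tweaks (start row, metrics)
    // never mutate the caller-supplied template.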
    currentScan = new Scan(scan);
    currentScan.setStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
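    // Metrics are collected on the cloned scan so updateCounters() can later
    // publish them as job counters.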
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce API, TaskAttemptContext has two getCounter methods.
   * Check if the getCounter(String, String) method is available.
   * @return The getCounter method, or null if not available.
   * @throws IOException
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
  throws IOException {
    Method m = null;
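    // Probe via reflection: the (String, String) overload only exists in the
    // newer mapreduce API, so its absence simply means counters are unavailable.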
    try {
      m = context.getClass().getMethod("getCounter",
        new Class [] {String.class, String.class});
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable  The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException
   * @throws InterruptedException
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
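    // Open the initial scanner at the scan's configured start row.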
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
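    // Lazily create the reusable key/value holders on the first call.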
    if (key == null) key = new ImmutableBytesWritable();
    if (value == null) value = new Result();
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) numStale++;
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now-timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;" +
              " if your mapper has restarted a few other times like this," +
              " then you should consider killing this job and investigating" +
              " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) numStale++;
        numRestarts++;
      }
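      // A non-empty result means we advanced; remember its row so a future
      // restart can resume from the last successfully mapped row.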
      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now-timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If HBase runs on a new version of mapreduce, the RecordReader has access
   * to counters and can update them based on scanMetrics.
   * If HBase runs on an old version of mapreduce, it cannot access the
   * counters and TableRecordReader can't update counter values.
   * @throws IOException
   */
  private void updateCounters() throws IOException {
    ScanMetrics scanMetrics = currentScan.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
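      // Mirror every scan metric into the job's "HBase Counters" group, then
      // record how often the scanner restarted and how many results were stale.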
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      LOG.debug("can't update counter." + StringUtils.stringifyException(e));
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Progress would depend on the total number of rows, which is unknown
    // here, so always report 0.
    return 0;
  }

}