/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;

/**
 * Iterates over the data of an HBase table, returning
 * (ImmutableBytesWritable, Result) pairs.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT
    = "hbase.mapreduce.log.scanner.rowcount";

  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  private static final String HBASE_COUNTER_GROUP_NAME =
    "HBase Counters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow  The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
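    // Work on a copy of the configured scan so repeated restarts can move the
    // start row without mutating the caller's Scan; enabling scan metrics here
    // is what makes them available to updateCounters() at the end of the scan.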
    currentScan = new Scan(scan);
    currentScan.setStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
   * Checks whether the getCounter(String, String) variant is available.
   * @return The getCounter method, or null if not available.
   * @throws IOException When the reflective lookup fails.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
  throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter",
        new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable  The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details, like columns etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException When setting up the scanner fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
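    // Lazily create the key/value holders on the first call.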
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // try to handle all IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.info("Restarting scanner after " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation."
              + " If your mapper has restarted a few other times like this,"
              + " then you should consider killing this job and investigating"
              + " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }
      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If HBase runs on a new version of mapreduce, the RecordReader has access
   * to counters and can update them based on scanMetrics.
   * If HBase runs on an old version of mapreduce, it can't access the
   * counters, so TableRecordReader can't update counter values.
   * @throws IOException
   */
  private void updateCounters() throws IOException {
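    // Metrics are collected on currentScan (where restart() enabled them),
    // not on the original scan object, so they must be read from there.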
    ScanMetrics scanMetrics = this.currentScan.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
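      // Publish each scan metric as a counter in the "HBase Counters" group,
      // invoking getCounter(String, String) reflectively for compatibility
      // with both mapreduce API generations.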
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, entry.getKey());
        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      LOG.debug("Can't update counter: " + StringUtils.stringifyException(e));
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // The total number of rows is unknown up front, so no meaningful
    // progress can be computed; always report 0.
    return 0;
  }

}
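
For context, a minimal sketch of how this class is typically driven: jobs do not instantiate TableRecordReaderImpl directly. TableInputFormat creates a TableRecordReader (which delegates to this implementation), and each map task receives the (ImmutableBytesWritable, Result) pairs produced by nextKeyValue()/getCurrentKey()/getCurrentValue(). The class names RowCounterSketch and RowCountMapper and the table name "mytable" below are hypothetical; TableMapReduceUtil.initTableMapperJob is the standard HBase helper that wires the Scan and mapper into the job.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class RowCounterSketch {

  // Hypothetical mapper: it is handed exactly the (key, row) pairs that
  // TableRecordReaderImpl produces via getCurrentKey()/getCurrentValue().
  static class RowCountMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result row, Context context)
        throws IOException, InterruptedException {
      context.getCounter("Sketch", "ROWS").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "row-counter-sketch");
    job.setJarByClass(RowCounterSketch.class);

    Scan scan = new Scan();       // becomes the Scan passed to setScan()
    scan.setCaching(500);         // larger caching cuts RPC round-trips
    scan.setCacheBlocks(false);   // MR scans shouldn't churn the block cache

    // Wires TableInputFormat, and with it TableRecordReader(Impl), into the job.
    TableMapReduceUtil.initTableMapperJob("mytable", scan, RowCountMapper.class,
        NullWritable.class, NullWritable.class, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}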