/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;

/**
 * Iterates over HBase table data, returning (ImmutableBytesWritable, Result)
 * pairs.
 *
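 * <p>A minimal usage sketch, not the usual entry point: jobs normally reach
 * this class through TableInputFormat, and the table name, column family,
 * and the conf/split/context variables below are illustrative assumptions.
 *
 * <pre>
 * Scan scan = new Scan();
 * scan.addFamily(Bytes.toBytes("f"));              // assumed column family
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(new HTable(conf, "testtable"));    // assumed table name
 * trr.setScan(scan);
 * trr.initialize(split, context);                  // split/context come from the framework
 * while (trr.nextKeyValue()) {
 *   ImmutableBytesWritable rowKey = trr.getCurrentKey();
 *   Result row = trr.getCurrentValue();
 *   // consume the row here
 * }
 * trr.close();
 * </pre>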
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableRecordReaderImpl {
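  /** Configuration key for how many rows to process between scanner-activity log lines. */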
  public static final String LOG_PER_ROW_COUNT
    = "hbase.mapreduce.log.scanner.rowcount";

  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
  private static final String HBASE_COUNTER_GROUP_NAME =
    "HBase Counters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private HTable htable = null;
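  // Key of the last row successfully returned to the caller; used to resume
  // the scan after a restart.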
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
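  // Reflected getCounter(String, String) handle, or null when running against
  // an old mapreduce API that lacks it.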
  private Method getCounter = null;
  private long numRestarts = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restarts the scanner after a survivable exception by creating a new one,
   * positioned at the given row.
   *
   * @param firstRow  The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
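    // Clone the configured Scan so a restart never mutates the caller's
    // original, then point the clone at the row to resume from.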
    currentScan = new Scan(scan);
    currentScan.setStartRow(firstRow);
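    // Ask for ScanMetrics to be collected for this scan; the serialized
    // metrics are read back in updateCounters() once the scan completes.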
    currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
      Bytes.toBytes(Boolean.TRUE));
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
   * Checks whether the getCounter(String, String) method is available.
   * @return The getCounter method, or null if it is not available.
   * @throws IOException When the reflective lookup fails for security reasons.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
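    // The old mapreduce API lacks getCounter(String, String), so probe for it
    // reflectively instead of linking against it directly.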
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter",
        new Class [] {String.class, String.class});
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore: running against an older mapreduce API without this method.
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable  The {@link HTable} to scan.
   */
  public void setHTable(HTable htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details, such as the columns to read.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Builds the scanner. Not done in the constructor to allow for extension.
   *
   * @throws IOException When restarting the scan fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    this.scanner.close();
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException When reading the key fails.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) key = new ImmutableBytesWritable();
    if (value == null) value = new Result();
    try {
      try {
        value = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
              + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // Try to handle all IOExceptions by restarting the scanner;
        // if the retry below also fails, the exception is rethrown.
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation," +
              " if your mapper has restarted a few other times like this" +
              " then you should consider killing this job and investigate" +
              " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
        numRestarts++;
      }
      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
          + "ms to process " + rowcount + " rows");
        LOG.info(ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If hbase runs on a new version of mapreduce, the RecordReader has access
   * to counters and can update them based on scanMetrics.
   * If hbase runs on an old version of mapreduce, it won't be able to get
   * access to counters and TableRecordReader can't update counter values.
   * @throws IOException When deserializing the scan metrics fails.
   */
  private void updateCounters() throws IOException {
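    // Scan metrics are accumulated during the scan and attached to the Scan's
    // attributes as a serialized protobuf; if none are present there is
    // nothing to publish.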
    byte[] serializedMetrics = currentScan.getAttribute(
        Scan.SCAN_ATTRIBUTES_METRICS_DATA);
    if (serializedMetrics == null || serializedMetrics.length == 0) {
      return;
    }

    ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);

    updateCounters(scanMetrics, numRestarts, getCounter, context);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
    } catch (Exception e) {
      LOG.debug("can't update counter." + StringUtils.stringifyException(e));
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Progress would depend on the total number of rows in the split, which
    // is unknown here, so always report 0.
    return 0;
  }

}