View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.lang.reflect.Method;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.client.HTable;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.client.ResultScanner;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.client.ScannerCallable;
31  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
32  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.io.DataInputBuffer;
35  import org.apache.hadoop.mapreduce.Counter;
36  import org.apache.hadoop.mapreduce.InputSplit;
37  import org.apache.hadoop.mapreduce.TaskAttemptContext;
38  import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
39  import org.apache.hadoop.util.StringUtils;
40  
41  /**
42   * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
43   * pairs.
44   */
45  public class TableRecordReaderImpl {
46    public static final String LOG_PER_ROW_COUNT
47      = "hbase.mapreduce.log.scanner.rowcount";
48  
49    static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
50  
51    // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
52    private static final String HBASE_COUNTER_GROUP_NAME =
53      "HBase Counters";
54    private ResultScanner scanner = null;
55    private Scan scan = null;
56    private Scan currentScan = null;
57    private HTable htable = null;
58    private byte[] lastSuccessfulRow = null;
59    private ImmutableBytesWritable key = null;
60    private Result value = null;
61    private TaskAttemptContext context = null;
62    private Method getCounter = null;
63    private long numRestarts = 0;
64    private long timestamp;
65    private int rowcount;
66    private boolean logScannerActivity = false;
67    private int logPerRowCount = 100;
68  
69    /**
70     * Restart from survivable exceptions by creating a new scanner.
71     *
72     * @param firstRow  The first row to start at.
73     * @throws IOException When restarting fails.
74     */
75    public void restart(byte[] firstRow) throws IOException {
76      currentScan = new Scan(scan);
77      currentScan.setStartRow(firstRow);
78      currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
79        Bytes.toBytes(Boolean.TRUE));
80      if (this.scanner != null) {
81        if (logScannerActivity) {
82          LOG.info("Closing the previously opened scanner object.");
83        }
84        this.scanner.close();
85      }
86      this.scanner = this.htable.getScanner(currentScan);
87      if (logScannerActivity) {
88        LOG.info("Current scan=" + currentScan.toString());
89        timestamp = System.currentTimeMillis();
90        rowcount = 0;
91      }
92    }
93  
94    /**
95     * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
96     * Check if getCounter(String, String) method is available.
97     * @return The getCounter method or null if not available.
98     * @throws IOException
99     */
100   private Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
101   throws IOException {
102     Method m = null;
103     try {
104       m = context.getClass().getMethod("getCounter",
105         new Class [] {String.class, String.class});
106     } catch (SecurityException e) {
107       throw new IOException("Failed test for getCounter", e);
108     } catch (NoSuchMethodException e) {
109       // Ignore
110     }
111     return m;
112   }
113 
114   /**
115    * Sets the HBase table.
116    *
117    * @param htable  The {@link HTable} to scan.
118    */
119   public void setHTable(HTable htable) {
120     Configuration conf = htable.getConfiguration();
121     logScannerActivity = conf.getBoolean(
122       ScannerCallable.LOG_SCANNER_ACTIVITY, false);
123     logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
124     this.htable = htable;
125   }
126 
127   /**
128    * Sets the scan defining the actual details like columns etc.
129    *
130    * @param scan  The scan to set.
131    */
132   public void setScan(Scan scan) {
133     this.scan = scan;
134   }
135 
136   /**
137    * Build the scanner. Not done in constructor to allow for extension.
138    *
139    * @throws IOException, InterruptedException
140    */
141   public void initialize(InputSplit inputsplit,
142       TaskAttemptContext context) throws IOException,
143       InterruptedException {
144     if (context != null) {
145       this.context = context;
146       getCounter = retrieveGetCounterWithStringsParams(context);
147     }
148     restart(scan.getStartRow());
149   }
150 
151   /**
152    * Closes the split.
153    *
154    *
155    */
156   public void close() {
157     this.scanner.close();
158   }
159 
160   /**
161    * Returns the current key.
162    *
163    * @return The current key.
164    * @throws IOException
165    * @throws InterruptedException When the job is aborted.
166    */
167   public ImmutableBytesWritable getCurrentKey() throws IOException,
168       InterruptedException {
169     return key;
170   }
171 
172   /**
173    * Returns the current value.
174    *
175    * @return The current value.
176    * @throws IOException When the value is faulty.
177    * @throws InterruptedException When the job is aborted.
178    */
179   public Result getCurrentValue() throws IOException, InterruptedException {
180     return value;
181   }
182 
183 
184   /**
185    * Positions the record reader to the next record.
186    *
187    * @return <code>true</code> if there was another record.
188    * @throws IOException When reading the record failed.
189    * @throws InterruptedException When the job was aborted.
190    */
191   public boolean nextKeyValue() throws IOException, InterruptedException {
192     if (key == null) key = new ImmutableBytesWritable();
193     if (value == null) value = new Result();
194     try {
195       try {
196         value = this.scanner.next();
197         if (logScannerActivity) {
198           rowcount ++;
199           if (rowcount >= logPerRowCount) {
200             long now = System.currentTimeMillis();
201             LOG.info("Mapper took " + (now-timestamp)
202               + "ms to process " + rowcount + " rows");
203             timestamp = now;
204             rowcount = 0;
205           }
206         }
207       } catch (IOException e) {
208         // try to handle all IOExceptions by restarting
209         // the scanner, if the second call fails, it will be rethrown
210         LOG.info("recovered from " + StringUtils.stringifyException(e));
211         if (lastSuccessfulRow == null) {
212           LOG.warn("We are restarting the first next() invocation," +
213               " if your mapper has restarted a few other times like this" +
214               " then you should consider killing this job and investigate" +
215               " why it's taking so long.");
216         }
217         if (lastSuccessfulRow == null) {
218           restart(scan.getStartRow());
219         } else {
220           restart(lastSuccessfulRow);
221           scanner.next();    // skip presumed already mapped row
222         }
223         value = scanner.next();
224         numRestarts++;
225       }
226       if (value != null && value.size() > 0) {
227         key.set(value.getRow());
228         lastSuccessfulRow = key.get();
229         return true;
230       }
231 
232       updateCounters();
233       return false;
234     } catch (IOException ioe) {
235       if (logScannerActivity) {
236         long now = System.currentTimeMillis();
237         LOG.info("Mapper took " + (now-timestamp)
238           + "ms to process " + rowcount + " rows");
239         LOG.info(ioe);
240         String lastRow = lastSuccessfulRow == null ?
241           "null" : Bytes.toStringBinary(lastSuccessfulRow);
242         LOG.info("lastSuccessfulRow=" + lastRow);
243       }
244       throw ioe;
245     }
246   }
247 
248   /**
249    * If hbase runs on new version of mapreduce, RecordReader has access to
250    * counters thus can update counters based on scanMetrics.
251    * If hbase runs on old version of mapreduce, it won't be able to get
252    * access to counters and TableRecorderReader can't update counter values.
253    * @throws IOException
254    */
255   private void updateCounters() throws IOException {
256     // we can get access to counters only if hbase uses new mapreduce APIs
257     if (this.getCounter == null) {
258       return;
259     }
260 
261     byte[] serializedMetrics = currentScan.getAttribute(
262         Scan.SCAN_ATTRIBUTES_METRICS_DATA);
263     if (serializedMetrics == null || serializedMetrics.length == 0 ) {
264       return;
265     }
266 
267     DataInputBuffer in = new DataInputBuffer();
268     in.reset(serializedMetrics, 0, serializedMetrics.length);
269     ScanMetrics scanMetrics = new ScanMetrics();
270     scanMetrics.readFields(in);
271     MetricsTimeVaryingLong[] mlvs =
272       scanMetrics.getMetricsTimeVaryingLongArray();
273 
274     try {
275       for (MetricsTimeVaryingLong mlv : mlvs) {
276         Counter ct = (Counter)this.getCounter.invoke(context,
277           HBASE_COUNTER_GROUP_NAME, mlv.getName());
278         ct.increment(mlv.getCurrentIntervalValue());
279       }
280       ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
281           "NUM_SCANNER_RESTARTS")).increment(numRestarts);
282     } catch (Exception e) {
283       LOG.debug("can't update counter." + StringUtils.stringifyException(e));
284     }
285   }
286 
287   /**
288    * The current progress of the record reader through its data.
289    *
290    * @return A number between 0.0 and 1.0, the fraction of the data read.
291    */
292   public float getProgress() {
293     // Depends on the total number of tuples
294     return 0;
295   }
296 
297 }