/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Iterate over an HBase table's data, returning (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @VisibleForTesting
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

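  // Illustrative setup (a sketch, not part of this class's API): jobs normally do not
  // construct this reader directly; it is created by TableInputFormat. Assuming a
  // hypothetical MyMapper class and table name "myTable", a driver would wire it up as:
  //
  //   Scan scan = new Scan();
  //   TableMapReduceUtil.initTableMapperJob("myTable", scan, MyMapper.class,
  //       ImmutableBytesWritable.class, Result.class, job);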

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
   * Check if the getCounter(String, String) method is available.
   * @return The getCounter method, or null if not available.
   * @throws IOException When the check fails.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException When setting up the scanner fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException When the key is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

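  // Sketch of how the framework consumes this reader (illustrative only; "reader" is a
  // hypothetical instance, and the real driver is the mapreduce runtime calling the
  // nextKeyValue()/getCurrentKey()/getCurrentValue() methods defined here):
  //
  //   while (reader.nextKeyValue()) {
  //     ImmutableBytesWritable rowKey = reader.getCurrentKey();
  //     Result row = reader.getCurrentValue();
  //     // ... pass (rowKey, row) to the mapper ...
  //   }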

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          updateCounters();
          throw e;
        }
        // try to handle all other IOExceptions by restarting the scanner;
        // if the second call fails too, the exception is rethrown below
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation,"
              + " if your mapper has restarted a few other times like this"
              + " then you should consider killing this job and investigate"
              + " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need to handle cursor results as well: a cursor carries a row key but no cells
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ? "null"
            : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

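  // For orientation (an equivalence note, not code from this class): on Hadoop versions
  // whose TaskAttemptContext exposes getCounter(String, String), the reflective updates
  // in updateCounters(...) below behave like the direct calls
  //
  //   context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")
  //       .increment(numRestarts);
  //   context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE")
  //       .increment(numStale);
  //
  // plus one increment per entry of ScanMetrics#getMetricsMap().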

  /**
   * If hbase runs on the new version of mapreduce, the RecordReader has access to
   * counters and can thus update them based on the scanMetrics.
   * If hbase runs on an old version of mapreduce, it cannot access the counters, and
   * TableRecordReader can't update counter values.
   * @throws IOException When updating the counters fails.
   */
  private void updateCounters() throws IOException {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }

  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses the new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
            entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      LOG.debug("Can't update counter: " + StringUtils.stringifyException(e));
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }

}