/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  /** Configuration key: log scanner activity every this many rows. */
  public static final String LOG_PER_ROW_COUNT
      = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
  @VisibleForTesting
  static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  // Scan actually in use; differs from 'scan' after a restart() narrows the start row.
  private Scan currentScan = null;
  private Table htable = null;
  // Row key of the last Result successfully handed to the caller; used to resume after restart.
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  // Reflected getCounter(String, String); null when the running mapreduce doesn't expose it.
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  // Timestamp/rowcount only maintained when logScannerActivity is enabled.
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Clone the configured scan so repeated restarts never accumulate state on it.
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    // Metrics must be enabled on the scan for updateCounters() to have anything to report.
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
   * Check if getCounter(String, String) method is available.
   *
   * @param context The task context to probe.
   * @return The getCounter method or null if not available.
   * @throws IOException When the security manager forbids the reflective lookup.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter",
          new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore: older mapreduce API without getCounter(String, String); counters stay disabled.
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
        ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @param inputsplit The split to work with (currently unused here).
   * @param context The task context, used to wire up counter reporting; may be null.
   * @throws IOException When setting up the scanner fails.
   * @throws InterruptedException When the job is aborted.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split, releasing the scanner and the table.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    // Guard against close() before setHTable() was ever called.
    if (this.htable != null) {
      try {
        this.htable.close();
      } catch (IOException ioe) {
        LOG.warn("Error closing table", ioe);
      }
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException When the key is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp)
                + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation," +
              " if your mapper has restarted a few other times like this" +
              " then you should consider killing this job and investigate" +
              " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need handle cursor result
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp)
            + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ?
            "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If hbase runs on new version of mapreduce, RecordReader has access to
   * counters thus can update counters based on scanMetrics.
   * If hbase runs on old version of mapreduce, it won't be able to get
   * access to counters and TableRecorderReader can't update counter values.
   *
   * @throws IOException When fetching the scan metrics fails.
   */
  private void updateCounters() throws IOException {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }

  /**
   * Pushes the given scan metrics plus restart/stale tallies into the task's counters.
   *
   * @param scanMetrics Metrics collected by the scanner.
   * @param numScannerRestarts Number of scanner restarts to report.
   * @param getCounter Reflected getCounter(String, String) method; no-op when null.
   * @param context The task context to invoke the method on.
   * @param numStale Number of stale results to report.
   */
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context,
            HBASE_COUNTER_GROUP_NAME, entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      // Counter updates are best-effort; pass the throwable so the stack trace is logged.
      LOG.debug("can't update counter." + StringUtils.stringifyException(e), e);
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Depends on the total number of tuples; not computable here, so report none.
    return 0;
  }

}