/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Iterates over HBase table data and returns (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of the mapreduce counter group for HBase
  @VisibleForTesting
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  private Method getCounter = null;
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;
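  /*
   * Usage sketch (illustrative only; in the real code path TableRecordReader
   * performs this wiring). The reader is driven through setHTable/setScan/
   * initialize and then iterated with nextKeyValue() until it returns false.
   * The table, split, and context variables are assumed to be supplied by the
   * caller:
   *
   *   TableRecordReaderImpl reader = new TableRecordReaderImpl();
   *   reader.setHTable(table);           // a Table obtained from a Connection
   *   reader.setScan(new Scan());        // columns, ranges, etc. as needed
   *   reader.initialize(split, context); // opens the scanner via restart()
   *   while (reader.nextKeyValue()) {
   *     ImmutableBytesWritable rowKey = reader.getCurrentKey();
   *     Result row = reader.getCurrentValue();
   *     // process the row...
   *   }
   *   reader.close();
   */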
  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on the current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In the new mapreduce APIs, TaskAttemptContext has two getCounter methods.
   * Check if the getCounter(String, String) method is available.
   * @return The getCounter method, or null if not available.
   */
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", new Class[] { String.class, String.class });
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException e) {
      // Ignore
    }
    return m;
  }

  /**
   * Sets the HBase table.
   *
   * @param htable The {@link org.apache.hadoop.hbase.client.Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      "hbase.client.log.scanner.activity" /*ScannerCallable.LOG_SCANNER_ACTIVITY*/, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }
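  /*
   * Configuration sketch (illustrative): the scanner-activity logging consulted
   * in setHTable(Table) above can be switched on through the job configuration.
   * The job variable is an assumed org.apache.hadoop.mapreduce.Job:
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.setBoolean("hbase.client.log.scanner.activity", true);
   *   conf.setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 500); // log every 500 rows
   */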
  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in the constructor to allow for extension.
   */
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    if (context != null) {
      this.context = context;
      getCounter = retrieveGetCounterWithStringsParams(context);
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          updateCounters();
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation,"
            + " if your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigate"
            + " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need to handle cursor results as well
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ? "null"
          : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If HBase runs on the new version of mapreduce, the RecordReader has access
   * to counters and can thus update them based on scanMetrics. If HBase runs
   * on an old version of mapreduce, it won't be able to get access to counters
   * and TableRecordReader can't update counter values.
   */
  private void updateCounters() throws IOException {
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, getCounter, context, numStale);
  }
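  /*
   * Counter sketch (illustrative): when the new mapreduce API is available,
   * each ScanMetrics entry surfaces as a job counter under
   * HBASE_COUNTER_GROUP_NAME ("HBaseCounters"). Driver code could read one
   * back once the job completes; the job variable is an assumed, finished
   * org.apache.hadoop.mapreduce.Job:
   *
   *   Counter restarts = job.getCounters()
   *       .findCounter("HBaseCounters", "NUM_SCANNER_RESTARTS");
   *   long numScannerRestarts = restarts.getValue();
   */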
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses the new mapreduce APIs
    if (getCounter == null) {
      return;
    }

    try {
      for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
        Counter ct = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          entry.getKey());

        ct.increment(entry.getValue());
      }
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
      ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCAN_RESULTS_STALE")).increment(numStale);
    } catch (Exception e) {
      LOG.debug("can't update counter. " + StringUtils.stringifyException(e));
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }

}