/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
 * pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  /** Configuration key: number of rows between two scanner-activity log lines. */
  public static final String LOG_PER_ROW_COUNT
      = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
  @InterfaceAudience.Private
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private ResultScanner scanner = null;
  private Scan scan = null;
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  // Restarts / stale results observed since the counters were last reported.
  private long numRestarts = 0;
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan={}", currentScan);
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods
   * Check if getCounter(String, String) method is available.
   *
   * @param context The task context whose class is inspected.
   * @return The getCounter method or null if not available.
   * @throws IOException When reflective access is denied by the security manager.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   */
  @Deprecated
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
      throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", String.class, String.class);
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException ignored) {
      // Absence of the method is an expected outcome; the caller receives null.
    }
    return m;
  }

  /**
   * Sets the HBase table and reads the scanner-activity logging settings
   * from the table's configuration.
   *
   * @param htable The {@link Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      "hbase.client.log.scanner.activity" /*ScannerCallable.LOG_SCANNER_ACTIVITY*/, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   *
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @param inputsplit The split being read; not used by this implementation.
   * @param context The task context used for counters; a null value leaves any
   *   previously set context in place.
   * @throws IOException When opening the scanner fails.
   * @throws InterruptedException Declared for API compatibility.
   */
  public void initialize(InputSplit inputsplit,
      TaskAttemptContext context) throws IOException,
      InterruptedException {
    if (context != null) {
      this.context = context;
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split: closes the scanner (if one was opened) and then the table.
   * A failure to close the table is logged and not propagated.
   */
  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    // The table may never have been set; there is nothing to close in that case.
    if (this.htable == null) {
      return;
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * Returns the current key.
   *
   * @return The current key.
   * @throws IOException Declared for API compatibility.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException,
      InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   *
   * @return The current value.
   * @throws IOException When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   *
   * <p>An {@link IOException} other than {@link DoNotRetryIOException} raised by
   * the scanner is handled once by reopening the scanner at the last
   * successfully returned row; a failure of that retry is rethrown.
   *
   * @return <code>true</code> if there was another record.
   * @throws IOException When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          updateCounters();
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from {}", StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation," +
              " if your mapper has restarted a few other times like this" +
              " then you should consider killing this job and investigate" +
              " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need handle cursor result
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ?
          "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow={}", lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If hbase runs on new version of mapreduce, RecordReader has access to
   * counters thus can update counters based on scanMetrics.
   * If hbase runs on old version of mapreduce, it won't be able to get
   * access to counters and TableRecorderReader can't update counter values.
   *
   * <p>This method runs several times per reader (on every restart, at end of
   * scan and on failure) and the mapreduce counters are incremented rather than
   * set, so the restart and stale totals are zeroed once they have been
   * reported; otherwise they would be added again by the next call.
   */
  private void updateCounters() {
    // No scanner was ever opened (e.g. the first getScanner call failed).
    if (scanner == null) {
      return;
    }
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, context, numStale);
    // Only forget the totals when they were actually handed to a context.
    if (context != null) {
      numRestarts = 0;
      numStale = 0;
    }
  }

  /**
   * Delegates to {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)};
   * the {@code getCounter} argument is ignored.
   *
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   *   Use {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
   */
  @Deprecated
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      Method getCounter, TaskAttemptContext context, long numStale) {
    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
  }

  /**
   * Adds the given scan metrics, restart count and stale-result count to the
   * {@value #HBASE_COUNTER_GROUP_NAME} counter group of the task context.
   *
   * @param scanMetrics Metrics whose entries are added to same-named counters.
   * @param numScannerRestarts Amount added to NUM_SCANNER_RESTARTS when non-zero.
   * @param context The task context; when null nothing is updated.
   * @param numStale Amount added to NUM_SCAN_RESULTS_STALE when non-zero.
   */
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
      TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (context == null) {
      return;
    }

    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
      if (counter != null) {
        counter.increment(entry.getValue());
      }
    }
    if (numScannerRestarts != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
      if (counter != null) {
        counter.increment(numScannerRestarts);
      }
    }
    if (numStale != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
      if (counter != null) {
        counter.increment(numStale);
      }
    }
  }

  /**
   * The current progress of the record reader through its data.
   *
   * @return A number between 0.0 and 1.0, the fraction of the data read;
   *   this implementation always reports 0.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }

}