/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) pairs.
 * <p>
 * Typical call order is {@link #setHTable(Table)}, {@link #setScan(Scan)},
 * {@link #initialize(InputSplit, TaskAttemptContext)}, repeated {@link #nextKeyValue()} calls and
 * finally {@link #close()}.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  /** Configuration key: number of rows between two "Mapper took ..." progress log lines. */
  public static final String LOG_PER_ROW_COUNT = "hbase.mapreduce.log.scanner.rowcount";

  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  // HBASE_COUNTER_GROUP_NAME is the name of mapreduce counter group for HBase
  @InterfaceAudience.Private
  static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private ResultScanner scanner = null;
  /** The scan as configured by the caller; never modified, only copied. */
  private Scan scan = null;
  /** Copy of {@link #scan} used by the currently open scanner (start row may differ). */
  private Scan currentScan = null;
  private Table htable = null;
  private byte[] lastSuccessfulRow = null;
  private ImmutableBytesWritable key = null;
  private Result value = null;
  private TaskAttemptContext context = null;
  /** Scanner restarts that have not been published to the job counters yet. */
  private long numRestarts = 0;
  /** Stale results that have not been published to the job counters yet. */
  private long numStale = 0;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   * <p>
   * Counters of the scanner being replaced are published first, then the previous scanner (if
   * any) is closed and a new one is opened on a copy of the configured scan.
   * @param firstRow The first row to start at.
   * @throws IOException When restarting fails.
   */
  public void restart(byte[] firstRow) throws IOException {
    // Update counter metrics based on current scan before reinitializing it
    if (currentScan != null) {
      updateCounters();
    }
    currentScan = new Scan(scan);
    currentScan.withStartRow(firstRow);
    currentScan.setScanMetricsEnabled(true);
    if (this.scanner != null) {
      if (logScannerActivity) {
        LOG.info("Closing the previously opened scanner object.");
      }
      this.scanner.close();
    }
    this.scanner = this.htable.getScanner(currentScan);
    if (logScannerActivity) {
      LOG.info("Current scan={}", currentScan);
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * In new mapreduce APIs, TaskAttemptContext has two getCounter methods Check if
   * getCounter(String, String) method is available.
   * @param context The task attempt context whose class is inspected.
   * @return The getCounter method or null if not available.
   * @throws IOException When a security manager denies the reflective lookup.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0
   */
  @Deprecated
  protected static Method retrieveGetCounterWithStringsParams(TaskAttemptContext context)
    throws IOException {
    Method m = null;
    try {
      m = context.getClass().getMethod("getCounter", String.class, String.class);
    } catch (SecurityException e) {
      throw new IOException("Failed test for getCounter", e);
    } catch (NoSuchMethodException ignored) {
      // Deliberately ignored: absence of the method is reported to the caller as null.
    }
    return m;
  }

  /**
   * Sets the HBase table. Also reads the scanner-activity logging settings from the table's
   * configuration.
   * @param htable The table to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * Sets the scan defining the actual details like columns etc.
   * @param scan The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   * @param inputsplit The split to read; not used by this implementation.
   * @param context    The task attempt context used to publish counters; a {@code null} value
   *                   leaves any previously stored context untouched.
   * @throws IOException          When opening the scanner fails.
   * @throws InterruptedException Declared for API compatibility.
   */
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
    if (context != null) {
      this.context = context;
    }
    restart(scan.getStartRow());
  }

  /**
   * Closes the split. The table is closed even if closing the scanner throws, and a reader on
   * which no table was ever set can be closed safely.
   */
  public void close() {
    try {
      if (this.scanner != null) {
        this.scanner.close();
      }
    } finally {
      if (this.htable != null) {
        try {
          this.htable.close();
        } catch (IOException ioe) {
          LOG.warn("Error closing table", ioe);
        }
      }
    }
  }

  /**
   * Returns the current key.
   * @return The current key.
   * @throws IOException          Declared for API compatibility.
   * @throws InterruptedException When the job is aborted.
   */
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
  }

  /**
   * Returns the current value.
   * @return The current value.
   * @throws IOException          When the value is faulty.
   * @throws InterruptedException When the job is aborted.
   */
  public Result getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  /**
   * Positions the record reader to the next record.
   * <p>
   * An {@link IOException} from the scanner (other than {@link DoNotRetryIOException}) triggers
   * one restart of the scanner from the last successfully returned row; a failure of that second
   * attempt is rethrown.
   * @return <code>true</code> if there was another record.
   * @throws IOException          When reading the record failed.
   * @throws InterruptedException When the job was aborted.
   */
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
      key = new ImmutableBytesWritable();
    }
    if (value == null) {
      value = new Result();
    }
    try {
      try {
        value = this.scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (DoNotRetryIOException e) {
        // do not retry if the exception tells us not to do so; the outer handler publishes the
        // counters exactly once before the exception leaves this method
        throw e;
      } catch (IOException e) {
        // try to handle all other IOExceptions by restarting
        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from {}", StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation,"
            + " if your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigate"
            + " why it's taking so long.");
          restart(scan.getStartRow());
        } else {
          restart(lastSuccessfulRow);
          scanner.next(); // skip presumed already mapped row
        }
        value = scanner.next();
        if (value != null && value.isStale()) {
          numStale++;
        }
        numRestarts++;
      }

      if (value != null && value.size() > 0) {
        key.set(value.getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Need handle cursor result
      if (value != null && value.isCursor()) {
        key.set(value.getCursor().getRow());
        lastSuccessfulRow = key.get();
        return true;
      }

      // Scanner exhausted: publish whatever has not been reported yet.
      updateCounters();
      return false;
    } catch (IOException ioe) {
      updateCounters();
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took {}ms to process {} rows", (now - timestamp), rowcount);
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow={}", lastRow);
      }
      throw ioe;
    }
  }

  /**
   * If hbase runs on new version of mapreduce, RecordReader has access to counters thus can update
   * counters based on scanMetrics. If hbase runs on old version of mapreduce, it won't be able to
   * get access to counters and TableRecordReader can't update counter values.
   * <p>
   * This method is invoked several times over the life of a reader (on every restart, at the end
   * of the scan and on failure), so the restart and stale tallies are zeroed once they have been
   * handed to a context; otherwise every later call would add the same events again.
   */
  private void updateCounters() {
    // No scanner was ever opened (e.g. the first getScanner call failed): nothing to report.
    if (scanner == null) {
      return;
    }
    ScanMetrics scanMetrics = scanner.getScanMetrics();
    if (scanMetrics == null) {
      return;
    }

    updateCounters(scanMetrics, numRestarts, context, numStale);
    if (context != null) {
      // Published above; keep only events that happen from now on.
      numRestarts = 0;
      numStale = 0;
    }
  }

  /**
   * Publishes scan metrics and restart/stale tallies to the job counters.
   * @param scanMetrics        Metrics of the scan to publish.
   * @param numScannerRestarts Number of scanner restarts to add.
   * @param getCounter         Ignored; retained for signature compatibility.
   * @param context            Context providing the counters; nothing happens when {@code null}.
   * @param numStale           Number of stale results to add.
   * @deprecated since 2.4.0 and 2.3.2, will be removed in 4.0.0 Use
   *             {@link #updateCounters(ScanMetrics, long, TaskAttemptContext, long)} instead.
   */
  @Deprecated
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
    updateCounters(scanMetrics, numScannerRestarts, context, numStale);
  }

  /**
   * Adds every entry of the scan metrics map, plus the restart and stale tallies when non-zero,
   * to counters in the {@value #HBASE_COUNTER_GROUP_NAME} group.
   * @param scanMetrics        Metrics of the scan to publish.
   * @param numScannerRestarts Number of scanner restarts to add to NUM_SCANNER_RESTARTS.
   * @param context            Context providing the counters; nothing happens when {@code null}.
   * @param numStale           Number of stale results to add to NUM_SCAN_RESULTS_STALE.
   */
  protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    TaskAttemptContext context, long numStale) {
    // we can get access to counters only if hbase uses new mapreduce APIs
    if (context == null) {
      return;
    }

    for (Map.Entry<String, Long> entry : scanMetrics.getMetricsMap().entrySet()) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, entry.getKey());
      if (counter != null) {
        counter.increment(entry.getValue());
      }
    }
    if (numScannerRestarts != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS");
      if (counter != null) {
        counter.increment(numScannerRestarts);
      }
    }
    if (numStale != 0L) {
      Counter counter = context.getCounter(HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE");
      if (counter != null) {
        counter.increment(numStale);
      }
    }
  }

  /**
   * The current progress of the record reader through its data.
   * @return A number between 0.0 and 1.0, the fraction of the data read. This implementation
   *         always returns 0 because the total number of rows is not known.
   */
  public float getProgress() {
    // Depends on the total number of tuples
    return 0;
  }

}