/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  private byte[] startRow;
  private byte[] endRow;
  private byte[] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte[][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
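   * @param firstRow the first row to scan from after the restart
   * @throws IOException if the new scanner cannot be created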
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: "
            + Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      "hbase.client.log.scanner.activity" /*ScannerCallable.LOG_SCANNER_ACTIVITY*/, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte[][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte[] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte[] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @return a new, empty ImmutableBytesWritable to be filled by {@link #next}
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return a new, empty Result to be filled by {@link #next}
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
   * Reads the next key/value pair from the table.
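   * If the scan fails with a recoverable IOException, the scanner is restarted from the last
   * successfully read row and the read is retried once; any further failure is rethrown.
   *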
   * @param key the ImmutableBytesWritable to be set to the current row key
   * @param value the Result to be filled with the current row's data
   * @return true if there was more data
   */
  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation."
              + " If your mapper has restarted a few other times like this,"
              + " you should consider killing this job and investigating"
              + " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next(); // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow = lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}
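
/*
 * A minimal usage sketch (hypothetical driver code, for illustration only). In practice this
 * reader is normally created and configured through org.apache.hadoop.hbase.mapred's
 * TableInputFormat machinery. The sketch assumes an already-open Connection `connection` and
 * a table named "mytable" with a column family "cf"; those names are illustrative, not part
 * of this class.
 *
 *   TableRecordReaderImpl trr = new TableRecordReaderImpl();
 *   trr.setHTable(connection.getTable(TableName.valueOf("mytable")));
 *   trr.setInputColumns(new byte[][] { Bytes.toBytes("cf") });
 *   trr.setStartRow(HConstants.EMPTY_START_ROW);
 *   trr.setEndRow(HConstants.EMPTY_END_ROW);
 *   trr.init();
 *   ImmutableBytesWritable key = trr.createKey();
 *   Result value = trr.createValue();
 *   while (trr.next(key, value)) {
 *     // key holds the current row key, value holds the row's cells
 *   }
 *   trr.close();
 */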