/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

/**
 * Iterates over HBase table data, returning ({@link ImmutableBytesWritable},
 * {@link Result}) pairs.
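 * <p>
 * A minimal usage sketch (illustrative only: obtaining the {@link Table} from a
 * connection is assumed and omitted, and the column and row keys are made up for
 * the example):
 *
 * <pre>
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table);
 * trr.setInputColumns(new byte[][] { Bytes.toBytes("cf:qual") });
 * trr.setStartRow(Bytes.toBytes("row-0000"));
 * trr.setEndRow(Bytes.toBytes("row-9999"));
 * trr.init(); // builds the scanner via restart(startRow)
 *
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // process the current row here
 * }
 * trr.close();
 * </pre>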
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  private byte[] startRow;
  private byte[] endRow;
  private byte[] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte[][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   *
   * @param firstRow the row to restart the scanner from
   * @throws IOException if the scanner cannot be created
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow)
          + ", endRow: " + Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = System.currentTimeMillis();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   *
   * @throws IOException if the scanner cannot be created
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte[][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte[] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte[] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @return a new {@link ImmutableBytesWritable}
   *
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @return a new {@link Result}
   *
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
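   * Reads the next row into the supplied key/value pair.
   * <p>
   * On a plain {@link IOException} the scanner is restarted once: from
   * {@code lastSuccessfulRow} if a row was already mapped (skipping the presumed
   * already-mapped row), otherwise from {@code startRow}. A
   * {@link DoNotRetryIOException} is rethrown immediately.
   *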
   * @param key the key to fill in, set to the row of the next result
   * @param value the value to fill in, populated from the next result
   * @return true if there was more data
   * @throws IOException if reading fails and the restarted scanner fails as well
   */
  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = System.currentTimeMillis();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation,"
            + " if your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigate"
            + " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next(); // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = System.currentTimeMillis();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}