/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterates over an HBase table's data, returning ({@link ImmutableBytesWritable}, {@link Result})
 * pairs.
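 * <p>
 * A minimal usage sketch. The names below are illustrative: {@code table} is assumed to be an
 * open {@link Table} for the table being scanned, and the column {@code info:name} is a
 * hypothetical {@code family:qualifier} pair.
 * </p>
 *
 * <pre>{@code
 * TableRecordReaderImpl trr = new TableRecordReaderImpl();
 * trr.setHTable(table);
 * trr.setInputColumns(new byte[][] { Bytes.toBytes("info:name") });
 * trr.setStartRow(HConstants.EMPTY_START_ROW); // scan from the start of the table...
 * trr.setEndRow(HConstants.EMPTY_END_ROW);     // ...through to the end
 * trr.init(); // builds the scanner; throws IOException (handling omitted in this sketch)
 * ImmutableBytesWritable key = trr.createKey();
 * Result value = trr.createValue();
 * while (trr.next(key, value)) {
 *   // consume key/value
 * }
 * trr.close();
 * }</pre>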
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  private byte[] startRow;
  private byte[] endRow;
  private byte[] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte[][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: "
          + Bytes.toStringBinary(endRow));
        Scan scan = new Scan(firstRow, endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the {@link Table} to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(
      "hbase.client.log.scanner.activity" /* ScannerCallable.LOG_SCANNER_ACTIVITY */, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte[][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte[] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte[] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }
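  /**
   * Reads the next key/value pair. If the scanner fails with an {@link IOException} other than
   * {@link DoNotRetryIOException}, the scan is restarted from the last successfully read row (or
   * from the split's start row if none has been read yet) and the presumed already-mapped row is
   * skipped before retrying.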
   * @param key the {@link ImmutableBytesWritable} to populate with the next row's key
   * @param value the {@link Result} to populate with the next row's data
   * @return true if there was more data
   */
  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;"
            + " if your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigating"
            + " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next(); // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}