/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
 */
@InterfaceAudience.Public
public class TableRecordReaderImpl {
  private static final Logger LOG = LoggerFactory.getLogger(TableRecordReaderImpl.class);

  private byte[] startRow;
  private byte[] endRow;
  private byte[] lastSuccessfulRow;
  private Filter trrRowFilter;
  private ResultScanner scanner;
  private Table htable;
  private byte[][] trrInputColumns;
  private long timestamp;
  private int rowcount;
  private boolean logScannerActivity = false;
  private int logPerRowCount = 100;

  /**
   * Restart from survivable exceptions by creating a new scanner.
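   * <p>
   * The new scanner starts at {@code firstRow} and reapplies the configured stop row, input
   * columns, and row filter, so after a failure the scan resumes without re-reading earlier rows.
   * @param firstRow the row the new scanner should start from
   * @throws IOException if a new scanner cannot be obtained from the table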
   */
  public void restart(byte[] firstRow) throws IOException {
    Scan currentScan;
    if ((endRow != null) && (endRow.length > 0)) {
      if (trrRowFilter != null) {
        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        scan.setFilter(trrRowFilter);
        scan.setCacheBlocks(false);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      } else {
        LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: "
          + Bytes.toStringBinary(endRow));
        Scan scan = new Scan().withStartRow(firstRow).withStopRow(endRow);
        TableInputFormat.addColumns(scan, trrInputColumns);
        this.scanner = this.htable.getScanner(scan);
        currentScan = scan;
      }
    } else {
      LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", no endRow");

      Scan scan = new Scan().withStartRow(firstRow);
      TableInputFormat.addColumns(scan, trrInputColumns);
      scan.setFilter(trrRowFilter);
      this.scanner = this.htable.getScanner(scan);
      currentScan = scan;
    }
    if (logScannerActivity) {
      LOG.info("Current scan=" + currentScan.toString());
      timestamp = EnvironmentEdgeManager.currentTime();
      rowcount = 0;
    }
  }

  /**
   * Build the scanner. Not done in constructor to allow for extension.
   */
  public void init() throws IOException {
    restart(startRow);
  }

  byte[] getStartRow() {
    return this.startRow;
  }

  /**
   * @param htable the table to scan.
   */
  public void setHTable(Table htable) {
    Configuration conf = htable.getConfiguration();
    logScannerActivity = conf.getBoolean(ConnectionConfiguration.LOG_SCANNER_ACTIVITY, false);
    logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
    this.htable = htable;
  }

  /**
   * @param inputColumns the columns to be placed in {@link Result}.
   */
  public void setInputColumns(final byte[][] inputColumns) {
    this.trrInputColumns = inputColumns;
  }

  /**
   * @param startRow the first row in the split
   */
  public void setStartRow(final byte[] startRow) {
    this.startRow = startRow;
  }

  /**
   * @param endRow the last row in the split
   */
  public void setEndRow(final byte[] endRow) {
    this.endRow = endRow;
  }

  /**
   * @param rowFilter the {@link Filter} to be used.
   */
  public void setRowFilter(Filter rowFilter) {
    this.trrRowFilter = rowFilter;
  }

  public void close() {
    if (this.scanner != null) {
      this.scanner.close();
    }
    try {
      this.htable.close();
    } catch (IOException ioe) {
      LOG.warn("Error closing table", ioe);
    }
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createKey()
   */
  public ImmutableBytesWritable createKey() {
    return new ImmutableBytesWritable();
  }

  /**
   * @see org.apache.hadoop.mapred.RecordReader#createValue()
   */
  public Result createValue() {
    return new Result();
  }

  public long getPos() {
    // This should be the ordinal tuple in the range;
    // not clear how to calculate...
    return 0;
  }

  public float getProgress() {
    // Depends on the total number of tuples and getPos
    return 0;
  }

  /**
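   * Reads the next row into the given key/value pair. On a recoverable IOException the scan is
   * restarted from the last successfully read row, skipping the row that was presumably already
   * delivered, so a transient scanner failure does not fail the whole map task.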
   * @param key ImmutableBytesWritable as input key; set to the row of the next result
   * @param value Result as input value; filled with the data of the next row
   * @return true if there was more data
   */
  public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
    Result result;
    try {
      try {
        result = this.scanner.next();
        if (logScannerActivity) {
          rowcount++;
          if (rowcount >= logPerRowCount) {
            long now = EnvironmentEdgeManager.currentTime();
            LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
            timestamp = now;
            rowcount = 0;
          }
        }
      } catch (IOException e) {
        // do not retry if the exception tells us not to do so
        if (e instanceof DoNotRetryIOException) {
          throw e;
        }
        // try to handle all other IOExceptions by restarting
        // the scanner; if the second call fails, it will be rethrown
        LOG.debug("recovered from " + StringUtils.stringifyException(e));
        if (lastSuccessfulRow == null) {
          LOG.warn("We are restarting the first next() invocation;"
            + " if your mapper has restarted a few other times like this"
            + " then you should consider killing this job and investigating"
            + " why it's taking so long.");
          restart(startRow);
        } else {
          restart(lastSuccessfulRow);
          this.scanner.next(); // skip presumed already mapped row
        }
        result = this.scanner.next();
      }

      if (result != null && result.size() > 0) {
        key.set(result.getRow());
        lastSuccessfulRow = key.get();
        value.copyFrom(result);
        return true;
      }
      return false;
    } catch (IOException ioe) {
      if (logScannerActivity) {
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info("Mapper took " + (now - timestamp) + "ms to process " + rowcount + " rows");
        LOG.info(ioe.toString(), ioe);
        String lastRow =
          lastSuccessfulRow == null ? "null" : Bytes.toStringBinary(lastSuccessfulRow);
        LOG.info("lastSuccessfulRow=" + lastRow);
      }
      throw ioe;
    }
  }
}
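// A minimal usage sketch (hypothetical caller code, not part of the original class; this reader
// is normally constructed and driven by org.apache.hadoop.hbase.mapred.TableInputFormatBase
// rather than used directly):
//
//   TableRecordReaderImpl trr = new TableRecordReaderImpl();
//   trr.setHTable(connection.getTable(tableName)); // an open Connection is assumed
//   trr.setInputColumns(new byte[][] { Bytes.toBytes("cf:qualifier") });
//   trr.setStartRow(split.getStartRow()); // "split" would be a TableSplit for one region
//   trr.setEndRow(split.getEndRow());
//   trr.init(); // opens the first scanner
//   ImmutableBytesWritable key = trr.createKey();
//   Result value = trr.createValue();
//   while (trr.next(key, value)) {
//     // process key and value here
//   }
//   trr.close();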