/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base for {@link MultiTableInputFormat}s. Receives a list of
 * {@link Scan} instances that define the input tables and
 * filters etc. Subclasses may use other TableRecordReader implementations.
 * <p>
 * Each {@link Scan} must carry the name of the table it targets in its
 * {@link Scan#SCAN_ATTRIBUTES_TABLE_NAME} attribute; {@link #getSplits(JobContext)}
 * rejects scans without it.
 */
@InterfaceAudience.Public
public abstract class MultiTableInputFormatBase extends
    InputFormat<ImmutableBytesWritable, Result> {

  private static final Logger LOG = LoggerFactory.getLogger(MultiTableInputFormatBase.class);

  /** Holds the set of scans used to define the input. */
  private List<Scan> scans;

  /** The reader scanning the table, can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
   * default.
   * <p>
   * A new {@link Connection} is opened for every call. It is closed when the
   * returned reader is closed, or immediately if setting up the reader fails.
   *
   * @param split The split to work with. Must be a {@link TableSplit}.
   * @param context The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails, or when the split
   *           carries no table name.
   * @throws InterruptedException when record reader initialization fails
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *      org.apache.hadoop.mapreduce.InputSplit,
   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    TableSplit tSplit = (TableSplit) split;
    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));

    if (tSplit.getTable() == null) {
      throw new IOException("Cannot create a record reader because of a"
          + " previous error. Please look at the previous logs lines from"
          + " the task's full log for more details.");
    }
    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
    final Table table;
    try {
      table = connection.getTable(tSplit.getTable());
    } catch (IOException | RuntimeException e) {
      // Nothing else owns the connection yet, so release it here instead of leaking it.
      try {
        connection.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }

    // NOTE(review): the same TableRecordReader instance is reused by every call made on
    // this input format instance -- confirm callers never hold two readers at once.
    if (this.tableRecordReader == null) {
      this.tableRecordReader = new TableRecordReader();
    }
    final TableRecordReader trr = this.tableRecordReader;

    try {
      // Narrow the scan to the row range covered by this split.
      Scan sc = tSplit.getScan();
      sc.setStartRow(tSplit.getStartRow());
      sc.setStopRow(tSplit.getEndRow());
      trr.setScan(sc);
      trr.setTable(table);
      // Thin delegate whose only addition is closing the per-reader connection.
      return new RecordReader<ImmutableBytesWritable, Result>() {

        @Override
        public void close() throws IOException {
          try {
            trr.close();
          } finally {
            // Release the connection even when closing the reader fails.
            connection.close();
          }
        }

        @Override
        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
          return trr.getCurrentKey();
        }

        @Override
        public Result getCurrentValue() throws IOException, InterruptedException {
          return trr.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return trr.getProgress();
        }

        @Override
        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
          trr.initialize(inputsplit, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return trr.nextKeyValue();
        }
      };
    } catch (IOException ioe) {
      // If there is an exception make sure that all
      // resources are closed and released.
      try {
        trr.close();
      } finally {
        connection.close();
      }
      throw ioe;
    }
  }

  /**
   * Calculates the splits that will serve as input for the map tasks.
   * <p>
   * Scans are grouped by target table. For every scan, one split is emitted
   * per region of its table whose key range overlaps the scan's row range and
   * that is accepted by {@link #includeRegionInSplit(byte[], byte[])}.
   *
   * @param context The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails, when no scans
   *           were provided, when a scan has no table name attribute, or when
   *           a table reports no regions.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (scans == null || scans.isEmpty()) {
      throw new IOException("No scans were provided.");
    }

    Map<TableName, List<Scan>> tableMaps = groupScansByTable(scans);

    List<InputSplit> splits = new ArrayList<>();
    // Make a single Connection to the Cluster and use it across all tables.
    try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
      for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
        TableName tableName = entry.getKey();
        List<Scan> scanList = entry.getValue();
        // The Admin is kept open for the whole block so the size calculator may use it
        // at any point, and is closed together with the table and the locator.
        try (Table table = conn.getTable(tableName);
            RegionLocator regionLocator = conn.getRegionLocator(tableName);
            Admin admin = conn.getAdmin()) {
          RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
          Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
          // Same for every scan of this table, so check once up front.
          if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region for table : "
                + tableName.getNameAsString());
          }
          byte[][] startKeys = keys.getFirst();
          byte[][] endKeys = keys.getSecond();

          for (Scan scan : scanList) {
            // Only used to number the splits in the debug log.
            int count = 0;

            byte[] startRow = scan.getStartRow();
            byte[] stopRow = scan.getStopRow();

            for (int i = 0; i < startKeys.length; i++) {
              if (!includeRegionInSplit(startKeys[i], endKeys[i])) {
                continue;
              }

              // An empty key stands for "unbounded" on that side, both for the scan
              // rows and for the end key of the region.
              boolean scanStartsBeforeRegionEnd = startRow.length == 0
                  || endKeys[i].length == 0
                  || Bytes.compareTo(startRow, endKeys[i]) < 0;
              boolean scanStopsAfterRegionStart = stopRow.length == 0
                  || Bytes.compareTo(stopRow, startKeys[i]) > 0;
              if (!scanStartsBeforeRegionEnd || !scanStopsAfterRegionStart) {
                continue;
              }

              // Clip the split to the intersection of the scan range and the region.
              byte[] splitStart = startRow.length == 0
                  || Bytes.compareTo(startKeys[i], startRow) >= 0
                      ? startKeys[i] : startRow;
              byte[] splitStop = (stopRow.length == 0
                  || Bytes.compareTo(endKeys[i], stopRow) <= 0)
                  && endKeys[i].length > 0
                      ? endKeys[i] : stopRow;

              HRegionLocation hregionLocation = regionLocator.getRegionLocation(
                  startKeys[i], false);
              String regionHostname = hregionLocation.getHostname();
              HRegionInfo regionInfo = hregionLocation.getRegionInfo();
              String encodedRegionName = regionInfo.getEncodedName();
              long regionSize = sizeCalculator.getRegionSize(
                  regionInfo.getRegionName());

              TableSplit split = new TableSplit(table.getName(),
                  scan, splitStart, splitStop, regionHostname,
                  encodedRegionName, regionSize);

              splits.add(split);

              if (LOG.isDebugEnabled()) {
                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
              }
            }
          }
        }
      }
    }

    return splits;
  }

  /**
   * Groups the given scans by the table named in their
   * {@link Scan#SCAN_ATTRIBUTES_TABLE_NAME} attribute.
   *
   * @param scans the scans to group
   * @return a map from table name to the scans targeting that table, each list
   *         in the order the scans were given
   * @throws IOException if a scan does not carry a table name attribute
   */
  private static Map<TableName, List<Scan>> groupScansByTable(List<Scan> scans)
      throws IOException {
    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
    for (Scan scan : scans) {
      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
      if (tableNameBytes == null) {
        throw new IOException("A scan object did not have a table name");
      }

      TableName tableName = TableName.valueOf(tableNameBytes);

      List<Scan> scanList = tableMaps.get(tableName);
      if (scanList == null) {
        scanList = new ArrayList<>();
        tableMaps.put(tableName, scanList);
      }
      scanList.add(scan);
    }
    return tableMaps;
  }

  /**
   * Test if the given region is to be included in the InputSplit while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to
   * exclude an entire region from the M-R job (and hence, not contribute to
   * the InputSplit), given the start and end keys of the same. <br>
   * Useful when we need to remember the last-processed top record and revisit
   * the [last, current) interval for M-R processing, continuously. In addition
   * to reducing InputSplits, reduces the load on the region server as well, due
   * to the ordering of the keys. <br>
   * <br>
   * Note: It is possible that <code>endKey.length == 0</code> for the last
   * (most recent) region. <br>
   * Override this method if you want to bulk exclude regions altogether from
   * M-R. By default, no region is excluded (i.e. all regions are included).
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true, if this region needs to be included as part of the input
   *         (default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey,
      final byte[] endKey) {
    return true;
  }

  /**
   * Allows subclasses to get the list of {@link Scan} objects.
   *
   * @return the list previously passed to {@link #setScans(List)}, or
   *         {@code null} if none has been set
   */
  protected List<Scan> getScans() {
    return this.scans;
  }

  /**
   * Allows subclasses to set the list of {@link Scan} objects.
   *
   * @param scans The list of {@link Scan} used to define the input
   */
  protected void setScans(List<Scan> scans) {
    this.scans = scans;
  }

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *          implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
}