/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A table split corresponds to a key range (low, high) and an optional scanner. All references to
 * row below refer to the key of the row.
 */
@InterfaceAudience.Public
public class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {

  private static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);

  // should be < 0 (@see #readFields(DataInput))
  // version 1 supports Scan data member
  enum Version {
    UNVERSIONED(0),
    // Initial number we put on TableSplit when we introduced versioning.
    INITIAL(-1),
    // Added an encoded region name field for easier identification of split -> region
    WITH_ENCODED_REGION_NAME(-2);

    final int code;
    static final Version[] byCode;
    static {
      byCode = Version.values();
      for (int i = 0; i < byCode.length; i++) {
        if (byCode[i].code != -1 * i) {
          throw new AssertionError("Values in this enum should be descending by one");
        }
      }
    }

    Version(int code) {
      this.code = code;
    }

    boolean atLeast(Version other) {
      return code <= other.code;
    }

    static Version fromCode(int code) {
      return byCode[code * -1];
    }
  }

  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
  private TableName tableName;
  private byte[] startRow;
  private byte[] endRow;
  private String regionLocation;
  private String encodedRegionName = "";

  /**
   * The scan object may be null, in which case an empty string is stored here and
   * {@link #getScan()} returns a Scan carrying only default values. Having no scanner in
   * TableSplit doesn't necessarily mean there is no scanner for the MapReduce job; it just means
   * that we do not need to set one for each split. For example, a scan object is not required for
   * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since the scan from the job conf
   * is used and the scanner is supposed to be the same for all the splits of the table.
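   * <p>
   * A minimal illustration of that convention (the table name, rows and location below are made
   * up):
   * </p>
   * <pre>{@code
   * TableSplit split = new TableSplit(TableName.valueOf("t1"), Bytes.toBytes("aaa"),
   *   Bytes.toBytes("zzz"), "rs1.example.com");
   * Scan scan = split.getScan(); // non-null: a Scan carrying only default values
   * }</pre>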
   */
  private String scan = ""; // stores the serialized form of the Scan
  private long length; // Contains estimation of region size in bytes

  /** Default constructor. */
  public TableSplit() {
    this((TableName) null, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "");
  }

  /**
   * Creates a new instance while assigning all variables. Length of region is set to 0. Encoded
   * name of the region is set to blank.
   * @param tableName The name of the current table.
   * @param scan The scan associated with this split.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   */
  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
    final String location) {
    this(tableName, scan, startRow, endRow, location, 0L);
  }

  /**
   * Creates a new instance while assigning all variables. Encoded name of the region is set to
   * blank.
   * @param tableName The name of the current table.
   * @param scan The scan associated with this split.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   * @param length Size of region in bytes
   */
  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
    final String location, long length) {
    this(tableName, scan, startRow, endRow, location, "", length);
  }

  /**
   * Creates a new instance while assigning all variables.
   * @param tableName The name of the current table.
   * @param scan The scan associated with this split.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   * @param encodedRegionName The region ID.
   * @param length Size of region in bytes
   */
  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
    final String location, final String encodedRegionName, long length) {
    this.tableName = tableName;
    try {
      this.scan = (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
    } catch (IOException e) {
      LOG.warn("Failed to convert Scan to String", e);
    }
    this.startRow = startRow;
    this.endRow = endRow;
    this.regionLocation = location;
    this.encodedRegionName = encodedRegionName;
    this.length = length;
  }

  /**
   * Creates a new instance without a scanner. Length of region is set to 0.
   * @param tableName The name of the current table.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   */
  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location) {
    this(tableName, null, startRow, endRow, location);
  }

  /**
   * Creates a new instance without a scanner.
   * @param tableName The name of the current table.
   * @param startRow The start row of the split.
   * @param endRow The end row of the split.
   * @param location The location of the region.
   * @param length Size of region in bytes
   */
  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location,
    long length) {
    this(tableName, null, startRow, endRow, location, length);
  }

  /**
   * Returns a Scan object from the stored string representation.
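   * <p>
   * For example (a sketch; {@code split} stands for the {@link InputSplit} handed to a record
   * reader and is not a member of this class):
   * </p>
   * <pre>{@code
   * TableSplit tableSplit = (TableSplit) split;
   * Scan scan = tableSplit.getScan(); // a default-valued Scan if no per-split scan was stored
   * }</pre>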
   * @return Returns a Scan object based on the stored scanner.
   * @throws IOException throws IOException if deserialization fails
   */
  public Scan getScan() throws IOException {
    return TableMapReduceUtil.convertStringToScan(this.scan);
  }

  /**
   * Returns the scan as a string.
   * @return scan as string. Note that this is not the same as getScan().toString() because the
   *         Scan object has its default values when an empty scan string is deserialized. Thus,
   *         getScan().toString() can never be empty.
   */
  @InterfaceAudience.Private
  public String getScanAsString() {
    return this.scan;
  }

  /**
   * Returns the table name converted to a byte array.
   * @see #getTable()
   * @return The table name.
   */
  public byte[] getTableName() {
    return tableName.getName();
  }

  /**
   * Returns the table name.
   * @return The table name.
   */
  public TableName getTable() {
    // Ideally this method would be called getTableName, but that name was already taken by the
    // byte[] variant above, so we cannot reuse it here.
    return tableName;
  }

  /**
   * Returns the start row.
   * @return The start row.
   */
  public byte[] getStartRow() {
    return startRow;
  }

  /**
   * Returns the end row.
   * @return The end row.
   */
  public byte[] getEndRow() {
    return endRow;
  }

  /**
   * Returns the region location.
   * @return The region's location.
   */
  public String getRegionLocation() {
    return regionLocation;
  }

  /**
   * Returns the region's location as an array.
   * @return The array containing the region location.
   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
   */
  @Override
  public String[] getLocations() {
    return new String[] { regionLocation };
  }

  /**
   * Returns the region's encoded name.
   * @return The region's encoded name.
   */
  public String getEncodedRegionName() {
    return encodedRegionName;
  }

  /**
   * Returns the length of the split.
   * @return The length of the split.
   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
   */
  @Override
  public long getLength() {
    return length;
  }

  /**
   * Reads the values of each field.
   * @param in The input to read from.
   * @throws IOException When reading the input fails.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    Version version = Version.UNVERSIONED;
    // TableSplit was not versioned in the beginning.
    // In order to introduce it now, we make use of the fact
    // that tableName was written with Bytes.writeByteArray,
    // which encodes the array length as a vint which is >= 0.
    // Hence if the vint is >= 0 we have an old version and the vint
    // encodes the length of tableName.
    // If < 0 we just read the version and the next vint is the length.
    // @see Bytes#readByteArray(DataInput)
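    // For a split written at the current VERSION (WITH_ENCODED_REGION_NAME, code -2) the stream is
    // therefore: vint(-2), then tableName, startRow, endRow, regionLocation and scan (each via
    // Bytes.writeByteArray), then vlong(length), then encodedRegionName; see write(DataOutput).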
    int len = WritableUtils.readVInt(in);
    if (len < 0) {
      // what we just read was the version
      version = Version.fromCode(len);
      len = WritableUtils.readVInt(in);
    }
    byte[] tableNameBytes = new byte[len];
    in.readFully(tableNameBytes);
    tableName = TableName.valueOf(tableNameBytes);
    startRow = Bytes.readByteArray(in);
    endRow = Bytes.readByteArray(in);
    regionLocation = Bytes.toString(Bytes.readByteArray(in));
    if (version.atLeast(Version.INITIAL)) {
      scan = Bytes.toString(Bytes.readByteArray(in));
    }
    length = WritableUtils.readVLong(in);
    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Writes the field values to the output.
   * @param out The output to write to.
   * @throws IOException When writing the values to the output fails.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, VERSION.code);
    Bytes.writeByteArray(out, tableName.getName());
    Bytes.writeByteArray(out, startRow);
    Bytes.writeByteArray(out, endRow);
    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
    Bytes.writeByteArray(out, Bytes.toBytes(scan));
    WritableUtils.writeVLong(out, length);
    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
  }

  /**
   * Returns the details about this instance as a string.
   * @return The values of this instance as a string.
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("Split(");
    sb.append("tablename=").append(tableName);
    // null scan input is represented by ""
    String printScan = "";
    if (!scan.equals("")) {
      try {
        // get the real scan here in toString, not the Base64 string
        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
      } catch (IOException e) {
        printScan = "";
      }
      sb.append(", scan=").append(printScan);
    }
    sb.append(", startrow=").append(Bytes.toStringBinary(startRow));
    sb.append(", endrow=").append(Bytes.toStringBinary(endRow));
    sb.append(", regionLocation=").append(regionLocation);
    sb.append(", regionname=").append(encodedRegionName);
    sb.append(")");
    return sb.toString();
  }

  /**
   * Compares this split against the given one.
   * @param split The split to compare to.
   * @return The result of the comparison.
   * @see java.lang.Comparable#compareTo(java.lang.Object)
   */
  @Override
  public int compareTo(TableSplit split) {
    // If the table name of the two splits is the same then compare start rows,
    // otherwise compare based on table names.
    int tableNameComparison = getTable().compareTo(split.getTable());
    return tableNameComparison != 0
      ? tableNameComparison
      : Bytes.compareTo(getStartRow(), split.getStartRow());
  }

  @Override
  public boolean equals(Object o) {
    if (o == null || !(o instanceof TableSplit)) {
      return false;
    }
    return tableName.equals(((TableSplit) o).tableName)
      && Bytes.equals(startRow, ((TableSplit) o).startRow)
      && Bytes.equals(endRow, ((TableSplit) o).endRow)
      && regionLocation.equals(((TableSplit) o).regionLocation);
  }

  @Override
  public int hashCode() {
    int result = tableName != null ? tableName.hashCode() : 0;
    result = 31 * result + (scan != null ? scan.hashCode() : 0);
    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
    return result;
  }
}