001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.Arrays;
024import org.apache.hadoop.hbase.HConstants;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Scan;
027import org.apache.hadoop.hbase.util.Bytes;
028import org.apache.hadoop.io.Writable;
029import org.apache.hadoop.io.WritableUtils;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 * A table split corresponds to a key range (low, high) and an optional scanner. All references to
037 * row below refer to the key of the row.
038 */
039@InterfaceAudience.Public
040public class TableSplit extends InputSplit implements Writable, Comparable<TableSplit> {
041  /** @deprecated LOG variable would be made private. fix in hbase 3.0 */
042  @Deprecated
043  public static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);
044
045  // should be < 0 (@see #readFields(DataInput))
046  // version 1 supports Scan data member
047  enum Version {
048    UNVERSIONED(0),
049    // Initial number we put on TableSplit when we introduced versioning.
050    INITIAL(-1),
051    // Added an encoded region name field for easier identification of split -> region
052    WITH_ENCODED_REGION_NAME(-2);
053
054    final int code;
055    static final Version[] byCode;
056    static {
057      byCode = Version.values();
058      for (int i = 0; i < byCode.length; i++) {
059        if (byCode[i].code != -1 * i) {
060          throw new AssertionError("Values in this enum should be descending by one");
061        }
062      }
063    }
064
065    Version(int code) {
066      this.code = code;
067    }
068
069    boolean atLeast(Version other) {
070      return code <= other.code;
071    }
072
073    static Version fromCode(int code) {
074      return byCode[code * -1];
075    }
076  }
077
078  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
079  private TableName tableName;
080  private byte[] startRow;
081  private byte[] endRow;
082  private String regionLocation;
083  private String encodedRegionName = "";
084
085  /**
086   * The scan object may be null but the serialized form of scan is never null or empty since we
087   * serialize the scan object with default values then. Having no scanner in TableSplit doesn't
088   * necessarily mean there is no scanner for mapreduce job, it just means that we do not need to
089   * set it for each split. For example, it is not required to have a scan object for
090   * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the job
091   * conf and scanner is supposed to be same for all the splits of table.
092   */
093  private String scan = ""; // stores the serialized form of the Scan
094  private long length; // Contains estimation of region size in bytes
095
096  /** Default constructor. */
097  public TableSplit() {
098    this((TableName) null, null, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, "");
099  }
100
101  /**
102   * Creates a new instance while assigning all variables. Length of region is set to 0 Encoded name
103   * of the region is set to blank
104   * @param tableName The name of the current table.
105   * @param scan      The scan associated with this split.
106   * @param startRow  The start row of the split.
107   * @param endRow    The end row of the split.
108   * @param location  The location of the region.
109   */
110  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
111    final String location) {
112    this(tableName, scan, startRow, endRow, location, 0L);
113  }
114
115  /**
116   * Creates a new instance while assigning all variables. Encoded name of region is set to blank
117   * @param tableName The name of the current table.
118   * @param scan      The scan associated with this split.
119   * @param startRow  The start row of the split.
120   * @param endRow    The end row of the split.
121   * @param location  The location of the region.
122   */
123  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
124    final String location, long length) {
125    this(tableName, scan, startRow, endRow, location, "", length);
126  }
127
128  /**
129   * Creates a new instance while assigning all variables.
130   * @param tableName         The name of the current table.
131   * @param scan              The scan associated with this split.
132   * @param startRow          The start row of the split.
133   * @param endRow            The end row of the split.
134   * @param encodedRegionName The region ID.
135   * @param location          The location of the region.
136   */
137  public TableSplit(TableName tableName, Scan scan, byte[] startRow, byte[] endRow,
138    final String location, final String encodedRegionName, long length) {
139    this.tableName = tableName;
140    try {
141      this.scan = (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
142    } catch (IOException e) {
143      LOG.warn("Failed to convert Scan to String", e);
144    }
145    this.startRow = startRow;
146    this.endRow = endRow;
147    this.regionLocation = location;
148    this.encodedRegionName = encodedRegionName;
149    this.length = length;
150  }
151
152  /**
153   * Creates a new instance without a scanner. Length of region is set to 0
154   * @param tableName The name of the current table.
155   * @param startRow  The start row of the split.
156   * @param endRow    The end row of the split.
157   * @param location  The location of the region.
158   */
159  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location) {
160    this(tableName, null, startRow, endRow, location);
161  }
162
163  /**
164   * Creates a new instance without a scanner.
165   * @param tableName The name of the current table.
166   * @param startRow  The start row of the split.
167   * @param endRow    The end row of the split.
168   * @param location  The location of the region.
169   * @param length    Size of region in bytes
170   */
171  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, final String location,
172    long length) {
173    this(tableName, null, startRow, endRow, location, length);
174  }
175
176  /**
177   * Returns a Scan object from the stored string representation.
178   * @return Returns a Scan object based on the stored scanner.
179   * @throws IOException throws IOException if deserialization fails
180   */
181  public Scan getScan() throws IOException {
182    return TableMapReduceUtil.convertStringToScan(this.scan);
183  }
184
185  /**
186   * Returns a scan string
187   * @return scan as string. Should be noted that this is not same as getScan().toString() because
188   *         Scan object will have the default values when empty scan string is deserialized. Thus,
189   *         getScan().toString() can never be empty
190   */
191  @InterfaceAudience.Private
192  public String getScanAsString() {
193    return this.scan;
194  }
195
196  /**
197   * Returns the table name converted to a byte array.
198   * @see #getTable()
199   * @return The table name.
200   */
201  public byte[] getTableName() {
202    return tableName.getName();
203  }
204
205  /**
206   * Returns the table name.
207   * @return The table name.
208   */
209  public TableName getTable() {
210    // It is ugly that usually to get a TableName, the method is called getTableName. We can't do
211    // that in here though because there was an existing getTableName in place already since
212    // deprecated.
213    return tableName;
214  }
215
216  /**
217   * Returns the start row.
218   * @return The start row.
219   */
220  public byte[] getStartRow() {
221    return startRow;
222  }
223
224  /**
225   * Returns the end row.
226   * @return The end row.
227   */
228  public byte[] getEndRow() {
229    return endRow;
230  }
231
232  /**
233   * Returns the region location.
234   * @return The region's location.
235   */
236  public String getRegionLocation() {
237    return regionLocation;
238  }
239
240  /**
241   * Returns the region's location as an array.
242   * @return The array containing the region location.
243   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
244   */
245  @Override
246  public String[] getLocations() {
247    return new String[] { regionLocation };
248  }
249
250  /**
251   * Returns the region's encoded name.
252   * @return The region's encoded name.
253   */
254  public String getEncodedRegionName() {
255    return encodedRegionName;
256  }
257
258  /**
259   * Returns the length of the split.
260   * @return The length of the split.
261   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
262   */
263  @Override
264  public long getLength() {
265    return length;
266  }
267
268  /**
269   * Reads the values of each field.
270   * @param in The input to read from.
271   * @throws IOException When reading the input fails.
272   */
273  @Override
274  public void readFields(DataInput in) throws IOException {
275    Version version = Version.UNVERSIONED;
276    // TableSplit was not versioned in the beginning.
277    // In order to introduce it now, we make use of the fact
278    // that tableName was written with Bytes.writeByteArray,
279    // which encodes the array length as a vint which is >= 0.
280    // Hence if the vint is >= 0 we have an old version and the vint
281    // encodes the length of tableName.
282    // If < 0 we just read the version and the next vint is the length.
283    // @see Bytes#readByteArray(DataInput)
284    int len = WritableUtils.readVInt(in);
285    if (len < 0) {
286      // what we just read was the version
287      version = Version.fromCode(len);
288      len = WritableUtils.readVInt(in);
289    }
290    byte[] tableNameBytes = new byte[len];
291    in.readFully(tableNameBytes);
292    tableName = TableName.valueOf(tableNameBytes);
293    startRow = Bytes.readByteArray(in);
294    endRow = Bytes.readByteArray(in);
295    regionLocation = Bytes.toString(Bytes.readByteArray(in));
296    if (version.atLeast(Version.INITIAL)) {
297      scan = Bytes.toString(Bytes.readByteArray(in));
298    }
299    length = WritableUtils.readVLong(in);
300    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
301      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
302    }
303  }
304
305  /**
306   * Writes the field values to the output.
307   * @param out The output to write to.
308   * @throws IOException When writing the values to the output fails.
309   */
310  @Override
311  public void write(DataOutput out) throws IOException {
312    WritableUtils.writeVInt(out, VERSION.code);
313    Bytes.writeByteArray(out, tableName.getName());
314    Bytes.writeByteArray(out, startRow);
315    Bytes.writeByteArray(out, endRow);
316    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
317    Bytes.writeByteArray(out, Bytes.toBytes(scan));
318    WritableUtils.writeVLong(out, length);
319    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
320  }
321
322  /**
323   * Returns the details about this instance as a string.
324   * @return The values of this instance as a string.
325   * @see java.lang.Object#toString()
326   */
327  @Override
328  public String toString() {
329    StringBuilder sb = new StringBuilder();
330    sb.append("Split(");
331    sb.append("tablename=").append(tableName);
332    // null scan input is represented by ""
333    String printScan = "";
334    if (!scan.equals("")) {
335      try {
336        // get the real scan here in toString, not the Base64 string
337        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
338      } catch (IOException e) {
339        printScan = "";
340      }
341      sb.append(", scan=").append(printScan);
342    }
343    sb.append(", startrow=").append(Bytes.toStringBinary(startRow));
344    sb.append(", endrow=").append(Bytes.toStringBinary(endRow));
345    sb.append(", regionLocation=").append(regionLocation);
346    sb.append(", regionname=").append(encodedRegionName);
347    sb.append(")");
348    return sb.toString();
349  }
350
351  /**
352   * Compares this split against the given one.
353   * @param split The split to compare to.
354   * @return The result of the comparison.
355   * @see java.lang.Comparable#compareTo(java.lang.Object)
356   */
357  @Override
358  public int compareTo(TableSplit split) {
359    // If The table name of the two splits is the same then compare start row
360    // otherwise compare based on table names
361    int tableNameComparison = getTable().compareTo(split.getTable());
362    return tableNameComparison != 0
363      ? tableNameComparison
364      : Bytes.compareTo(getStartRow(), split.getStartRow());
365  }
366
367  @Override
368  public boolean equals(Object o) {
369    if (o == null || !(o instanceof TableSplit)) {
370      return false;
371    }
372    return tableName.equals(((TableSplit) o).tableName)
373      && Bytes.equals(startRow, ((TableSplit) o).startRow)
374      && Bytes.equals(endRow, ((TableSplit) o).endRow)
375      && regionLocation.equals(((TableSplit) o).regionLocation);
376  }
377
378  @Override
379  public int hashCode() {
380    int result = tableName != null ? tableName.hashCode() : 0;
381    result = 31 * result + (scan != null ? scan.hashCode() : 0);
382    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
383    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
384    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
385    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
386    return result;
387  }
388}