001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.util.Arrays;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.TableName;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.io.Writable;
030import org.apache.hadoop.io.WritableUtils;
031import org.apache.hadoop.mapreduce.InputSplit;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A table split corresponds to a key range (low, high) and an optional scanner.
038 * All references to row below refer to the key of the row.
039 */
040@InterfaceAudience.Public
041public class TableSplit extends InputSplit
042  implements Writable, Comparable<TableSplit> {
043  /** @deprecated LOG variable would be made private. fix in hbase 3.0 */
044  @Deprecated
045  public static final Logger LOG = LoggerFactory.getLogger(TableSplit.class);
046
047  // should be < 0 (@see #readFields(DataInput))
048  // version 1 supports Scan data member
049  enum Version {
050    UNVERSIONED(0),
051    // Initial number we put on TableSplit when we introduced versioning.
052    INITIAL(-1),
053    // Added an encoded region name field for easier identification of split -> region
054    WITH_ENCODED_REGION_NAME(-2);
055
056    final int code;
057    static final Version[] byCode;
058    static {
059      byCode = Version.values();
060      for (int i = 0; i < byCode.length; i++) {
061        if (byCode[i].code != -1 * i) {
062          throw new AssertionError("Values in this enum should be descending by one");
063        }
064      }
065    }
066
067    Version(int code) {
068      this.code = code;
069    }
070
071    boolean atLeast(Version other) {
072      return code <= other.code;
073    }
074
075    static Version fromCode(int code) {
076      return byCode[code * -1];
077    }
078  }
079
080  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
081  private TableName tableName;
082  private byte [] startRow;
083  private byte [] endRow;
084  private String regionLocation;
085  private String encodedRegionName = "";
086
087  /**
088   * The scan object may be null but the serialized form of scan is never null
089   * or empty since we serialize the scan object with default values then.
090   * Having no scanner in TableSplit doesn't necessarily mean there is no scanner
091   * for mapreduce job, it just means that we do not need to set it for each split.
092   * For example, it is not required to have a scan object for
093   * {@link org.apache.hadoop.hbase.mapred.TableInputFormatBase} since we use the scan from the
094   * job conf and scanner is supposed to be same for all the splits of table.
095   */
096  private String scan = ""; // stores the serialized form of the Scan
097  private long length; // Contains estimation of region size in bytes
098
099  /** Default constructor. */
100  public TableSplit() {
101    this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY,
102      HConstants.EMPTY_BYTE_ARRAY, "");
103  }
104
105  /**
106   * Creates a new instance while assigning all variables.
107   * Length of region is set to 0
108   * Encoded name of the region is set to blank
109   *
110   * @param tableName  The name of the current table.
111   * @param scan The scan associated with this split.
112   * @param startRow  The start row of the split.
113   * @param endRow  The end row of the split.
114   * @param location  The location of the region.
115   */
116  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
117                    final String location) {
118    this(tableName, scan, startRow, endRow, location, 0L);
119  }
120
121  /**
122   * Creates a new instance while assigning all variables.
123   * Encoded name of region is set to blank
124   *
125   * @param tableName  The name of the current table.
126   * @param scan The scan associated with this split.
127   * @param startRow  The start row of the split.
128   * @param endRow  The end row of the split.
129   * @param location  The location of the region.
130   */
131  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
132      final String location, long length) {
133    this(tableName, scan, startRow, endRow, location, "", length);
134  }
135
136  /**
137   * Creates a new instance while assigning all variables.
138   *
139   * @param tableName  The name of the current table.
140   * @param scan The scan associated with this split.
141   * @param startRow  The start row of the split.
142   * @param endRow  The end row of the split.
143   * @param encodedRegionName The region ID.
144   * @param location  The location of the region.
145   */
146  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
147      final String location, final String encodedRegionName, long length) {
148    this.tableName = tableName;
149    try {
150      this.scan =
151        (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
152    } catch (IOException e) {
153      LOG.warn("Failed to convert Scan to String", e);
154    }
155    this.startRow = startRow;
156    this.endRow = endRow;
157    this.regionLocation = location;
158    this.encodedRegionName = encodedRegionName;
159    this.length = length;
160  }
161
162  /**
163   * Creates a new instance without a scanner.
164   * Length of region is set to 0
165   *
166   * @param tableName The name of the current table.
167   * @param startRow The start row of the split.
168   * @param endRow The end row of the split.
169   * @param location The location of the region.
170   */
171  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
172      final String location) {
173    this(tableName, null, startRow, endRow, location);
174  }
175
176  /**
177   * Creates a new instance without a scanner.
178   *
179   * @param tableName The name of the current table.
180   * @param startRow The start row of the split.
181   * @param endRow The end row of the split.
182   * @param location The location of the region.
183   * @param length Size of region in bytes
184   */
185  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
186                    final String location, long length) {
187    this(tableName, null, startRow, endRow, location, length);
188  }
189
190  /**
191   * Returns a Scan object from the stored string representation.
192   *
193   * @return Returns a Scan object based on the stored scanner.
194   * @throws IOException throws IOException if deserialization fails
195   */
196  public Scan getScan() throws IOException {
197    return TableMapReduceUtil.convertStringToScan(this.scan);
198  }
199
200  /**
201   * Returns a scan string
202   * @return scan as string. Should be noted that this is not same as getScan().toString()
203   *    because Scan object will have the default values when empty scan string is
204   *    deserialized. Thus, getScan().toString() can never be empty
205   */
206  @InterfaceAudience.Private
207  public String getScanAsString() {
208    return this.scan;
209  }
210
211  /**
212   * Returns the table name converted to a byte array.
213   * @see #getTable()
214   * @return The table name.
215   */
216  public byte [] getTableName() {
217    return tableName.getName();
218  }
219
220  /**
221   * Returns the table name.
222   *
223   * @return The table name.
224   */
225  public TableName getTable() {
226    // It is ugly that usually to get a TableName, the method is called getTableName.  We can't do
227    // that in here though because there was an existing getTableName in place already since
228    // deprecated.
229    return tableName;
230  }
231
232  /**
233   * Returns the start row.
234   *
235   * @return The start row.
236   */
237  public byte [] getStartRow() {
238    return startRow;
239  }
240
241  /**
242   * Returns the end row.
243   *
244   * @return The end row.
245   */
246  public byte [] getEndRow() {
247    return endRow;
248  }
249
250  /**
251   * Returns the region location.
252   *
253   * @return The region's location.
254   */
255  public String getRegionLocation() {
256    return regionLocation;
257  }
258
259  /**
260   * Returns the region's location as an array.
261   *
262   * @return The array containing the region location.
263   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
264   */
265  @Override
266  public String[] getLocations() {
267    return new String[] {regionLocation};
268  }
269
270  /**
271   * Returns the region's encoded name.
272   *
273   * @return The region's encoded name.
274   */
275  public String getEncodedRegionName() {
276    return encodedRegionName;
277  }
278
279  /**
280   * Returns the length of the split.
281   *
282   * @return The length of the split.
283   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
284   */
285  @Override
286  public long getLength() {
287    return length;
288  }
289
290  /**
291   * Reads the values of each field.
292   *
293   * @param in  The input to read from.
294   * @throws IOException When reading the input fails.
295   */
296  @Override
297  public void readFields(DataInput in) throws IOException {
298    Version version = Version.UNVERSIONED;
299    // TableSplit was not versioned in the beginning.
300    // In order to introduce it now, we make use of the fact
301    // that tableName was written with Bytes.writeByteArray,
302    // which encodes the array length as a vint which is >= 0.
303    // Hence if the vint is >= 0 we have an old version and the vint
304    // encodes the length of tableName.
305    // If < 0 we just read the version and the next vint is the length.
306    // @see Bytes#readByteArray(DataInput)
307    int len = WritableUtils.readVInt(in);
308    if (len < 0) {
309      // what we just read was the version
310      version = Version.fromCode(len);
311      len = WritableUtils.readVInt(in);
312    }
313    byte[] tableNameBytes = new byte[len];
314    in.readFully(tableNameBytes);
315    tableName = TableName.valueOf(tableNameBytes);
316    startRow = Bytes.readByteArray(in);
317    endRow = Bytes.readByteArray(in);
318    regionLocation = Bytes.toString(Bytes.readByteArray(in));
319    if (version.atLeast(Version.INITIAL)) {
320      scan = Bytes.toString(Bytes.readByteArray(in));
321    }
322    length = WritableUtils.readVLong(in);
323    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
324      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
325    }
326  }
327
328  /**
329   * Writes the field values to the output.
330   *
331   * @param out  The output to write to.
332   * @throws IOException When writing the values to the output fails.
333   */
334  @Override
335  public void write(DataOutput out) throws IOException {
336    WritableUtils.writeVInt(out, VERSION.code);
337    Bytes.writeByteArray(out, tableName.getName());
338    Bytes.writeByteArray(out, startRow);
339    Bytes.writeByteArray(out, endRow);
340    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
341    Bytes.writeByteArray(out, Bytes.toBytes(scan));
342    WritableUtils.writeVLong(out, length);
343    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
344  }
345
346  /**
347   * Returns the details about this instance as a string.
348   *
349   * @return The values of this instance as a string.
350   * @see java.lang.Object#toString()
351   */
352  @Override
353  public String toString() {
354    StringBuilder sb = new StringBuilder();
355    sb.append("HBase table split(");
356    sb.append("table name: ").append(tableName);
357    // null scan input is represented by ""
358    String printScan = "";
359    if (!scan.equals("")) {
360      try {
361        // get the real scan here in toString, not the Base64 string
362        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
363      }
364      catch (IOException e) {
365        printScan = "";
366      }
367    }
368    sb.append(", scan: ").append(printScan);
369    sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
370    sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
371    sb.append(", region location: ").append(regionLocation);
372    sb.append(", encoded region name: ").append(encodedRegionName);
373    sb.append(")");
374    return sb.toString();
375  }
376
377  /**
378   * Compares this split against the given one.
379   *
380   * @param split  The split to compare to.
381   * @return The result of the comparison.
382   * @see java.lang.Comparable#compareTo(java.lang.Object)
383   */
384  @Override
385  public int compareTo(TableSplit split) {
386    // If The table name of the two splits is the same then compare start row
387    // otherwise compare based on table names
388    int tableNameComparison =
389        getTable().compareTo(split.getTable());
390    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
391        getStartRow(), split.getStartRow());
392  }
393
394  @Override
395  public boolean equals(Object o) {
396    if (o == null || !(o instanceof TableSplit)) {
397      return false;
398    }
399    return tableName.equals(((TableSplit)o).tableName) &&
400      Bytes.equals(startRow, ((TableSplit)o).startRow) &&
401      Bytes.equals(endRow, ((TableSplit)o).endRow) &&
402      regionLocation.equals(((TableSplit)o).regionLocation);
403  }
404
405  @Override
406  public int hashCode() {
407    int result = tableName != null ? tableName.hashCode() : 0;
408    result = 31 * result + (scan != null ? scan.hashCode() : 0);
409    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
410    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
411    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
412    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
413    return result;
414  }
415}