001/**
002 * Copyright 2009 The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020package org.apache.hadoop.hbase;
021
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.nio.ByteBuffer;
026import java.util.Comparator;
027import java.util.HashMap;
028import java.util.Map;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.apache.hadoop.hbase.io.HeapSize;
033import org.apache.hadoop.hbase.io.hfile.HFile;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.ClassSize;
036import org.apache.hadoop.io.RawComparator;
037import org.apache.hadoop.io.Writable;
038
039import com.google.common.primitives.Longs;
040
041/**
042 * An HBase Key/Value.  This is the fundamental HBase Type.
043 *
044 * <p>If being used client-side, the primary methods to access individual fields
045 * are {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()},
046 * {@link #getTimestamp()}, and {@link #getValue()}.  These methods allocate new
047 * byte arrays and return copies. Avoid their use server-side.
048 *
049 * <p>Instances of this class are immutable.  They do not implement Comparable
050 * but Comparators are provided.  Comparators change with context,
 * whether user table or a catalog table comparison.  It's critical you use the
052 * appropriate comparator.  There are Comparators for KeyValue instances and
053 * then for just the Key portion of a KeyValue used mostly by {@link HFile}.
054 *
055 * <p>KeyValue wraps a byte array and takes offsets and lengths into passed
056 * array at where to start interpreting the content as KeyValue.  The KeyValue
057 * format inside a byte array is:
058 * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code>
059 * Key is further decomposed as:
060 * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier> &lt;timestamp> &lt;keytype></code>
 * The <code>rowlength</code> maximum is <code>Short.MAX_VALUE</code>,
 * column family length maximum is
 * <code>Byte.MAX_VALUE</code>, and column qualifier + key length must
 * be &lt; <code>Integer.MAX_VALUE</code>.
065 * The column does not contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}
066 */
067public class KeyValue implements Writable, HeapSize, Cloneable {
  /** Logger for this class. */
  static final Log LOG = LogFactory.getLog(KeyValue.class);
  // TODO: Group Key-only comparators and operations into a Key class, just
  // for neatness sake, if can figure what to call it.

  /**
   * Colon character in UTF-8.  Separates family from qualifier when a column
   * is written in its textual <code>family:qualifier</code> form.
   */
  public static final char COLUMN_FAMILY_DELIMITER = ':';

  /** {@link #COLUMN_FAMILY_DELIMITER} wrapped in a one-element byte array. */
  public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
    new byte[]{COLUMN_FAMILY_DELIMITER};
079
  // NOTE(review): the comparator singletons below are public and non-final,
  // so any caller can reassign them -- confirm whether that is intentional.

  /**
   * Comparator for plain key/values; i.e. non-catalog table key/values.
   */
  public static KVComparator COMPARATOR = new KVComparator();

  /**
   * Comparator for plain key; i.e. non-catalog table key.  Works on Key portion
   * of KeyValue only.
   */
  public static KeyComparator KEY_COMPARATOR = new KeyComparator();

  /**
   * A {@link KVComparator} for <code>.META.</code> catalog table
   * {@link KeyValue}s.
   */
  public static KVComparator META_COMPARATOR = new MetaComparator();

  /**
   * A {@link KeyComparator} for <code>.META.</code> catalog table
   * {@link KeyValue} keys.
   */
  public static KeyComparator META_KEY_COMPARATOR = new MetaKeyComparator();

  /**
   * A {@link KVComparator} for <code>-ROOT-</code> catalog table
   * {@link KeyValue}s.
   */
  public static KVComparator ROOT_COMPARATOR = new RootComparator();

  /**
   * A {@link KeyComparator} for <code>-ROOT-</code> catalog table
   * {@link KeyValue} keys.
   */
  public static KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator();
114
115  /**
116   * Get the appropriate row comparator for the specified table.
117   *
118   * Hopefully we can get rid of this, I added this here because it's replacing
119   * something in HSK.  We should move completely off of that.
120   *
121   * @param tableName  The table name.
122   * @return The comparator.
123   */
124  public static KeyComparator getRowComparator(byte [] tableName) {
125    if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) {
126      return ROOT_COMPARATOR.getRawComparator();
127    }
128    if(Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) {
129      return META_COMPARATOR.getRawComparator();
130    }
131    return COMPARATOR.getRawComparator();
132  }
133
  /** Size of the key length field in bytes*/
  public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;

  /** Size of the key type field in bytes */
  public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;

  /** Size of the row length field in bytes */
  public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;

  /** Size of the family length field in bytes */
  public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;

  /** Size of the timestamp field in bytes */
  public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;

  /** Size of the timestamp and type byte on end of a key -- a long + a byte. */
  public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;

  /**
   * Fixed overhead inside a key: the row length field, the family length
   * field, the timestamp and the type byte.  Everything else in a key is
   * row, family and qualifier content.
   */
  public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
      + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;

  /**
   * How far into a KeyValue the key starts: past the key length int and the
   * value length int.  The first thing read at this offset is the short that
   * says how long the row is.
   */
  public static final int ROW_OFFSET =
    Bytes.SIZEOF_INT /*keylength*/ +
    Bytes.SIZEOF_INT /*valuelength*/;

  /** Size of the length ints in a KeyValue datastructure. */
  public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
164
165  /**
166   * Key type.
167   * Has space for other key types to be added later.  Cannot rely on
168   * enum ordinals . They change if item is removed or moved.  Do our own codes.
169   */
170  public static enum Type {
171    Minimum((byte)0),
172    Put((byte)4),
173
174    Delete((byte)8),
175    DeleteColumn((byte)12),
176    DeleteFamily((byte)14),
177
178    // Maximum is used when searching; you look from maximum on down.
179    Maximum((byte)255);
180
181    private final byte code;
182
183    Type(final byte c) {
184      this.code = c;
185    }
186
187    public byte getCode() {
188      return this.code;
189    }
190
191    /**
192     * Cannot rely on enum ordinals . They change if item is removed or moved.
193     * Do our own codes.
194     * @param b
195     * @return Type associated with passed code.
196     */
197    public static Type codeToType(final byte b) {
198      for (Type t : Type.values()) {
199        if (t.getCode() == b) {
200          return t;
201        }
202      }
203      throw new RuntimeException("Unknown code " + b);
204    }
205  }
206
  /**
   * Lowest possible key.
   * Makes a Key with highest possible Timestamp, empty row and column.  No
   * key can be equal or lower than this one in memstore or in store file.
   */
  public static final KeyValue LOWESTKEY =
    new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);

  /** Array holding the serialized KeyValue; may be shared with other data. */
  private byte [] bytes = null;
  /** Index in {@link #bytes} at which this KeyValue starts. */
  private int offset = 0;
  /** Number of bytes in {@link #bytes} this KeyValue occupies. */
  private int length = 0;
218
219  /**
220   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
221   * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
222   * KeyValue type.
223   */
224  public static boolean isDelete(byte t) {
225    return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
226  }
227
228  /** Here be dragons **/
229
  // used to achieve atomic operations in the memstore.
  /**
   * @return The memstore timestamp currently attached to this instance.
   */
  public long getMemstoreTS() {
    return memstoreTS;
  }

  /**
   * Attaches a memstore timestamp to this instance.  Only the field is
   * changed; the backing byte array is not touched.
   * @param memstoreTS new memstore timestamp
   */
  public void setMemstoreTS(long memstoreTS) {
    this.memstoreTS = memstoreTS;
  }

  // default value is 0, aka DNC
  private long memstoreTS = 0;
241
242  /** Dragon time over, return to normal business */
243
244
  /**
   * Writable Constructor -- DO NOT USE.
   * Leaves the backing array <code>null</code> with offset and length 0.
   */
  public KeyValue() {}
247
  /**
   * Creates a KeyValue from the start of the specified byte array.
   * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
   * The array is wrapped, not copied.
   * @param bytes byte array
   */
  public KeyValue(final byte [] bytes) {
    this(bytes, 0);
  }
256
  /**
   * Creates a KeyValue from the specified byte array and offset.
   * Presumes <code>bytes</code> content starting at <code>offset</code> is
   * formatted as a KeyValue blob.  The total length is derived from the key
   * length and value length ints found at <code>offset</code>.
   * @param bytes byte array
   * @param offset offset to start of KeyValue
   */
  public KeyValue(final byte [] bytes, final int offset) {
    this(bytes, offset, getLength(bytes, offset));
  }
267
268  /**
269   * Creates a KeyValue from the specified byte array, starting at offset, and
270   * for length <code>length</code>.
271   * @param bytes byte array
272   * @param offset offset to start of the KeyValue
273   * @param length length of the KeyValue
274   */
275  public KeyValue(final byte [] bytes, final int offset, final int length) {
276    this.bytes = bytes;
277    this.offset = offset;
278    this.length = length;
279  }
280
281  /** Constructors that build a new backing byte array from fields */
282
  /**
   * Constructs a KeyValue holding only a row and a timestamp: no family, no
   * qualifier and no value.
   * Sets type to {@link KeyValue.Type#Maximum}
   * @param row - row key (arbitrary byte array)
   * @param timestamp version timestamp
   * @throws IllegalArgumentException if <code>row</code> is null or too long
   */
  public KeyValue(final byte [] row, final long timestamp) {
    this(row, timestamp, Type.Maximum);
  }
292
  /**
   * Constructs a KeyValue holding only a row, a timestamp and a type: no
   * family, no qualifier and no value.
   * @param row - row key (arbitrary byte array)
   * @param timestamp version timestamp
   * @param type key type
   * @throws IllegalArgumentException if <code>row</code> is null or too long
   */
  public KeyValue(final byte [] row, final long timestamp, Type type) {
    this(row, null, null, timestamp, type, null);
  }
301
  /**
   * Constructs a KeyValue for a row and column, without a value.
   * Uses {@link HConstants#LATEST_TIMESTAMP} as the timestamp and
   * sets type to {@link KeyValue.Type#Maximum}
   * @param row - row key (arbitrary byte array)
   * @param family family name
   * @param qualifier column qualifier
   * @throws IllegalArgumentException if a field is null-where-required or
   * too long
   */
  public KeyValue(final byte [] row, final byte [] family,
      final byte [] qualifier) {
    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
  }
313
  /**
   * Constructs a {@link KeyValue.Type#Put} KeyValue for a row, column and
   * value, using {@link HConstants#LATEST_TIMESTAMP} as the timestamp.
   * @param row - row key (arbitrary byte array)
   * @param family family name
   * @param qualifier column qualifier
   * @param value column value
   * @throws IllegalArgumentException if a field is null-where-required or
   * too long
   */
  public KeyValue(final byte [] row, final byte [] family,
      final byte [] qualifier, final byte [] value) {
    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
  }
324
  /**
   * Constructs KeyValue structure filled with specified values and no value
   * payload.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param type key type
   * @throws IllegalArgumentException if <code>row</code> is null or a field
   * exceeds its maximum length
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, Type type) {
    this(row, family, qualifier, timestamp, type, null);
  }
338
  /**
   * Constructs KeyValue structure filled with specified values.
   * The type is set to {@link KeyValue.Type#Put}.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param value column value
   * @throws IllegalArgumentException if <code>row</code> is null or a field
   * exceeds its maximum length
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, final byte[] value) {
    this(row, family, qualifier, timestamp, Type.Put, value);
  }
352
  /**
   * Constructs KeyValue structure filled with specified values.
   * The whole of <code>qualifier</code> and <code>value</code> is used; a
   * null array is treated as length 0.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param type key type
   * @param value column value
   * @throws IllegalArgumentException if <code>row</code> is null or a field
   * exceeds its maximum length
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, Type type,
      final byte[] value) {
    this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
        timestamp, type, value, 0, value==null ? 0 : value.length);
  }
369
  /**
   * Constructs KeyValue structure filled with specified values.
   * The whole of <code>row</code> and <code>family</code> is used (a null
   * array counts as length 0 here), while qualifier and value are taken from
   * the given sub-ranges.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param qoffset qualifier offset
   * @param qlength qualifier length
   * @param timestamp version timestamp
   * @param type key type
   * @param value column value
   * @param voffset value offset
   * @param vlength value length
   * @throws IllegalArgumentException if <code>row</code> is null or a field
   * exceeds its maximum length
   */
  public KeyValue(byte [] row, byte [] family,
      byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
      byte [] value, int voffset, int vlength) {
    this(row, 0, row==null ? 0 : row.length,
        family, 0, family==null ? 0 : family.length,
        qualifier, qoffset, qlength, timestamp, type,
        value, voffset, vlength);
  }
392
393  /**
394   * Constructs KeyValue structure filled with specified values.
395   * <p>
396   * Column is split into two fields, family and qualifier.
397   * @param row row key
398   * @param roffset row offset
399   * @param rlength row length
400   * @param family family name
401   * @param foffset family offset
402   * @param flength family length
403   * @param qualifier column qualifier
404   * @param qoffset qualifier offset
405   * @param qlength qualifier length
406   * @param timestamp version timestamp
407   * @param type key type
408   * @param value column value
409   * @param voffset value offset
410   * @param vlength value length
411   * @throws IllegalArgumentException
412   */
413  public KeyValue(final byte [] row, final int roffset, final int rlength,
414      final byte [] family, final int foffset, final int flength,
415      final byte [] qualifier, final int qoffset, final int qlength,
416      final long timestamp, final Type type,
417      final byte [] value, final int voffset, final int vlength) {
418    this.bytes = createByteArray(row, roffset, rlength,
419        family, foffset, flength, qualifier, qoffset, qlength,
420        timestamp, type, value, voffset, vlength);
421    this.length = bytes.length;
422    this.offset = 0;
423  }
424
425  /**
426   * Constructs an empty KeyValue structure, with specified sizes.
427   * This can be used to partially fill up KeyValues.
428   * <p>
429   * Column is split into two fields, family and qualifier.
430   * @param rlength row length
431   * @param flength family length
432   * @param qlength qualifier length
433   * @param timestamp version timestamp
434   * @param type key type
435   * @param vlength value length
436   * @throws IllegalArgumentException
437   */
438  public KeyValue(final int rlength,
439      final int flength,
440      final int qlength,
441      final long timestamp, final Type type,
442      final int vlength) {
443    this.bytes = createEmptyByteArray(rlength,
444        flength, qlength,
445        timestamp, type, vlength);
446    this.length = bytes.length;
447    this.offset = 0;
448  }
449
450  /**
451   * Create an empty byte[] representing a KeyValue
452   * All lengths are preset and can be filled in later.
453   * @param rlength
454   * @param flength
455   * @param qlength
456   * @param timestamp
457   * @param type
458   * @param vlength
459   * @return The newly created byte array.
460   */
461  static byte[] createEmptyByteArray(final int rlength, int flength,
462      int qlength, final long timestamp, final Type type, int vlength) {
463    if (rlength > Short.MAX_VALUE) {
464      throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
465    }
466    if (flength > Byte.MAX_VALUE) {
467      throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
468    }
469    // Qualifier length
470    if (qlength > Integer.MAX_VALUE - rlength - flength) {
471      throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
472    }
473    // Key length
474    long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
475    if (longkeylength > Integer.MAX_VALUE) {
476      throw new IllegalArgumentException("keylength " + longkeylength + " > " +
477        Integer.MAX_VALUE);
478    }
479    int keylength = (int)longkeylength;
480    // Value length
481    if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
482      throw new IllegalArgumentException("Valuer > " +
483          HConstants.MAXIMUM_VALUE_LENGTH);
484    }
485
486    // Allocate right-sized byte array.
487    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
488    // Write the correct size markers
489    int pos = 0;
490    pos = Bytes.putInt(bytes, pos, keylength);
491    pos = Bytes.putInt(bytes, pos, vlength);
492    pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
493    pos += rlength;
494    pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
495    pos += flength + qlength;
496    pos = Bytes.putLong(bytes, pos, timestamp);
497    pos = Bytes.putByte(bytes, pos, type.getCode());
498    return bytes;
499  }
500
501  /**
502   * Write KeyValue format into a byte array.
503   *
504   * @param row row key
505   * @param roffset row offset
506   * @param rlength row length
507   * @param family family name
508   * @param foffset family offset
509   * @param flength family length
510   * @param qualifier column qualifier
511   * @param qoffset qualifier offset
512   * @param qlength qualifier length
513   * @param timestamp version timestamp
514   * @param type key type
515   * @param value column value
516   * @param voffset value offset
517   * @param vlength value length
518   * @return The newly created byte array.
519   */
520  static byte [] createByteArray(final byte [] row, final int roffset,
521      final int rlength, final byte [] family, final int foffset, int flength,
522      final byte [] qualifier, final int qoffset, int qlength,
523      final long timestamp, final Type type,
524      final byte [] value, final int voffset, int vlength) {
525    if (rlength > Short.MAX_VALUE) {
526      throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
527    }
528    if (row == null) {
529      throw new IllegalArgumentException("Row is null");
530    }
531    // Family length
532    flength = family == null ? 0 : flength;
533    if (flength > Byte.MAX_VALUE) {
534      throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
535    }
536    // Qualifier length
537    qlength = qualifier == null ? 0 : qlength;
538    if (qlength > Integer.MAX_VALUE - rlength - flength) {
539      throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
540    }
541    // Key length
542    long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
543    if (longkeylength > Integer.MAX_VALUE) {
544      throw new IllegalArgumentException("keylength " + longkeylength + " > " +
545        Integer.MAX_VALUE);
546    }
547    int keylength = (int)longkeylength;
548    // Value length
549    vlength = value == null? 0 : vlength;
550    if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
551      throw new IllegalArgumentException("Valuer > " +
552          HConstants.MAXIMUM_VALUE_LENGTH);
553    }
554
555    // Allocate right-sized byte array.
556    byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
557    // Write key, value and key row length.
558    int pos = 0;
559    pos = Bytes.putInt(bytes, pos, keylength);
560    pos = Bytes.putInt(bytes, pos, vlength);
561    pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
562    pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
563    pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
564    if(flength != 0) {
565      pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
566    }
567    if(qlength != 0) {
568      pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
569    }
570    pos = Bytes.putLong(bytes, pos, timestamp);
571    pos = Bytes.putByte(bytes, pos, type.getCode());
572    if (value != null && value.length > 0) {
573      pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
574    }
575    return bytes;
576  }
577
578  /**
579   * Write KeyValue format into a byte array.
580   * <p>
581   * Takes column in the form <code>family:qualifier</code>
582   * @param row - row key (arbitrary byte array)
583   * @param roffset
584   * @param rlength
585   * @param column
586   * @param coffset
587   * @param clength
588   * @param timestamp
589   * @param type
590   * @param value
591   * @param voffset
592   * @param vlength
593   * @return The newly created byte array.
594   */
595  static byte [] createByteArray(final byte [] row, final int roffset,
596        final int rlength,
597      final byte [] column, final int coffset, int clength,
598      final long timestamp, final Type type,
599      final byte [] value, final int voffset, int vlength) {
600    // If column is non-null, figure where the delimiter is at.
601    int delimiteroffset = 0;
602    if (column != null && column.length > 0) {
603      delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength);
604      if (delimiteroffset > Byte.MAX_VALUE) {
605        throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
606      }
607    } else {
608      return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp,
609          type,value,voffset,vlength);
610    }
611    int flength = delimiteroffset-coffset;
612    int qlength = clength - flength - 1;
613    return createByteArray(row, roffset, rlength, column, coffset,
614        flength, column, delimiteroffset+1, qlength, timestamp, type,
615        value, voffset, vlength);
616  }
617
618  // Needed doing 'contains' on List.  Only compares the key portion, not the
619  // value.
620  @Override
621  public boolean equals(Object other) {
622    if (!(other instanceof KeyValue)) {
623      return false;
624    }
625    KeyValue kv = (KeyValue)other;
626    // Comparing bytes should be fine doing equals test.  Shouldn't have to
627    // worry about special .META. comparators doing straight equals.
628    return Bytes.equals(getBuffer(), getKeyOffset(), getKeyLength(),
629      kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
630  }
631
632  @Override
633  public int hashCode() {
634    byte[] b = getBuffer();
635    int start = getOffset(), end = getOffset() + getLength();
636    int h = b[start++];
637    for (int i = start; i < end; i++) {
638      h = (h * 13) ^ b[i];
639    }
640    return h;
641  }
642
643  //---------------------------------------------------------------------------
644  //
645  //  KeyValue cloning
646  //
647  //---------------------------------------------------------------------------
648
649  /**
650   * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
651   * @return Fully copied clone of this KeyValue
652   */
653  public KeyValue clone() {
654    byte [] b = new byte[this.length];
655    System.arraycopy(this.bytes, this.offset, b, 0, this.length);
656    KeyValue ret = new KeyValue(b, 0, b.length);
657    // Important to clone the memstoreTS as well - otherwise memstore's
658    // update-in-place methods (eg increment) will end up creating
659    // new entries
660    ret.setMemstoreTS(memstoreTS);
661    return ret;
662  }
663
664  /**
665   * Creates a deep copy of this KeyValue, re-allocating the buffer.
666   * Same function as {@link #clone()}.  Added for clarity vs shallowCopy()
667   * @return Deep copy of this KeyValue
668   */
669  public KeyValue deepCopy() {
670    return clone();
671  }
672
673  /**
674   * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
675   * http://en.wikipedia.org/wiki/Object_copy
676   * @return Shallow copy of this KeyValue
677   */
678  public KeyValue shallowCopy() {
679    KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
680    shallowCopy.setMemstoreTS(this.memstoreTS);
681    return shallowCopy;
682  }
683
684  //---------------------------------------------------------------------------
685  //
686  //  String representation
687  //
688  //---------------------------------------------------------------------------
689
690  public String toString() {
691    if (this.bytes == null || this.bytes.length == 0) {
692      return "empty";
693    }
694    return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
695      "/vlen=" + getValueLength() + "/ts=" + memstoreTS;
696  }
697
698  /**
699   * @param k Key portion of a KeyValue.
700   * @return Key as a String.
701   */
702  public static String keyToString(final byte [] k) {
703    if (k == null) {
704      return "";
705    }
706    return keyToString(k, 0, k.length);
707  }
708
  /**
   * Produces a string map for this key/value pair. Useful for programmatic use
   * and manipulation of the data stored in an HLogKey, for example, printing
   * as JSON. Values are left out due to their tendency to be large. If needed,
   * they can be added manually.
   * <p>
   * The map has the entries <code>row</code>, <code>family</code>,
   * <code>qualifier</code>, <code>timestamp</code> and <code>vlen</code>.
   *
   * @return the {@code Map<String,?>} containing data from this key
   */
  public Map<String, Object> toStringMap() {
    Map<String, Object> stringMap = new HashMap<String, Object>();
    stringMap.put("row", Bytes.toStringBinary(getRow()));
    stringMap.put("family", Bytes.toStringBinary(getFamily()));
    stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
    stringMap.put("timestamp", getTimestamp());
    stringMap.put("vlen", getValueLength());
    return stringMap;
  }

  /**
   * Use for logging.
   * Renders a key as
   * <code>row/family:qualifier/timestamp/type</code>; the colon is left out
   * when the family is empty.
   * @param b Key portion of a KeyValue.
   * @param o Offset to start of key
   * @param l Length of key.
   * @return Key as a String; the empty String when <code>b</code> is null.
   * @throws RuntimeException if the type byte is not a known
   * {@link KeyValue.Type} code
   */
  public static String keyToString(final byte [] b, final int o, final int l) {
    if (b == null) return "";
    // Key starts with the row length short, followed by the row itself.
    int rowlength = Bytes.toShort(b, o);
    String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
    // The column content starts after the one-byte family length, which sits
    // right behind the row.
    int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
    int familylength = b[columnoffset - 1];
    // Whatever lies between the column start and the trailing timestamp+type
    // is family plus qualifier.
    int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
    String family = familylength == 0? "":
      Bytes.toStringBinary(b, columnoffset, familylength);
    String qualifier = columnlength == 0? "":
      Bytes.toStringBinary(b, columnoffset + familylength,
      columnlength - familylength);
    // Timestamp and type occupy the last TIMESTAMP_TYPE_SIZE bytes of the key;
    // the type is the very last byte.
    long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
    String timestampStr = humanReadableTimestamp(timestamp);
    byte type = b[o + l - 1];
    return row + "/" + family +
      (family != null && family.length() > 0? ":" :"") +
      qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
  }
753
754  public static String humanReadableTimestamp(final long timestamp) {
755    if (timestamp == HConstants.LATEST_TIMESTAMP) {
756      return "LATEST_TIMESTAMP";
757    }
758    if (timestamp == HConstants.OLDEST_TIMESTAMP) {
759      return "OLDEST_TIMESTAMP";
760    }
761    return String.valueOf(timestamp);
762  }
763
764  //---------------------------------------------------------------------------
765  //
766  //  Public Member Accessors
767  //
768  //---------------------------------------------------------------------------
769
770  /**
771   * @return The byte array backing this KeyValue.
772   */
773  public byte [] getBuffer() {
774    return this.bytes;
775  }
776
777  /**
778   * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
779   */
780  public int getOffset() {
781    return this.offset;
782  }
783
784  /**
785   * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
786   */
787  public int getLength() {
788    return length;
789  }
790
791  //---------------------------------------------------------------------------
792  //
793  //  Length and Offset Calculators
794  //
795  //---------------------------------------------------------------------------
796
797  /**
798   * Determines the total length of the KeyValue stored in the specified
799   * byte array and offset.  Includes all headers.
800   * @param bytes byte array
801   * @param offset offset to start of the KeyValue
802   * @return length of entire KeyValue, in bytes
803   */
804  private static int getLength(byte [] bytes, int offset) {
805    return ROW_OFFSET +
806        Bytes.toInt(bytes, offset) +
807        Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
808  }
809
810  /**
811   * @return Key offset in backing buffer..
812   */
813  public int getKeyOffset() {
814    return this.offset + ROW_OFFSET;
815  }
816
817  public String getKeyString() {
818    return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
819  }
820
821  /**
822   * @return Length of key portion.
823   */
824  public int getKeyLength() {
825    return Bytes.toInt(this.bytes, this.offset);
826  }
827
828  /**
829   * @return Value offset
830   */
831  public int getValueOffset() {
832    return getKeyOffset() + getKeyLength();
833  }
834
835  /**
836   * @return Value length
837   */
838  public int getValueLength() {
839    return Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
840  }
841
842  /**
843   * @return Row offset
844   */
845  public int getRowOffset() {
846    return getKeyOffset() + Bytes.SIZEOF_SHORT;
847  }
848
849  /**
850   * @return Row length
851   */
852  public short getRowLength() {
853    return Bytes.toShort(this.bytes, getKeyOffset());
854  }
855
856  /**
857   * @return Family offset
858   */
859  public int getFamilyOffset() {
860    return getFamilyOffset(getRowLength());
861  }
862
863  /**
864   * @return Family offset
865   */
866  public int getFamilyOffset(int rlength) {
867    return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
868  }
869
870  /**
871   * @return Family length
872   */
873  public byte getFamilyLength() {
874    return getFamilyLength(getFamilyOffset());
875  }
876
877  /**
878   * @return Family length
879   */
880  public byte getFamilyLength(int foffset) {
881    return this.bytes[foffset-1];
882  }
883
884  /**
885   * @return Qualifier offset
886   */
887  public int getQualifierOffset() {
888    return getQualifierOffset(getFamilyOffset());
889  }
890
891  /**
892   * @return Qualifier offset
893   */
894  public int getQualifierOffset(int foffset) {
895    return foffset + getFamilyLength(foffset);
896  }
897
898  /**
899   * @return Qualifier length
900   */
901  public int getQualifierLength() {
902    return getQualifierLength(getRowLength(),getFamilyLength());
903  }
904
905  /**
906   * @return Qualifier length
907   */
908  public int getQualifierLength(int rlength, int flength) {
909    return getKeyLength() -
910      (KEY_INFRASTRUCTURE_SIZE + rlength + flength);
911  }
912
913  /**
914   * @return Column (family + qualifier) length
915   */
916  public int getTotalColumnLength() {
917    int rlength = getRowLength();
918    int foffset = getFamilyOffset(rlength);
919    return getTotalColumnLength(rlength,foffset);
920  }
921
922  /**
923   * @return Column (family + qualifier) length
924   */
925  public int getTotalColumnLength(int rlength, int foffset) {
926    int flength = getFamilyLength(foffset);
927    int qlength = getQualifierLength(rlength,flength);
928    return flength + qlength;
929  }
930
931  /**
932   * @return Timestamp offset
933   */
934  public int getTimestampOffset() {
935    return getTimestampOffset(getKeyLength());
936  }
937
938  /**
939   * @param keylength Pass if you have it to save on a int creation.
940   * @return Timestamp offset
941   */
942  public int getTimestampOffset(final int keylength) {
943    return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
944  }
945
946  /**
947   * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
948   */
949  public boolean isLatestTimestamp() {
950    return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
951      HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
952  }
953
954  /**
955   * @return True if this is a "fake" KV created for internal seeking purposes,
956   * which should not be seen by user code
957   */
958  public boolean isInternal() {
959    byte type = getType();
960    return type == Type.Minimum.code || type == Type.Maximum.code;
961  }
962  /**
963   * @param now Time to set into <code>this</code> IFF timestamp ==
964   * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
965   * @return True is we modified this.
966   */
967  public boolean updateLatestStamp(final byte [] now) {
968    if (this.isLatestTimestamp()) {
969      int tsOffset = getTimestampOffset();
970      System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
971      // clear cache or else getTimestamp() possibly returns an old value
972      return true;
973    }
974    return false;
975  }
976
977  //---------------------------------------------------------------------------
978  //
979  //  Methods that return copies of fields
980  //
981  //---------------------------------------------------------------------------
982
983  /**
984   * Do not use unless you have to.  Used internally for compacting and testing.
985   *
986   * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
987   * {@link #getValue()} if accessing a KeyValue client-side.
988   * @return Copy of the key portion only.
989   */
990  public byte [] getKey() {
991    int keylength = getKeyLength();
992    byte [] key = new byte[keylength];
993    System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
994    return key;
995  }
996
997  /**
998   * Returns value in a new byte array.
999   * Primarily for use client-side. If server-side, use
1000   * {@link #getBuffer()} with appropriate offsets and lengths instead to
1001   * save on allocations.
1002   * @return Value in a new byte array.
1003   */
1004  public byte [] getValue() {
1005    int o = getValueOffset();
1006    int l = getValueLength();
1007    byte [] result = new byte[l];
1008    System.arraycopy(getBuffer(), o, result, 0, l);
1009    return result;
1010  }
1011
1012  /**
1013   * Primarily for use client-side.  Returns the row of this KeyValue in a new
1014   * byte array.<p>
1015   *
1016   * If server-side, use {@link #getBuffer()} with appropriate offsets and
1017   * lengths instead.
1018   * @return Row in a new byte array.
1019   */
1020  public byte [] getRow() {
1021    int o = getRowOffset();
1022    short l = getRowLength();
1023    byte result[] = new byte[l];
1024    System.arraycopy(getBuffer(), o, result, 0, l);
1025    return result;
1026  }
1027
1028  /**
1029   *
1030   * @return Timestamp
1031   */
1032  public long getTimestamp() {
1033    return getTimestamp(getKeyLength());
1034  }
1035
1036  /**
1037   * @param keylength Pass if you have it to save on a int creation.
1038   * @return Timestamp
1039   */
1040  long getTimestamp(final int keylength) {
1041    int tsOffset = getTimestampOffset(keylength);
1042    return Bytes.toLong(this.bytes, tsOffset);
1043  }
1044
1045  /**
1046   * @return Type of this KeyValue.
1047   */
1048  public byte getType() {
1049    return getType(getKeyLength());
1050  }
1051
1052  /**
1053   * @param keylength Pass if you have it to save on a int creation.
1054   * @return Type of this KeyValue.
1055   */
1056  byte getType(final int keylength) {
1057    return this.bytes[this.offset + keylength - 1 + ROW_OFFSET];
1058  }
1059
1060  /**
1061   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1062   * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1063   * KeyValue type.
1064   */
1065  public boolean isDelete() {
1066    return KeyValue.isDelete(getType());
1067  }
1068
1069  /**
1070   * @return True if this KV is a {@link KeyValue.Type#Delete} type.
1071   */
1072  public boolean isDeleteType() {
1073    // TODO: Fix this method name vis-a-vis isDelete!
1074    return getType() == Type.Delete.getCode();
1075  }
1076
1077  /**
1078   * @return True if this KV is a delete family type.
1079   */
1080  public boolean isDeleteFamily() {
1081    return getType() == Type.DeleteFamily.getCode();
1082  }
1083
1084  /**
1085   *
1086   * @return True if this KV is a delete family or column type.
1087   */
1088  public boolean isDeleteColumnOrFamily() {
1089    int t = getType();
1090    return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
1091  }
1092
1093  /**
1094   * Primarily for use client-side.  Returns the family of this KeyValue in a
1095   * new byte array.<p>
1096   *
1097   * If server-side, use {@link #getBuffer()} with appropriate offsets and
1098   * lengths instead.
1099   * @return Returns family. Makes a copy.
1100   */
1101  public byte [] getFamily() {
1102    int o = getFamilyOffset();
1103    int l = getFamilyLength(o);
1104    byte [] result = new byte[l];
1105    System.arraycopy(this.bytes, o, result, 0, l);
1106    return result;
1107  }
1108
1109  /**
1110   * Primarily for use client-side.  Returns the column qualifier of this
1111   * KeyValue in a new byte array.<p>
1112   *
1113   * If server-side, use {@link #getBuffer()} with appropriate offsets and
1114   * lengths instead.
1115   * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1116   * @return Returns qualifier. Makes a copy.
1117   */
1118  public byte [] getQualifier() {
1119    int o = getQualifierOffset();
1120    int l = getQualifierLength();
1121    byte [] result = new byte[l];
1122    System.arraycopy(this.bytes, o, result, 0, l);
1123    return result;
1124  }
1125
1126  //---------------------------------------------------------------------------
1127  //
1128  //  KeyValue splitter
1129  //
1130  //---------------------------------------------------------------------------
1131
1132  /**
1133   * Utility class that splits a KeyValue buffer into separate byte arrays.
1134   * <p>
1135   * Should get rid of this if we can, but is very useful for debugging.
1136   */
1137  public static class SplitKeyValue {
1138    private byte [][] split;
1139    SplitKeyValue() {
1140      this.split = new byte[6][];
1141    }
1142    public void setRow(byte [] value) { this.split[0] = value; }
1143    public void setFamily(byte [] value) { this.split[1] = value; }
1144    public void setQualifier(byte [] value) { this.split[2] = value; }
1145    public void setTimestamp(byte [] value) { this.split[3] = value; }
1146    public void setType(byte [] value) { this.split[4] = value; }
1147    public void setValue(byte [] value) { this.split[5] = value; }
1148    public byte [] getRow() { return this.split[0]; }
1149    public byte [] getFamily() { return this.split[1]; }
1150    public byte [] getQualifier() { return this.split[2]; }
1151    public byte [] getTimestamp() { return this.split[3]; }
1152    public byte [] getType() { return this.split[4]; }
1153    public byte [] getValue() { return this.split[5]; }
1154  }
1155
1156  public SplitKeyValue split() {
1157    SplitKeyValue split = new SplitKeyValue();
1158    int splitOffset = this.offset;
1159    int keyLen = Bytes.toInt(bytes, splitOffset);
1160    splitOffset += Bytes.SIZEOF_INT;
1161    int valLen = Bytes.toInt(bytes, splitOffset);
1162    splitOffset += Bytes.SIZEOF_INT;
1163    short rowLen = Bytes.toShort(bytes, splitOffset);
1164    splitOffset += Bytes.SIZEOF_SHORT;
1165    byte [] row = new byte[rowLen];
1166    System.arraycopy(bytes, splitOffset, row, 0, rowLen);
1167    splitOffset += rowLen;
1168    split.setRow(row);
1169    byte famLen = bytes[splitOffset];
1170    splitOffset += Bytes.SIZEOF_BYTE;
1171    byte [] family = new byte[famLen];
1172    System.arraycopy(bytes, splitOffset, family, 0, famLen);
1173    splitOffset += famLen;
1174    split.setFamily(family);
1175    int colLen = keyLen -
1176      (rowLen + famLen + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BYTE +
1177      Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE);
1178    byte [] qualifier = new byte[colLen];
1179    System.arraycopy(bytes, splitOffset, qualifier, 0, colLen);
1180    splitOffset += colLen;
1181    split.setQualifier(qualifier);
1182    byte [] timestamp = new byte[Bytes.SIZEOF_LONG];
1183    System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG);
1184    splitOffset += Bytes.SIZEOF_LONG;
1185    split.setTimestamp(timestamp);
1186    byte [] type = new byte[1];
1187    type[0] = bytes[splitOffset];
1188    splitOffset += Bytes.SIZEOF_BYTE;
1189    split.setType(type);
1190    byte [] value = new byte[valLen];
1191    System.arraycopy(bytes, splitOffset, value, 0, valLen);
1192    split.setValue(value);
1193    return split;
1194  }
1195
1196  //---------------------------------------------------------------------------
1197  //
1198  //  Compare specified fields against those contained in this KeyValue
1199  //
1200  //---------------------------------------------------------------------------
1201
1202  /**
1203   * @param family
1204   * @return True if matching families.
1205   */
1206  public boolean matchingFamily(final byte [] family) {
1207    return matchingFamily(family, 0, family.length);
1208  }
1209
1210  public boolean matchingFamily(final byte[] family, int offset, int length) {
1211    if (this.length == 0 || this.bytes.length == 0) {
1212      return false;
1213    }
1214    return Bytes.equals(family, offset, length,
1215        this.bytes, getFamilyOffset(), getFamilyLength());
1216  }
1217
1218  public boolean matchingFamily(final KeyValue other) {
1219    return matchingFamily(other.getBuffer(), other.getFamilyOffset(),
1220        other.getFamilyLength());
1221  }
1222
1223  /**
1224   * @param qualifier
1225   * @return True if matching qualifiers.
1226   */
1227  public boolean matchingQualifier(final byte [] qualifier) {
1228    return matchingQualifier(qualifier, 0, qualifier.length);
1229  }
1230
1231  public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
1232    return Bytes.equals(qualifier, offset, length,
1233        this.bytes, getQualifierOffset(), getQualifierLength());
1234  }
1235
1236  public boolean matchingQualifier(final KeyValue other) {
1237    return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
1238        other.getQualifierLength());
1239  }
1240
1241  public boolean matchingRow(final byte [] row) {
1242    return matchingRow(row, 0, row.length);
1243  }
1244
1245  public boolean matchingRow(final byte[] row, int offset, int length) {
1246    return Bytes.equals(row, offset, length,
1247        this.bytes, getRowOffset(), getRowLength());
1248  }
1249
1250  public boolean matchingRow(KeyValue other) {
1251    return matchingRow(other.getBuffer(), other.getRowOffset(),
1252        other.getRowLength());
1253  }
1254
1255  /**
1256   * @param column Column minus its delimiter
1257   * @return True if column matches.
1258   */
1259  public boolean matchingColumnNoDelimiter(final byte [] column) {
1260    int rl = getRowLength();
1261    int o = getFamilyOffset(rl);
1262    int fl = getFamilyLength(o);
1263    int l = fl + getQualifierLength(rl,fl);
1264    return Bytes.equals(column, 0, column.length, this.bytes, o, l);
1265  }
1266
1267  /**
1268   *
1269   * @param family column family
1270   * @param qualifier column qualifier
1271   * @return True if column matches
1272   */
1273  public boolean matchingColumn(final byte[] family, final byte[] qualifier) {
1274    int rl = getRowLength();
1275    int o = getFamilyOffset(rl);
1276    int fl = getFamilyLength(o);
1277    int ql = getQualifierLength(rl,fl);
1278    if (!Bytes.equals(family, 0, family.length, this.bytes, o, fl)) {
1279      return false;
1280    }
1281    if (qualifier == null || qualifier.length == 0) {
1282      if (ql == 0) {
1283        return true;
1284      }
1285      return false;
1286    }
1287    return Bytes.equals(qualifier, 0, qualifier.length,
1288        this.bytes, o + fl, ql);
1289  }
1290
1291  /**
1292   * @param left
1293   * @param loffset
1294   * @param llength
1295   * @param lfamilylength Offset of family delimiter in left column.
1296   * @param right
1297   * @param roffset
1298   * @param rlength
1299   * @param rfamilylength Offset of family delimiter in right column.
1300   * @return The result of the comparison.
1301   */
1302  static int compareColumns(final byte [] left, final int loffset,
1303      final int llength, final int lfamilylength,
1304      final byte [] right, final int roffset, final int rlength,
1305      final int rfamilylength) {
1306    // Compare family portion first.
1307    int diff = Bytes.compareTo(left, loffset, lfamilylength,
1308      right, roffset, rfamilylength);
1309    if (diff != 0) {
1310      return diff;
1311    }
1312    // Compare qualifier portion
1313    return Bytes.compareTo(left, loffset + lfamilylength,
1314      llength - lfamilylength,
1315      right, roffset + rfamilylength, rlength - rfamilylength);
1316  }
1317
1318  /**
1319   * @return True if non-null row and column.
1320   */
1321  public boolean nonNullRowAndColumn() {
1322    return getRowLength() > 0 && !isEmptyColumn();
1323  }
1324
1325  /**
1326   * @return True if column is empty.
1327   */
1328  public boolean isEmptyColumn() {
1329    return getQualifierLength() == 0;
1330  }
1331
1332  /**
1333   * Creates a new KeyValue that only contains the key portion (the value is
1334   * set to be null).
1335   * @param lenAsVal replace value with the actual value length (false=empty)
1336   */
1337  public KeyValue createKeyOnly(boolean lenAsVal) {
1338    // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1339    // Rebuild as: <keylen:4><0:4><key:keylen>
1340    int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1341    byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1342    System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1343        Math.min(newBuffer.length,this.length));
1344    Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1345    if (lenAsVal) {
1346      Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1347    }
1348    return new KeyValue(newBuffer);
1349  }
1350
1351  /**
1352   * Splits a column in family:qualifier form into separate byte arrays.
1353   * <p>
1354   * Not recommend to be used as this is old-style API.
1355   * @param c  The column.
1356   * @return The parsed column.
1357   */
1358  public static byte [][] parseColumn(byte [] c) {
1359    final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1360    if (index == -1) {
1361      // If no delimiter, return array of size 1
1362      return new byte [][] { c };
1363    } else if(index == c.length - 1) {
1364      // Only a family, return array size 1
1365      byte [] family = new byte[c.length-1];
1366      System.arraycopy(c, 0, family, 0, family.length);
1367      return new byte [][] { family };
1368    }
1369    // Family and column, return array size 2
1370    final byte [][] result = new byte [2][];
1371    result[0] = new byte [index];
1372    System.arraycopy(c, 0, result[0], 0, index);
1373    final int len = c.length - (index + 1);
1374    result[1] = new byte[len];
1375    System.arraycopy(c, index + 1 /*Skip delimiter*/, result[1], 0,
1376      len);
1377    return result;
1378  }
1379
1380  /**
1381   * Makes a column in family:qualifier form from separate byte arrays.
1382   * <p>
1383   * Not recommended for usage as this is old-style API.
1384   * @param family
1385   * @param qualifier
1386   * @return family:qualifier
1387   */
1388  public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1389    return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1390  }
1391
1392  /**
1393   * @param b
1394   * @return Index of the family-qualifier colon delimiter character in passed
1395   * buffer.
1396   */
1397  public static int getFamilyDelimiterIndex(final byte [] b, final int offset,
1398      final int length) {
1399    return getRequiredDelimiter(b, offset, length, COLUMN_FAMILY_DELIMITER);
1400  }
1401
1402  private static int getRequiredDelimiter(final byte [] b,
1403      final int offset, final int length, final int delimiter) {
1404    int index = getDelimiter(b, offset, length, delimiter);
1405    if (index < 0) {
1406      throw new IllegalArgumentException("No " + (char)delimiter + " in <" +
1407        Bytes.toString(b) + ">" + ", length=" + length + ", offset=" + offset);
1408    }
1409    return index;
1410  }
1411
1412  /**
1413   * This function is only used in Meta key comparisons so its error message
1414   * is specific for meta key errors.
1415   */
1416  static int getRequiredDelimiterInReverse(final byte [] b,
1417      final int offset, final int length, final int delimiter) {
1418    int index = getDelimiterInReverse(b, offset, length, delimiter);
1419    if (index < 0) {
1420      throw new IllegalArgumentException(".META. key must have two '" + (char)delimiter + "' "
1421        + "delimiters and have the following format: '<table>,<key>,<etc>'");
1422    }
1423    return index;
1424  }
1425
1426  /**
1427   * @param b
1428   * @param delimiter
1429   * @return Index of delimiter having started from start of <code>b</code>
1430   * moving rightward.
1431   */
1432  public static int getDelimiter(final byte [] b, int offset, final int length,
1433      final int delimiter) {
1434    if (b == null) {
1435      throw new IllegalArgumentException("Passed buffer is null");
1436    }
1437    int result = -1;
1438    for (int i = offset; i < length + offset; i++) {
1439      if (b[i] == delimiter) {
1440        result = i;
1441        break;
1442      }
1443    }
1444    return result;
1445  }
1446
1447  /**
1448   * Find index of passed delimiter walking from end of buffer backwards.
1449   * @param b
1450   * @param delimiter
1451   * @return Index of delimiter
1452   */
1453  public static int getDelimiterInReverse(final byte [] b, final int offset,
1454      final int length, final int delimiter) {
1455    if (b == null) {
1456      throw new IllegalArgumentException("Passed buffer is null");
1457    }
1458    int result = -1;
1459    for (int i = (offset + length) - 1; i >= offset; i--) {
1460      if (b[i] == delimiter) {
1461        result = i;
1462        break;
1463      }
1464    }
1465    return result;
1466  }
1467
1468  /**
1469   * A {@link KVComparator} for <code>-ROOT-</code> catalog table
1470   * {@link KeyValue}s.
1471   */
1472  public static class RootComparator extends MetaComparator {
1473    private final KeyComparator rawcomparator = new RootKeyComparator();
1474
1475    public KeyComparator getRawComparator() {
1476      return this.rawcomparator;
1477    }
1478
1479    @Override
1480    protected Object clone() throws CloneNotSupportedException {
1481      return new RootComparator();
1482    }
1483  }
1484
1485  /**
1486   * A {@link KVComparator} for <code>.META.</code> catalog table
1487   * {@link KeyValue}s.
1488   */
1489  public static class MetaComparator extends KVComparator {
1490    private final KeyComparator rawcomparator = new MetaKeyComparator();
1491
1492    public KeyComparator getRawComparator() {
1493      return this.rawcomparator;
1494    }
1495
1496    @Override
1497    protected Object clone() throws CloneNotSupportedException {
1498      return new MetaComparator();
1499    }
1500  }
1501
1502  /**
1503   * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1504   * portion.  This means two KeyValues with same Key but different Values are
1505   * considered the same as far as this Comparator is concerned.
1506   * Hosts a {@link KeyComparator}.
1507   */
1508  public static class KVComparator implements java.util.Comparator<KeyValue> {
1509    private final KeyComparator rawcomparator = new KeyComparator();
1510
1511    /**
1512     * @return RawComparator that can compare the Key portion of a KeyValue.
1513     * Used in hfile where indices are the Key portion of a KeyValue.
1514     */
1515    public KeyComparator getRawComparator() {
1516      return this.rawcomparator;
1517    }
1518
1519    public int compare(final KeyValue left, final KeyValue right) {
1520      int ret = getRawComparator().compare(left.getBuffer(),
1521          left.getOffset() + ROW_OFFSET, left.getKeyLength(),
1522          right.getBuffer(), right.getOffset() + ROW_OFFSET,
1523          right.getKeyLength());
1524      if (ret != 0) return ret;
1525      // Negate this comparison so later edits show up first
1526      return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
1527    }
1528
1529    public int compareTimestamps(final KeyValue left, final KeyValue right) {
1530      return compareTimestamps(left, left.getKeyLength(), right,
1531        right.getKeyLength());
1532    }
1533
1534    int compareTimestamps(final KeyValue left, final int lkeylength,
1535        final KeyValue right, final int rkeylength) {
1536      // Compare timestamps
1537      long ltimestamp = left.getTimestamp(lkeylength);
1538      long rtimestamp = right.getTimestamp(rkeylength);
1539      return getRawComparator().compareTimestamps(ltimestamp, rtimestamp);
1540    }
1541
1542    /**
1543     * @param left
1544     * @param right
1545     * @return Result comparing rows.
1546     */
1547    public int compareRows(final KeyValue left, final KeyValue right) {
1548      return compareRows(left, left.getRowLength(), right,
1549          right.getRowLength());
1550    }
1551
1552    /**
1553     * @param left
1554     * @param lrowlength Length of left row.
1555     * @param right
1556     * @param rrowlength Length of right row.
1557     * @return Result comparing rows.
1558     */
1559    public int compareRows(final KeyValue left, final short lrowlength,
1560        final KeyValue right, final short rrowlength) {
1561      return getRawComparator().compareRows(left.getBuffer(),
1562          left.getRowOffset(), lrowlength,
1563        right.getBuffer(), right.getRowOffset(), rrowlength);
1564    }
1565
1566    /**
1567     * @param left
1568     * @param row - row key (arbitrary byte array)
1569     * @return RawComparator
1570     */
1571    public int compareRows(final KeyValue left, final byte [] row) {
1572      return getRawComparator().compareRows(left.getBuffer(),
1573          left.getRowOffset(), left.getRowLength(), row, 0, row.length);
1574    }
1575
1576    public int compareRows(byte [] left, int loffset, int llength,
1577        byte [] right, int roffset, int rlength) {
1578      return getRawComparator().compareRows(left, loffset, llength,
1579        right, roffset, rlength);
1580    }
1581
1582    public int compareColumns(final KeyValue left, final byte [] right,
1583        final int roffset, final int rlength, final int rfamilyoffset) {
1584      int offset = left.getFamilyOffset();
1585      int length = left.getFamilyLength() + left.getQualifierLength();
1586      return getRawComparator().compareColumns(left.getBuffer(), offset, length,
1587        left.getFamilyLength(offset),
1588        right, roffset, rlength, rfamilyoffset);
1589    }
1590
1591    int compareColumns(final KeyValue left, final short lrowlength,
1592        final KeyValue right, final short rrowlength) {
1593      int lfoffset = left.getFamilyOffset(lrowlength);
1594      int rfoffset = right.getFamilyOffset(rrowlength);
1595      int lclength = left.getTotalColumnLength(lrowlength,lfoffset);
1596      int rclength = right.getTotalColumnLength(rrowlength, rfoffset);
1597      int lfamilylength = left.getFamilyLength(lfoffset);
1598      int rfamilylength = right.getFamilyLength(rfoffset);
1599      return getRawComparator().compareColumns(left.getBuffer(), lfoffset,
1600          lclength, lfamilylength,
1601        right.getBuffer(), rfoffset, rclength, rfamilylength);
1602    }
1603
1604    /**
1605     * Compares the row and column of two keyvalues for equality
1606     * @param left
1607     * @param right
1608     * @return True if same row and column.
1609     */
1610    public boolean matchingRowColumn(final KeyValue left,
1611        final KeyValue right) {
1612      short lrowlength = left.getRowLength();
1613      short rrowlength = right.getRowLength();
1614      // TsOffset = end of column data. just comparing Row+CF length of each
1615      return ((left.getTimestampOffset() - left.getOffset()) ==
1616              (right.getTimestampOffset() - right.getOffset())) &&
1617        matchingRows(left, lrowlength, right, rrowlength) &&
1618        compareColumns(left, lrowlength, right, rrowlength) == 0;
1619    }
1620
1621    /**
1622     * @param left
1623     * @param right
1624     * @return True if rows match.
1625     */
1626    public boolean matchingRows(final KeyValue left, final byte [] right) {
1627      return Bytes.equals(left.getBuffer(), left.getRowOffset(), left.getRowLength(),
1628          right, 0, right.length);
1629    }
1630
1631    /**
1632     * Compares the row of two keyvalues for equality
1633     * @param left
1634     * @param right
1635     * @return True if rows match.
1636     */
1637    public boolean matchingRows(final KeyValue left, final KeyValue right) {
1638      short lrowlength = left.getRowLength();
1639      short rrowlength = right.getRowLength();
1640      return matchingRows(left, lrowlength, right, rrowlength);
1641    }
1642
1643    /**
1644     * @param left
1645     * @param lrowlength
1646     * @param right
1647     * @param rrowlength
1648     * @return True if rows match.
1649     */
1650    public boolean matchingRows(final KeyValue left, final short lrowlength,
1651        final KeyValue right, final short rrowlength) {
1652      return lrowlength == rrowlength &&
1653          Bytes.equals(left.getBuffer(), left.getRowOffset(), lrowlength,
1654              right.getBuffer(), right.getRowOffset(), rrowlength);
1655    }
1656
1657    public boolean matchingRows(final byte [] left, final int loffset,
1658        final int llength,
1659        final byte [] right, final int roffset, final int rlength) {
1660      return Bytes.equals(left, loffset, llength,
1661          right, roffset, rlength);
1662    }
1663
1664    /**
1665     * Compares the row and timestamp of two keys
1666     * Was called matchesWithoutColumn in HStoreKey.
1667     * @param right Key to compare against.
1668     * @return True if same row and timestamp is greater than the timestamp in
1669     * <code>right</code>
1670     */
1671    public boolean matchingRowsGreaterTimestamp(final KeyValue left,
1672        final KeyValue right) {
1673      short lrowlength = left.getRowLength();
1674      short rrowlength = right.getRowLength();
1675      if (!matchingRows(left, lrowlength, right, rrowlength)) {
1676        return false;
1677      }
1678      return left.getTimestamp() >= right.getTimestamp();
1679    }
1680
1681    @Override
1682    protected Object clone() throws CloneNotSupportedException {
1683      return new KVComparator();
1684    }
1685
1686    /**
1687     * @return Comparator that ignores timestamps; useful counting versions.
1688     */
1689    public KVComparator getComparatorIgnoringTimestamps() {
1690      KVComparator c = null;
1691      try {
1692        c = (KVComparator)this.clone();
1693        c.getRawComparator().ignoreTimestamp = true;
1694      } catch (CloneNotSupportedException e) {
1695        LOG.error("Not supported", e);
1696      }
1697      return c;
1698    }
1699
1700    /**
1701     * @return Comparator that ignores key type; useful checking deletes
1702     */
1703    public KVComparator getComparatorIgnoringType() {
1704      KVComparator c = null;
1705      try {
1706        c = (KVComparator)this.clone();
1707        c.getRawComparator().ignoreType = true;
1708      } catch (CloneNotSupportedException e) {
1709        LOG.error("Not supported", e);
1710      }
1711      return c;
1712    }
1713  }
1714
1715  /**
1716   * Creates a KeyValue that is last on the specified row id. That is,
1717   * every other possible KeyValue for the given row would compareTo()
1718   * less than the result of this call.
1719   * @param row row key
1720   * @return Last possible KeyValue on passed <code>row</code>
1721   */
1722  public static KeyValue createLastOnRow(final byte[] row) {
1723    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
1724  }
1725
1726  /**
1727   * Create a KeyValue that is smaller than all other possible KeyValues
1728   * for the given row. That is any (valid) KeyValue on 'row' would sort
1729   * _after_ the result.
1730   *
1731   * @param row - row key (arbitrary byte array)
1732   * @return First possible KeyValue on passed <code>row</code>
1733   */
1734  public static KeyValue createFirstOnRow(final byte [] row) {
1735    return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
1736  }
1737
1738  /**
1739   * Create a KeyValue that is smaller than all other possible KeyValues
1740   * for the given row. That is any (valid) KeyValue on 'row' would sort
1741   * _after_ the result.
1742   *
1743   * @param row - row key (arbitrary byte array)
1744   * @return First possible KeyValue on passed <code>row</code>
1745   */
1746  public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
1747    return new KeyValue(row, roffset, rlength,
1748        null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
1749  }
1750
1751  /**
1752   * Creates a KeyValue that is smaller than all other KeyValues that
1753   * are older than the passed timestamp.
1754   * @param row - row key (arbitrary byte array)
1755   * @param ts - timestamp
1756   * @return First possible key on passed <code>row</code> and timestamp.
1757   */
1758  public static KeyValue createFirstOnRow(final byte [] row,
1759      final long ts) {
1760    return new KeyValue(row, null, null, ts, Type.Maximum);
1761  }
1762
1763  /**
1764   * Create a KeyValue for the specified row, family and qualifier that would be
1765   * smaller than all other possible KeyValues that have the same row,family,qualifier.
1766   * Used for seeking.
1767   * @param row - row key (arbitrary byte array)
1768   * @param family - family name
1769   * @param qualifier - column qualifier
1770   * @return First possible key on passed <code>row</code>, and column.
1771   */
1772  public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
1773      final byte [] qualifier) {
1774    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
1775  }
1776
1777  /**
1778   * Create a Delete Family KeyValue for the specified row and family that would
1779   * be smaller than all other possible Delete Family KeyValues that have the
1780   * same row and family.
1781   * Used for seeking.
1782   * @param row - row key (arbitrary byte array)
1783   * @param family - family name
1784   * @return First Delete Family possible key on passed <code>row</code>.
1785   */
1786  public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
1787      final byte [] family) {
1788    return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
1789        Type.DeleteFamily);
1790  }
1791
1792  /**
1793   * @param row - row key (arbitrary byte array)
1794   * @param f - family name
1795   * @param q - column qualifier
1796   * @param ts - timestamp
1797   * @return First possible key on passed <code>row</code>, column and timestamp
1798   */
1799  public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
1800      final byte [] q, final long ts) {
1801    return new KeyValue(row, f, q, ts, Type.Maximum);
1802  }
1803
1804  /**
1805   * Create a KeyValue for the specified row, family and qualifier that would be
1806   * smaller than all other possible KeyValues that have the same row,
1807   * family, qualifier.
1808   * Used for seeking.
1809   * @param row row key
1810   * @param roffset row offset
1811   * @param rlength row length
1812   * @param family family name
1813   * @param foffset family offset
1814   * @param flength family length
1815   * @param qualifier column qualifier
1816   * @param qoffset qualifier offset
1817   * @param qlength qualifier length
1818   * @return First possible key on passed Row, Family, Qualifier.
1819   */
1820  public static KeyValue createFirstOnRow(final byte [] row,
1821      final int roffset, final int rlength, final byte [] family,
1822      final int foffset, final int flength, final byte [] qualifier,
1823      final int qoffset, final int qlength) {
1824    return new KeyValue(row, roffset, rlength, family,
1825        foffset, flength, qualifier, qoffset, qlength,
1826        HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
1827  }
1828
1829  /**
1830   * Create a KeyValue for the specified row, family and qualifier that would be
1831   * larger than or equal to all other possible KeyValues that have the same
1832   * row, family, qualifier.
1833   * Used for reseeking.
1834   * @param row row key
1835   * @param roffset row offset
1836   * @param rlength row length
1837   * @param family family name
1838   * @param foffset family offset
1839   * @param flength family length
1840   * @param qualifier column qualifier
1841   * @param qoffset qualifier offset
1842   * @param qlength qualifier length
1843   * @return Last possible key on passed row, family, qualifier.
1844   */
1845  public static KeyValue createLastOnRow(final byte [] row,
1846      final int roffset, final int rlength, final byte [] family,
1847      final int foffset, final int flength, final byte [] qualifier,
1848      final int qoffset, final int qlength) {
1849    return new KeyValue(row, roffset, rlength, family,
1850        foffset, flength, qualifier, qoffset, qlength,
1851        HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
1852  }
1853
1854  /**
1855   * Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
1856   * byte[], int, int)} but creates the last key on the row/column of this KV
1857   * (the value part of the returned KV is always empty). Used in creating
1858   * "fake keys" for the multi-column Bloom filter optimization to skip the
1859   * row/column we already know is not in the file.
1860   * @return the last key on the row/column of the given key-value pair
1861   */
1862  public KeyValue createLastOnRowCol() {
1863    return new KeyValue(
1864        bytes, getRowOffset(), getRowLength(),
1865        bytes, getFamilyOffset(), getFamilyLength(),
1866        bytes, getQualifierOffset(), getQualifierLength(),
1867        HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
1868  }
1869
1870  /**
1871   * Creates the first KV with the row/family/qualifier of this KV and the
1872   * given timestamp. Uses the "maximum" KV type that guarantees that the new
1873   * KV is the lowest possible for this combination of row, family, qualifier,
1874   * and timestamp. This KV's own timestamp is ignored. While this function
1875   * copies the value from this KV, it is normally used on key-only KVs.
1876   */
1877  public KeyValue createFirstOnRowColTS(long ts) {
1878    return new KeyValue(
1879        bytes, getRowOffset(), getRowLength(),
1880        bytes, getFamilyOffset(), getFamilyLength(),
1881        bytes, getQualifierOffset(), getQualifierLength(),
1882        ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
1883  }
1884
1885  /**
1886   * @param b
1887   * @return A KeyValue made of a byte array that holds the key-only part.
1888   * Needed to convert hfile index members to KeyValues.
1889   */
1890  public static KeyValue createKeyValueFromKey(final byte [] b) {
1891    return createKeyValueFromKey(b, 0, b.length);
1892  }
1893
1894  /**
1895   * @param bb
1896   * @return A KeyValue made of a byte buffer that holds the key-only part.
1897   * Needed to convert hfile index members to KeyValues.
1898   */
1899  public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
1900    return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
1901  }
1902
1903  /**
1904   * @param b
1905   * @param o
1906   * @param l
1907   * @return A KeyValue made of a byte array that holds the key-only part.
1908   * Needed to convert hfile index members to KeyValues.
1909   */
1910  public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
1911      final int l) {
1912    byte [] newb = new byte[l + ROW_OFFSET];
1913    System.arraycopy(b, o, newb, ROW_OFFSET, l);
1914    Bytes.putInt(newb, 0, l);
1915    Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
1916    return new KeyValue(newb);
1917  }
1918
1919  /**
1920   * Compare key portion of a {@link KeyValue} for keys in <code>-ROOT-<code>
1921   * table.
1922   */
1923  public static class RootKeyComparator extends MetaKeyComparator {
1924    public int compareRows(byte [] left, int loffset, int llength,
1925        byte [] right, int roffset, int rlength) {
1926      // Rows look like this: .META.,ROW_FROM_META,RID
1927      //        LOG.info("ROOT " + Bytes.toString(left, loffset, llength) +
1928      //          "---" + Bytes.toString(right, roffset, rlength));
1929      final int metalength = 7; // '.META.' length
1930      int lmetaOffsetPlusDelimiter = loffset + metalength;
1931      int leftFarDelimiter = getDelimiterInReverse(left,
1932          lmetaOffsetPlusDelimiter,
1933          llength - metalength, HRegionInfo.DELIMITER);
1934      int rmetaOffsetPlusDelimiter = roffset + metalength;
1935      int rightFarDelimiter = getDelimiterInReverse(right,
1936          rmetaOffsetPlusDelimiter, rlength - metalength,
1937          HRegionInfo.DELIMITER);
1938      if (leftFarDelimiter < 0 && rightFarDelimiter >= 0) {
1939        // Nothing between .META. and regionid.  Its first key.
1940        return -1;
1941      } else if (rightFarDelimiter < 0 && leftFarDelimiter >= 0) {
1942        return 1;
1943      } else if (leftFarDelimiter < 0 && rightFarDelimiter < 0) {
1944        return 0;
1945      }
1946      int result = super.compareRows(left, lmetaOffsetPlusDelimiter,
1947          leftFarDelimiter - lmetaOffsetPlusDelimiter,
1948          right, rmetaOffsetPlusDelimiter,
1949          rightFarDelimiter - rmetaOffsetPlusDelimiter);
1950      if (result != 0) {
1951        return result;
1952      }
1953      // Compare last part of row, the rowid.
1954      leftFarDelimiter++;
1955      rightFarDelimiter++;
1956      result = compareRowid(left, leftFarDelimiter,
1957          llength - (leftFarDelimiter - loffset),
1958          right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1959      return result;
1960    }
1961  }
1962
1963  /**
1964   * Comparator that compares row component only of a KeyValue.
1965   */
1966  public static class RowComparator implements Comparator<KeyValue> {
1967    final KVComparator comparator;
1968
1969    public RowComparator(final KVComparator c) {
1970      this.comparator = c;
1971    }
1972
1973    public int compare(KeyValue left, KeyValue right) {
1974      return comparator.compareRows(left, right);
1975    }
1976  }
1977
1978  /**
1979   * Compare key portion of a {@link KeyValue} for keys in <code>.META.</code>
1980   * table.
1981   */
1982  public static class MetaKeyComparator extends KeyComparator {
1983    public int compareRows(byte [] left, int loffset, int llength,
1984        byte [] right, int roffset, int rlength) {
1985      //        LOG.info("META " + Bytes.toString(left, loffset, llength) +
1986      //          "---" + Bytes.toString(right, roffset, rlength));
1987      int leftDelimiter = getDelimiter(left, loffset, llength,
1988          HRegionInfo.DELIMITER);
1989      int rightDelimiter = getDelimiter(right, roffset, rlength,
1990          HRegionInfo.DELIMITER);
1991      if (leftDelimiter < 0 && rightDelimiter >= 0) {
1992        // Nothing between .META. and regionid.  Its first key.
1993        return -1;
1994      } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1995        return 1;
1996      } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1997        return 0;
1998      }
1999      // Compare up to the delimiter
2000      int result = Bytes.compareTo(left, loffset, leftDelimiter - loffset,
2001          right, roffset, rightDelimiter - roffset);
2002      if (result != 0) {
2003        return result;
2004      }
2005      // Compare middle bit of the row.
2006      // Move past delimiter
2007      leftDelimiter++;
2008      rightDelimiter++;
2009      int leftFarDelimiter = getRequiredDelimiterInReverse(left, leftDelimiter,
2010          llength - (leftDelimiter - loffset), HRegionInfo.DELIMITER);
2011      int rightFarDelimiter = getRequiredDelimiterInReverse(right,
2012          rightDelimiter, rlength - (rightDelimiter - roffset),
2013          HRegionInfo.DELIMITER);
2014      // Now compare middlesection of row.
2015      result = super.compareRows(left, leftDelimiter,
2016          leftFarDelimiter - leftDelimiter, right, rightDelimiter,
2017          rightFarDelimiter - rightDelimiter);
2018      if (result != 0) {
2019        return result;
2020      }
2021      // Compare last part of row, the rowid.
2022      leftFarDelimiter++;
2023      rightFarDelimiter++;
2024      result = compareRowid(left, leftFarDelimiter,
2025          llength - (leftFarDelimiter - loffset),
2026          right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
2027      return result;
2028    }
2029
2030    protected int compareRowid(byte[] left, int loffset, int llength,
2031        byte[] right, int roffset, int rlength) {
2032      return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2033    }
2034  }
2035
2036  /**
2037   * Avoids redundant comparisons for better performance.
2038   */
2039  public static interface SamePrefixComparator<T> {
2040    /**
2041     * Compare two keys assuming that the first n bytes are the same.
2042     * @param commonPrefix How many bytes are the same.
2043     */
2044    public int compareIgnoringPrefix(int commonPrefix,
2045        T left, int loffset, int llength,
2046        T right, int roffset, int rlength);
2047  }
2048
2049  /**
2050   * Compare key portion of a {@link KeyValue}.
2051   */
2052  public static class KeyComparator
2053      implements RawComparator<byte []>, SamePrefixComparator<byte[]> {
2054    volatile boolean ignoreTimestamp = false;
2055    volatile boolean ignoreType = false;
2056
2057    public int compare(byte[] left, int loffset, int llength, byte[] right,
2058        int roffset, int rlength) {
2059      // Compare row
2060      short lrowlength = Bytes.toShort(left, loffset);
2061      short rrowlength = Bytes.toShort(right, roffset);
2062      int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
2063          lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
2064      if (compare != 0) {
2065        return compare;
2066      }
2067
2068      // Compare the rest of the two KVs without making any assumptions about
2069      // the common prefix. This function will not compare rows anyway, so we
2070      // don't need to tell it that the common prefix includes the row.
2071      return compareWithoutRow(0, left, loffset, llength, right, roffset,
2072          rlength, rrowlength);
2073    }
2074
2075    /**
2076     * Compare the two key-values, ignoring the prefix of the given length
2077     * that is known to be the same between the two.
2078     * @param commonPrefix the prefix length to ignore
2079     */
2080    @Override
2081    public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2082        int loffset, int llength, byte[] right, int roffset, int rlength) {
2083      // Compare row
2084      short lrowlength = Bytes.toShort(left, loffset);
2085      short rrowlength;
2086
2087      int comparisonResult = 0;
2088      if (commonPrefix < ROW_LENGTH_SIZE) {
2089        // almost nothing in common
2090        rrowlength = Bytes.toShort(right, roffset);
2091        comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2092            lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2093      } else { // the row length is the same
2094        rrowlength = lrowlength;
2095        if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2096          // The rows are not the same. Exclude the common prefix and compare
2097          // the rest of the two rows.
2098          int common = commonPrefix - ROW_LENGTH_SIZE;
2099          comparisonResult = compareRows(
2100              left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2101              right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2102        }
2103      }
2104      if (comparisonResult != 0) {
2105        return comparisonResult;
2106      }
2107
2108      assert lrowlength == rrowlength;
2109
2110      return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2111          roffset, rlength, lrowlength);
2112    }
2113
2114    /**
2115     * Compare columnFamily, qualifier, timestamp, and key type (everything
2116     * except the row). This method is used both in the normal comparator and
2117     * the "same-prefix" comparator. Note that we are assuming that row portions
2118     * of both KVs have already been parsed and found identical, and we don't
2119     * validate that assumption here.
2120     * @param commonPrefix
2121     *          the length of the common prefix of the two key-values being
2122     *          compared, including row length and row
2123     */
2124    private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2125        int llength, byte[] right, int roffset, int rlength, short rowlength) {
2126      /***
2127       * KeyValue Format and commonLength:
2128       * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2129       * ------------------|-------commonLength--------|--------------
2130       */
2131      int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2132
2133      // commonLength + TIMESTAMP_TYPE_SIZE
2134      int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2135      // ColumnFamily + Qualifier length.
2136      int lcolumnlength = llength - commonLengthWithTSAndType;
2137      int rcolumnlength = rlength - commonLengthWithTSAndType;
2138
2139      byte ltype = left[loffset + (llength - 1)];
2140      byte rtype = right[roffset + (rlength - 1)];
2141
2142      // If the column is not specified, the "minimum" key type appears the
2143      // latest in the sorted order, regardless of the timestamp. This is used
2144      // for specifying the last key/value in a given row, because there is no
2145      // "lexicographically last column" (it would be infinitely long). The
2146      // "maximum" key type does not need this behavior.
2147      if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2148        // left is "bigger", i.e. it appears later in the sorted order
2149        return 1;
2150      }
2151      if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2152        return -1;
2153      }
2154
2155      int lfamilyoffset = commonLength + loffset;
2156      int rfamilyoffset = commonLength + roffset;
2157
2158      // Column family length.
2159      int lfamilylength = left[lfamilyoffset - 1];
2160      int rfamilylength = right[rfamilyoffset - 1];
2161      // If left family size is not equal to right family size, we need not
2162      // compare the qualifiers. 
2163      boolean sameFamilySize = (lfamilylength == rfamilylength);
2164      int common = 0;
2165      if (commonPrefix > 0) {
2166        common = Math.max(0, commonPrefix - commonLength);
2167        if (!sameFamilySize) {
2168          // Common should not be larger than Math.min(lfamilylength,
2169          // rfamilylength).
2170          common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2171        } else {
2172          common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2173        }
2174      }
2175      if (!sameFamilySize) {
2176        // comparing column family is enough.
2177        return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2178            - common, right, rfamilyoffset + common, rfamilylength - common);
2179      }
2180      // Compare family & qualifier together.
2181      final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2182          lcolumnlength - common, right, rfamilyoffset + common,
2183          rcolumnlength - common);
2184      if (comparison != 0) {
2185        return comparison;
2186      }
2187      return compareTimestampAndType(left, loffset, llength, right, roffset,
2188          rlength, ltype, rtype);
2189    }
2190
2191    private int compareTimestampAndType(byte[] left, int loffset, int llength,
2192        byte[] right, int roffset, int rlength, byte ltype, byte rtype) {
2193      int compare;
2194      if (!this.ignoreTimestamp) {
2195        // Get timestamps.
2196        long ltimestamp = Bytes.toLong(left,
2197            loffset + (llength - TIMESTAMP_TYPE_SIZE));
2198        long rtimestamp = Bytes.toLong(right,
2199            roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2200        compare = compareTimestamps(ltimestamp, rtimestamp);
2201        if (compare != 0) {
2202          return compare;
2203        }
2204      }
2205
2206      if (!this.ignoreType) {
2207        // Compare types. Let the delete types sort ahead of puts; i.e. types
2208        // of higher numbers sort before those of lesser numbers. Maximum (255)
2209        // appears ahead of everything, and minimum (0) appears after
2210        // everything.
2211        return (0xff & rtype) - (0xff & ltype);
2212      }
2213      return 0;
2214    }
2215
2216    public int compare(byte[] left, byte[] right) {
2217      return compare(left, 0, left.length, right, 0, right.length);
2218    }
2219
2220    public int compareRows(byte [] left, int loffset, int llength,
2221        byte [] right, int roffset, int rlength) {
2222      return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2223    }
2224
2225    protected int compareColumns(
2226        byte [] left, int loffset, int llength, final int lfamilylength,
2227        byte [] right, int roffset, int rlength, final int rfamilylength) {
2228      return KeyValue.compareColumns(left, loffset, llength, lfamilylength,
2229        right, roffset, rlength, rfamilylength);
2230    }
2231
2232    int compareTimestamps(final long ltimestamp, final long rtimestamp) {
2233      // The below older timestamps sorting ahead of newer timestamps looks
2234      // wrong but it is intentional. This way, newer timestamps are first
2235      // found when we iterate over a memstore and newer versions are the
2236      // first we trip over when reading from a store file.
2237      if (ltimestamp < rtimestamp) {
2238        return 1;
2239      } else if (ltimestamp > rtimestamp) {
2240        return -1;
2241      }
2242      return 0;
2243    }
2244  }
2245
2246  // HeapSize
2247  public long heapSize() {
2248    return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE
2249        + ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length)
2250        + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_LONG);
2251  }
2252
2253  // this overload assumes that the length bytes have already been read,
2254  // and it expects the length of the KeyValue to be explicitly passed
2255  // to it.
2256  public void readFields(int length, final DataInput in) throws IOException {
2257
2258    if (length <= 0) {
2259      throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2260    }
2261
2262    this.length = length;
2263    this.offset = 0;
2264    this.bytes = new byte[this.length];
2265    in.readFully(this.bytes, 0, this.length);
2266  }
2267
2268  // Writable
2269  public void readFields(final DataInput in) throws IOException {
2270    int length = in.readInt();
2271    readFields(length, in);
2272  }
2273
2274  public void write(final DataOutput out) throws IOException {
2275    out.writeInt(this.length);
2276    out.write(this.bytes, this.offset, this.length);
2277  }
2278}