View Javadoc

1   /**
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.util.Comparator;
27  import java.util.HashMap;
28  import java.util.Map;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.io.HeapSize;
33  import org.apache.hadoop.hbase.io.hfile.HFile;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.ClassSize;
36  import org.apache.hadoop.io.RawComparator;
37  import org.apache.hadoop.io.Writable;
38  
39  import com.google.common.primitives.Longs;
40  
41  /**
42   * An HBase Key/Value.  This is the fundamental HBase Type.
43   *
44   * <p>If being used client-side, the primary methods to access individual fields
45   * are {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()},
46   * {@link #getTimestamp()}, and {@link #getValue()}.  These methods allocate new
47   * byte arrays and return copies. Avoid their use server-side.
48   *
49   * <p>Instances of this class are immutable.  They do not implement Comparable
50   * but Comparators are provided.  Comparators change with context,
51   * whether user table or a catalog table comparison.  Its critical you use the
52   * appropriate comparator.  There are Comparators for KeyValue instances and
53   * then for just the Key portion of a KeyValue used mostly by {@link HFile}.
54   *
55   * <p>KeyValue wraps a byte array and takes offsets and lengths into passed
56   * array at where to start interpreting the content as KeyValue.  The KeyValue
57   * format inside a byte array is:
58   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code>
59   * Key is further decomposed as:
60   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier> &lt;timestamp> &lt;keytype></code>
61   * The <code>rowlength</code> maximum is <code>Short.MAX_SIZE</code>,
62   * column family length maximum is
63   * <code>Byte.MAX_SIZE</code>, and column qualifier + key length must
64   * be < <code>Integer.MAX_SIZE</code>.
65   * The column does not contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}
66   */
67  public class KeyValue implements Writable, HeapSize, Cloneable {
68    static final Log LOG = LogFactory.getLog(KeyValue.class);
69    // TODO: Group Key-only comparators and operations into a Key class, just
70    // for neatness sake, if can figure what to call it.
71  
72    /**
73     * Colon character in UTF-8
74     */
75    public static final char COLUMN_FAMILY_DELIMITER = ':';
76  
77    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
78      new byte[]{COLUMN_FAMILY_DELIMITER};
79  
80    /**
81     * Comparator for plain key/values; i.e. non-catalog table key/values.
82     */
83    public static KVComparator COMPARATOR = new KVComparator();
84  
85    /**
86     * Comparator for plain key; i.e. non-catalog table key.  Works on Key portion
87     * of KeyValue only.
88     */
89    public static KeyComparator KEY_COMPARATOR = new KeyComparator();
90  
91    /**
92     * A {@link KVComparator} for <code>.META.</code> catalog table
93     * {@link KeyValue}s.
94     */
95    public static KVComparator META_COMPARATOR = new MetaComparator();
96  
97    /**
98     * A {@link KVComparator} for <code>.META.</code> catalog table
99     * {@link KeyValue} keys.
100    */
101   public static KeyComparator META_KEY_COMPARATOR = new MetaKeyComparator();
102 
103   /**
104    * A {@link KVComparator} for <code>-ROOT-</code> catalog table
105    * {@link KeyValue}s.
106    */
107   public static KVComparator ROOT_COMPARATOR = new RootComparator();
108 
109   /**
110    * A {@link KVComparator} for <code>-ROOT-</code> catalog table
111    * {@link KeyValue} keys.
112    */
113   public static KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator();
114 
115   /**
116    * Get the appropriate row comparator for the specified table.
117    *
118    * Hopefully we can get rid of this, I added this here because it's replacing
119    * something in HSK.  We should move completely off of that.
120    *
121    * @param tableName  The table name.
122    * @return The comparator.
123    */
124   public static KeyComparator getRowComparator(byte [] tableName) {
125     if(Bytes.equals(HTableDescriptor.ROOT_TABLEDESC.getName(),tableName)) {
126       return ROOT_COMPARATOR.getRawComparator();
127     }
128     if(Bytes.equals(HTableDescriptor.META_TABLEDESC.getName(), tableName)) {
129       return META_COMPARATOR.getRawComparator();
130     }
131     return COMPARATOR.getRawComparator();
132   }
133 
134   /** Size of the key length field in bytes*/
135   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
136 
137   /** Size of the key type field in bytes */
138   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
139 
140   /** Size of the row length field in bytes */
141   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
142 
143   /** Size of the family length field in bytes */
144   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
145 
146   /** Size of the timestamp field in bytes */
147   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
148 
149   // Size of the timestamp and type byte on end of a key -- a long + a byte.
150   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
151 
152   // Size of the length shorts and bytes in key.
153   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
154       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
155 
156   // How far into the key the row starts at. First thing to read is the short
157   // that says how long the row is.
158   public static final int ROW_OFFSET =
159     Bytes.SIZEOF_INT /*keylength*/ +
160     Bytes.SIZEOF_INT /*valuelength*/;
161 
162   // Size of the length ints in a KeyValue datastructure.
163   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
164 
165   /**
166    * Key type.
167    * Has space for other key types to be added later.  Cannot rely on
168    * enum ordinals . They change if item is removed or moved.  Do our own codes.
169    */
170   public static enum Type {
171     Minimum((byte)0),
172     Put((byte)4),
173 
174     Delete((byte)8),
175     DeleteColumn((byte)12),
176     DeleteFamily((byte)14),
177 
178     // Maximum is used when searching; you look from maximum on down.
179     Maximum((byte)255);
180 
181     private final byte code;
182 
183     Type(final byte c) {
184       this.code = c;
185     }
186 
187     public byte getCode() {
188       return this.code;
189     }
190 
191     /**
192      * Cannot rely on enum ordinals . They change if item is removed or moved.
193      * Do our own codes.
194      * @param b
195      * @return Type associated with passed code.
196      */
197     public static Type codeToType(final byte b) {
198       for (Type t : Type.values()) {
199         if (t.getCode() == b) {
200           return t;
201         }
202       }
203       throw new RuntimeException("Unknown code " + b);
204     }
205   }
206 
207   /**
208    * Lowest possible key.
209    * Makes a Key with highest possible Timestamp, empty row and column.  No
210    * key can be equal or lower than this one in memstore or in store file.
211    */
212   public static final KeyValue LOWESTKEY =
213     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
214 
215   private byte [] bytes = null;
216   private int offset = 0;
217   private int length = 0;
218 
219   /**
220    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
221    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
222    * KeyValue type.
223    */
224   public static boolean isDelete(byte t) {
225     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
226   }
227 
228   /** Here be dragons **/
229 
230   // used to achieve atomic operations in the memstore.
231   public long getMemstoreTS() {
232     return memstoreTS;
233   }
234 
235   public void setMemstoreTS(long memstoreTS) {
236     this.memstoreTS = memstoreTS;
237   }
238 
239   // default value is 0, aka DNC
240   private long memstoreTS = 0;
241 
242   /** Dragon time over, return to normal business */
243 
244 
245   /** Writable Constructor -- DO NOT USE */
246   public KeyValue() {}
247 
248   /**
249    * Creates a KeyValue from the start of the specified byte array.
250    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
251    * @param bytes byte array
252    */
253   public KeyValue(final byte [] bytes) {
254     this(bytes, 0);
255   }
256 
257   /**
258    * Creates a KeyValue from the specified byte array and offset.
259    * Presumes <code>bytes</code> content starting at <code>offset</code> is
260    * formatted as a KeyValue blob.
261    * @param bytes byte array
262    * @param offset offset to start of KeyValue
263    */
264   public KeyValue(final byte [] bytes, final int offset) {
265     this(bytes, offset, getLength(bytes, offset));
266   }
267 
268   /**
269    * Creates a KeyValue from the specified byte array, starting at offset, and
270    * for length <code>length</code>.
271    * @param bytes byte array
272    * @param offset offset to start of the KeyValue
273    * @param length length of the KeyValue
274    */
275   public KeyValue(final byte [] bytes, final int offset, final int length) {
276     this.bytes = bytes;
277     this.offset = offset;
278     this.length = length;
279   }
280 
281   /** Constructors that build a new backing byte array from fields */
282 
283   /**
284    * Constructs KeyValue structure filled with null value.
285    * Sets type to {@link KeyValue.Type#Maximum}
286    * @param row - row key (arbitrary byte array)
287    * @param timestamp
288    */
289   public KeyValue(final byte [] row, final long timestamp) {
290     this(row, timestamp, Type.Maximum);
291   }
292 
293   /**
294    * Constructs KeyValue structure filled with null value.
295    * @param row - row key (arbitrary byte array)
296    * @param timestamp
297    */
298   public KeyValue(final byte [] row, final long timestamp, Type type) {
299     this(row, null, null, timestamp, type, null);
300   }
301 
302   /**
303    * Constructs KeyValue structure filled with null value.
304    * Sets type to {@link KeyValue.Type#Maximum}
305    * @param row - row key (arbitrary byte array)
306    * @param family family name
307    * @param qualifier column qualifier
308    */
309   public KeyValue(final byte [] row, final byte [] family,
310       final byte [] qualifier) {
311     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
312   }
313 
314   /**
315    * Constructs KeyValue structure filled with null value.
316    * @param row - row key (arbitrary byte array)
317    * @param family family name
318    * @param qualifier column qualifier
319    */
320   public KeyValue(final byte [] row, final byte [] family,
321       final byte [] qualifier, final byte [] value) {
322     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
323   }
324 
325   /**
326    * Constructs KeyValue structure filled with specified values.
327    * @param row row key
328    * @param family family name
329    * @param qualifier column qualifier
330    * @param timestamp version timestamp
331    * @param type key type
332    * @throws IllegalArgumentException
333    */
334   public KeyValue(final byte[] row, final byte[] family,
335       final byte[] qualifier, final long timestamp, Type type) {
336     this(row, family, qualifier, timestamp, type, null);
337   }
338 
339   /**
340    * Constructs KeyValue structure filled with specified values.
341    * @param row row key
342    * @param family family name
343    * @param qualifier column qualifier
344    * @param timestamp version timestamp
345    * @param value column value
346    * @throws IllegalArgumentException
347    */
348   public KeyValue(final byte[] row, final byte[] family,
349       final byte[] qualifier, final long timestamp, final byte[] value) {
350     this(row, family, qualifier, timestamp, Type.Put, value);
351   }
352 
353   /**
354    * Constructs KeyValue structure filled with specified values.
355    * @param row row key
356    * @param family family name
357    * @param qualifier column qualifier
358    * @param timestamp version timestamp
359    * @param type key type
360    * @param value column value
361    * @throws IllegalArgumentException
362    */
363   public KeyValue(final byte[] row, final byte[] family,
364       final byte[] qualifier, final long timestamp, Type type,
365       final byte[] value) {
366     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
367         timestamp, type, value, 0, value==null ? 0 : value.length);
368   }
369 
370   /**
371    * Constructs KeyValue structure filled with specified values.
372    * @param row row key
373    * @param family family name
374    * @param qualifier column qualifier
375    * @param qoffset qualifier offset
376    * @param qlength qualifier length
377    * @param timestamp version timestamp
378    * @param type key type
379    * @param value column value
380    * @param voffset value offset
381    * @param vlength value length
382    * @throws IllegalArgumentException
383    */
384   public KeyValue(byte [] row, byte [] family,
385       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
386       byte [] value, int voffset, int vlength) {
387     this(row, 0, row==null ? 0 : row.length,
388         family, 0, family==null ? 0 : family.length,
389         qualifier, qoffset, qlength, timestamp, type,
390         value, voffset, vlength);
391   }
392 
393   /**
394    * Constructs KeyValue structure filled with specified values.
395    * <p>
396    * Column is split into two fields, family and qualifier.
397    * @param row row key
398    * @param roffset row offset
399    * @param rlength row length
400    * @param family family name
401    * @param foffset family offset
402    * @param flength family length
403    * @param qualifier column qualifier
404    * @param qoffset qualifier offset
405    * @param qlength qualifier length
406    * @param timestamp version timestamp
407    * @param type key type
408    * @param value column value
409    * @param voffset value offset
410    * @param vlength value length
411    * @throws IllegalArgumentException
412    */
413   public KeyValue(final byte [] row, final int roffset, final int rlength,
414       final byte [] family, final int foffset, final int flength,
415       final byte [] qualifier, final int qoffset, final int qlength,
416       final long timestamp, final Type type,
417       final byte [] value, final int voffset, final int vlength) {
418     this.bytes = createByteArray(row, roffset, rlength,
419         family, foffset, flength, qualifier, qoffset, qlength,
420         timestamp, type, value, voffset, vlength);
421     this.length = bytes.length;
422     this.offset = 0;
423   }
424 
425   /**
426    * Constructs an empty KeyValue structure, with specified sizes.
427    * This can be used to partially fill up KeyValues.
428    * <p>
429    * Column is split into two fields, family and qualifier.
430    * @param rlength row length
431    * @param flength family length
432    * @param qlength qualifier length
433    * @param timestamp version timestamp
434    * @param type key type
435    * @param vlength value length
436    * @throws IllegalArgumentException
437    */
438   public KeyValue(final int rlength,
439       final int flength,
440       final int qlength,
441       final long timestamp, final Type type,
442       final int vlength) {
443     this.bytes = createEmptyByteArray(rlength,
444         flength, qlength,
445         timestamp, type, vlength);
446     this.length = bytes.length;
447     this.offset = 0;
448   }
449 
450   /**
451    * Create an empty byte[] representing a KeyValue
452    * All lengths are preset and can be filled in later.
453    * @param rlength
454    * @param flength
455    * @param qlength
456    * @param timestamp
457    * @param type
458    * @param vlength
459    * @return The newly created byte array.
460    */
461   static byte[] createEmptyByteArray(final int rlength, int flength,
462       int qlength, final long timestamp, final Type type, int vlength) {
463     if (rlength > Short.MAX_VALUE) {
464       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
465     }
466     if (flength > Byte.MAX_VALUE) {
467       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
468     }
469     // Qualifier length
470     if (qlength > Integer.MAX_VALUE - rlength - flength) {
471       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
472     }
473     // Key length
474     long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
475     if (longkeylength > Integer.MAX_VALUE) {
476       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
477         Integer.MAX_VALUE);
478     }
479     int keylength = (int)longkeylength;
480     // Value length
481     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
482       throw new IllegalArgumentException("Valuer > " +
483           HConstants.MAXIMUM_VALUE_LENGTH);
484     }
485 
486     // Allocate right-sized byte array.
487     byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
488     // Write the correct size markers
489     int pos = 0;
490     pos = Bytes.putInt(bytes, pos, keylength);
491     pos = Bytes.putInt(bytes, pos, vlength);
492     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
493     pos += rlength;
494     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
495     pos += flength + qlength;
496     pos = Bytes.putLong(bytes, pos, timestamp);
497     pos = Bytes.putByte(bytes, pos, type.getCode());
498     return bytes;
499   }
500 
501   /**
502    * Write KeyValue format into a byte array.
503    *
504    * @param row row key
505    * @param roffset row offset
506    * @param rlength row length
507    * @param family family name
508    * @param foffset family offset
509    * @param flength family length
510    * @param qualifier column qualifier
511    * @param qoffset qualifier offset
512    * @param qlength qualifier length
513    * @param timestamp version timestamp
514    * @param type key type
515    * @param value column value
516    * @param voffset value offset
517    * @param vlength value length
518    * @return The newly created byte array.
519    */
520   static byte [] createByteArray(final byte [] row, final int roffset,
521       final int rlength, final byte [] family, final int foffset, int flength,
522       final byte [] qualifier, final int qoffset, int qlength,
523       final long timestamp, final Type type,
524       final byte [] value, final int voffset, int vlength) {
525     if (rlength > Short.MAX_VALUE) {
526       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
527     }
528     if (row == null) {
529       throw new IllegalArgumentException("Row is null");
530     }
531     // Family length
532     flength = family == null ? 0 : flength;
533     if (flength > Byte.MAX_VALUE) {
534       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
535     }
536     // Qualifier length
537     qlength = qualifier == null ? 0 : qlength;
538     if (qlength > Integer.MAX_VALUE - rlength - flength) {
539       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
540     }
541     // Key length
542     long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
543     if (longkeylength > Integer.MAX_VALUE) {
544       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
545         Integer.MAX_VALUE);
546     }
547     int keylength = (int)longkeylength;
548     // Value length
549     vlength = value == null? 0 : vlength;
550     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
551       throw new IllegalArgumentException("Valuer > " +
552           HConstants.MAXIMUM_VALUE_LENGTH);
553     }
554 
555     // Allocate right-sized byte array.
556     byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength];
557     // Write key, value and key row length.
558     int pos = 0;
559     pos = Bytes.putInt(bytes, pos, keylength);
560     pos = Bytes.putInt(bytes, pos, vlength);
561     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
562     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
563     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
564     if(flength != 0) {
565       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
566     }
567     if(qlength != 0) {
568       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
569     }
570     pos = Bytes.putLong(bytes, pos, timestamp);
571     pos = Bytes.putByte(bytes, pos, type.getCode());
572     if (value != null && value.length > 0) {
573       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
574     }
575     return bytes;
576   }
577 
578   /**
579    * Write KeyValue format into a byte array.
580    * <p>
581    * Takes column in the form <code>family:qualifier</code>
582    * @param row - row key (arbitrary byte array)
583    * @param roffset
584    * @param rlength
585    * @param column
586    * @param coffset
587    * @param clength
588    * @param timestamp
589    * @param type
590    * @param value
591    * @param voffset
592    * @param vlength
593    * @return The newly created byte array.
594    */
595   static byte [] createByteArray(final byte [] row, final int roffset,
596         final int rlength,
597       final byte [] column, final int coffset, int clength,
598       final long timestamp, final Type type,
599       final byte [] value, final int voffset, int vlength) {
600     // If column is non-null, figure where the delimiter is at.
601     int delimiteroffset = 0;
602     if (column != null && column.length > 0) {
603       delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength);
604       if (delimiteroffset > Byte.MAX_VALUE) {
605         throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
606       }
607     } else {
608       return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp,
609           type,value,voffset,vlength);
610     }
611     int flength = delimiteroffset-coffset;
612     int qlength = clength - flength - 1;
613     return createByteArray(row, roffset, rlength, column, coffset,
614         flength, column, delimiteroffset+1, qlength, timestamp, type,
615         value, voffset, vlength);
616   }
617 
618   // Needed doing 'contains' on List.  Only compares the key portion, not the
619   // value.
620   @Override
621   public boolean equals(Object other) {
622     if (!(other instanceof KeyValue)) {
623       return false;
624     }
625     KeyValue kv = (KeyValue)other;
626     // Comparing bytes should be fine doing equals test.  Shouldn't have to
627     // worry about special .META. comparators doing straight equals.
628     return Bytes.equals(getBuffer(), getKeyOffset(), getKeyLength(),
629       kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
630   }
631 
632   @Override
633   public int hashCode() {
634     byte[] b = getBuffer();
635     int start = getOffset(), end = getOffset() + getLength();
636     int h = b[start++];
637     for (int i = start; i < end; i++) {
638       h = (h * 13) ^ b[i];
639     }
640     return h;
641   }
642 
643   //---------------------------------------------------------------------------
644   //
645   //  KeyValue cloning
646   //
647   //---------------------------------------------------------------------------
648 
649   /**
650    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
651    * @return Fully copied clone of this KeyValue
652    */
653   public KeyValue clone() {
654     byte [] b = new byte[this.length];
655     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
656     KeyValue ret = new KeyValue(b, 0, b.length);
657     // Important to clone the memstoreTS as well - otherwise memstore's
658     // update-in-place methods (eg increment) will end up creating
659     // new entries
660     ret.setMemstoreTS(memstoreTS);
661     return ret;
662   }
663 
664   /**
665    * Creates a deep copy of this KeyValue, re-allocating the buffer.
666    * Same function as {@link #clone()}.  Added for clarity vs shallowCopy()
667    * @return Deep copy of this KeyValue
668    */
669   public KeyValue deepCopy() {
670     return clone();
671   }
672 
673   /**
674    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
675    * http://en.wikipedia.org/wiki/Object_copy
676    * @return Shallow copy of this KeyValue
677    */
678   public KeyValue shallowCopy() {
679     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
680     shallowCopy.setMemstoreTS(this.memstoreTS);
681     return shallowCopy;
682   }
683 
684   //---------------------------------------------------------------------------
685   //
686   //  String representation
687   //
688   //---------------------------------------------------------------------------
689 
690   public String toString() {
691     if (this.bytes == null || this.bytes.length == 0) {
692       return "empty";
693     }
694     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
695       "/vlen=" + getValueLength() + "/ts=" + memstoreTS;
696   }
697 
698   /**
699    * @param k Key portion of a KeyValue.
700    * @return Key as a String.
701    */
702   public static String keyToString(final byte [] k) {
703     if (k == null) {
704       return "";
705     }
706     return keyToString(k, 0, k.length);
707   }
708 
709   /**
710    * Use for logging.
711    * @param b Key portion of a KeyValue.
712    * @param o Offset to start of key
713    * @param l Length of key.
714    * @return Key as a String.
715    */
716   /**
717    * Produces a string map for this key/value pair. Useful for programmatic use
718    * and manipulation of the data stored in an HLogKey, for example, printing
719    * as JSON. Values are left out due to their tendency to be large. If needed,
720    * they can be added manually.
721    *
722    * @return the Map<String,?> containing data from this key
723    */
724   public Map<String, Object> toStringMap() {
725     Map<String, Object> stringMap = new HashMap<String, Object>();
726     stringMap.put("row", Bytes.toStringBinary(getRow()));
727     stringMap.put("family", Bytes.toStringBinary(getFamily()));
728     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
729     stringMap.put("timestamp", getTimestamp());
730     stringMap.put("vlen", getValueLength());
731     return stringMap;
732   }
733 
734   public static String keyToString(final byte [] b, final int o, final int l) {
735     if (b == null) return "";
736     int rowlength = Bytes.toShort(b, o);
737     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
738     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
739     int familylength = b[columnoffset - 1];
740     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
741     String family = familylength == 0? "":
742       Bytes.toStringBinary(b, columnoffset, familylength);
743     String qualifier = columnlength == 0? "":
744       Bytes.toStringBinary(b, columnoffset + familylength,
745       columnlength - familylength);
746     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
747     String timestampStr = humanReadableTimestamp(timestamp);
748     byte type = b[o + l - 1];
749     return row + "/" + family +
750       (family != null && family.length() > 0? ":" :"") +
751       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
752   }
753 
754   public static String humanReadableTimestamp(final long timestamp) {
755     if (timestamp == HConstants.LATEST_TIMESTAMP) {
756       return "LATEST_TIMESTAMP";
757     }
758     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
759       return "OLDEST_TIMESTAMP";
760     }
761     return String.valueOf(timestamp);
762   }
763 
764   //---------------------------------------------------------------------------
765   //
766   //  Public Member Accessors
767   //
768   //---------------------------------------------------------------------------
769 
770   /**
771    * @return The byte array backing this KeyValue.
772    */
773   public byte [] getBuffer() {
774     return this.bytes;
775   }
776 
777   /**
778    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
779    */
780   public int getOffset() {
781     return this.offset;
782   }
783 
784   /**
785    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
786    */
787   public int getLength() {
788     return length;
789   }
790 
791   //---------------------------------------------------------------------------
792   //
793   //  Length and Offset Calculators
794   //
795   //---------------------------------------------------------------------------
796 
797   /**
798    * Determines the total length of the KeyValue stored in the specified
799    * byte array and offset.  Includes all headers.
800    * @param bytes byte array
801    * @param offset offset to start of the KeyValue
802    * @return length of entire KeyValue, in bytes
803    */
804   private static int getLength(byte [] bytes, int offset) {
805     return ROW_OFFSET +
806         Bytes.toInt(bytes, offset) +
807         Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
808   }
809 
810   /**
811    * @return Key offset in backing buffer..
812    */
813   public int getKeyOffset() {
814     return this.offset + ROW_OFFSET;
815   }
816 
817   public String getKeyString() {
818     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
819   }
820 
821   /**
822    * @return Length of key portion.
823    */
824   public int getKeyLength() {
825     return Bytes.toInt(this.bytes, this.offset);
826   }
827 
828   /**
829    * @return Value offset
830    */
831   public int getValueOffset() {
832     return getKeyOffset() + getKeyLength();
833   }
834 
835   /**
836    * @return Value length
837    */
838   public int getValueLength() {
839     return Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
840   }
841 
842   /**
843    * @return Row offset
844    */
845   public int getRowOffset() {
846     return getKeyOffset() + Bytes.SIZEOF_SHORT;
847   }
848 
849   /**
850    * @return Row length
851    */
852   public short getRowLength() {
853     return Bytes.toShort(this.bytes, getKeyOffset());
854   }
855 
856   /**
857    * @return Family offset
858    */
859   public int getFamilyOffset() {
860     return getFamilyOffset(getRowLength());
861   }
862 
863   /**
864    * @return Family offset
865    */
866   public int getFamilyOffset(int rlength) {
867     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
868   }
869 
870   /**
871    * @return Family length
872    */
873   public byte getFamilyLength() {
874     return getFamilyLength(getFamilyOffset());
875   }
876 
877   /**
878    * @return Family length
879    */
880   public byte getFamilyLength(int foffset) {
881     return this.bytes[foffset-1];
882   }
883 
884   /**
885    * @return Qualifier offset
886    */
887   public int getQualifierOffset() {
888     return getQualifierOffset(getFamilyOffset());
889   }
890 
891   /**
892    * @return Qualifier offset
893    */
894   public int getQualifierOffset(int foffset) {
895     return foffset + getFamilyLength(foffset);
896   }
897 
898   /**
899    * @return Qualifier length
900    */
901   public int getQualifierLength() {
902     return getQualifierLength(getRowLength(),getFamilyLength());
903   }
904 
905   /**
906    * @return Qualifier length
907    */
908   public int getQualifierLength(int rlength, int flength) {
909     return getKeyLength() -
910       (KEY_INFRASTRUCTURE_SIZE + rlength + flength);
911   }
912 
913   /**
914    * @return Column (family + qualifier) length
915    */
916   public int getTotalColumnLength() {
917     int rlength = getRowLength();
918     int foffset = getFamilyOffset(rlength);
919     return getTotalColumnLength(rlength,foffset);
920   }
921 
922   /**
923    * @return Column (family + qualifier) length
924    */
925   public int getTotalColumnLength(int rlength, int foffset) {
926     int flength = getFamilyLength(foffset);
927     int qlength = getQualifierLength(rlength,flength);
928     return flength + qlength;
929   }
930 
931   /**
932    * @return Timestamp offset
933    */
934   public int getTimestampOffset() {
935     return getTimestampOffset(getKeyLength());
936   }
937 
938   /**
939    * @param keylength Pass if you have it to save on a int creation.
940    * @return Timestamp offset
941    */
942   public int getTimestampOffset(final int keylength) {
943     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
944   }
945 
946   /**
947    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
948    */
949   public boolean isLatestTimestamp() {
950     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
951       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
952   }
953 
954   /**
955    * @return True if this is a "fake" KV created for internal seeking purposes,
956    * which should not be seen by user code
957    */
958   public boolean isInternal() {
959     byte type = getType();
960     return type == Type.Minimum.code || type == Type.Maximum.code;
961   }
962   /**
963    * @param now Time to set into <code>this</code> IFF timestamp ==
964    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
965    * @return True is we modified this.
966    */
967   public boolean updateLatestStamp(final byte [] now) {
968     if (this.isLatestTimestamp()) {
969       int tsOffset = getTimestampOffset();
970       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
971       // clear cache or else getTimestamp() possibly returns an old value
972       return true;
973     }
974     return false;
975   }
976 
977   //---------------------------------------------------------------------------
978   //
979   //  Methods that return copies of fields
980   //
981   //---------------------------------------------------------------------------
982 
983   /**
984    * Do not use unless you have to.  Used internally for compacting and testing.
985    *
986    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
987    * {@link #getValue()} if accessing a KeyValue client-side.
988    * @return Copy of the key portion only.
989    */
990   public byte [] getKey() {
991     int keylength = getKeyLength();
992     byte [] key = new byte[keylength];
993     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
994     return key;
995   }
996 
997   /**
998    * Returns value in a new byte array.
999    * Primarily for use client-side. If server-side, use
1000    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1001    * save on allocations.
1002    * @return Value in a new byte array.
1003    */
1004   public byte [] getValue() {
1005     int o = getValueOffset();
1006     int l = getValueLength();
1007     byte [] result = new byte[l];
1008     System.arraycopy(getBuffer(), o, result, 0, l);
1009     return result;
1010   }
1011 
1012   /**
1013    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1014    * byte array.<p>
1015    *
1016    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1017    * lengths instead.
1018    * @return Row in a new byte array.
1019    */
1020   public byte [] getRow() {
1021     int o = getRowOffset();
1022     short l = getRowLength();
1023     byte result[] = new byte[l];
1024     System.arraycopy(getBuffer(), o, result, 0, l);
1025     return result;
1026   }
1027 
1028   /**
1029    *
1030    * @return Timestamp
1031    */
1032   public long getTimestamp() {
1033     return getTimestamp(getKeyLength());
1034   }
1035 
1036   /**
1037    * @param keylength Pass if you have it to save on a int creation.
1038    * @return Timestamp
1039    */
1040   long getTimestamp(final int keylength) {
1041     int tsOffset = getTimestampOffset(keylength);
1042     return Bytes.toLong(this.bytes, tsOffset);
1043   }
1044 
1045   /**
1046    * @return Type of this KeyValue.
1047    */
1048   public byte getType() {
1049     return getType(getKeyLength());
1050   }
1051 
1052   /**
1053    * @param keylength Pass if you have it to save on a int creation.
1054    * @return Type of this KeyValue.
1055    */
1056   byte getType(final int keylength) {
1057     return this.bytes[this.offset + keylength - 1 + ROW_OFFSET];
1058   }
1059 
1060   /**
1061    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1062    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1063    * KeyValue type.
1064    */
1065   public boolean isDelete() {
1066     return KeyValue.isDelete(getType());
1067   }
1068 
1069   /**
1070    * @return True if this KV is a {@link KeyValue.Type#Delete} type.
1071    */
1072   public boolean isDeleteType() {
1073     // TODO: Fix this method name vis-a-vis isDelete!
1074     return getType() == Type.Delete.getCode();
1075   }
1076 
1077   /**
1078    * @return True if this KV is a delete family type.
1079    */
1080   public boolean isDeleteFamily() {
1081     return getType() == Type.DeleteFamily.getCode();
1082   }
1083 
1084   /**
1085    *
1086    * @return True if this KV is a delete family or column type.
1087    */
1088   public boolean isDeleteColumnOrFamily() {
1089     int t = getType();
1090     return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
1091   }
1092 
1093   /**
1094    * Primarily for use client-side.  Returns the family of this KeyValue in a
1095    * new byte array.<p>
1096    *
1097    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1098    * lengths instead.
1099    * @return Returns family. Makes a copy.
1100    */
1101   public byte [] getFamily() {
1102     int o = getFamilyOffset();
1103     int l = getFamilyLength(o);
1104     byte [] result = new byte[l];
1105     System.arraycopy(this.bytes, o, result, 0, l);
1106     return result;
1107   }
1108 
1109   /**
1110    * Primarily for use client-side.  Returns the column qualifier of this
1111    * KeyValue in a new byte array.<p>
1112    *
1113    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1114    * lengths instead.
1115    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1116    * @return Returns qualifier. Makes a copy.
1117    */
1118   public byte [] getQualifier() {
1119     int o = getQualifierOffset();
1120     int l = getQualifierLength();
1121     byte [] result = new byte[l];
1122     System.arraycopy(this.bytes, o, result, 0, l);
1123     return result;
1124   }
1125 
1126   //---------------------------------------------------------------------------
1127   //
1128   //  KeyValue splitter
1129   //
1130   //---------------------------------------------------------------------------
1131 
1132   /**
1133    * Utility class that splits a KeyValue buffer into separate byte arrays.
1134    * <p>
1135    * Should get rid of this if we can, but is very useful for debugging.
1136    */
1137   public static class SplitKeyValue {
1138     private byte [][] split;
1139     SplitKeyValue() {
1140       this.split = new byte[6][];
1141     }
1142     public void setRow(byte [] value) { this.split[0] = value; }
1143     public void setFamily(byte [] value) { this.split[1] = value; }
1144     public void setQualifier(byte [] value) { this.split[2] = value; }
1145     public void setTimestamp(byte [] value) { this.split[3] = value; }
1146     public void setType(byte [] value) { this.split[4] = value; }
1147     public void setValue(byte [] value) { this.split[5] = value; }
1148     public byte [] getRow() { return this.split[0]; }
1149     public byte [] getFamily() { return this.split[1]; }
1150     public byte [] getQualifier() { return this.split[2]; }
1151     public byte [] getTimestamp() { return this.split[3]; }
1152     public byte [] getType() { return this.split[4]; }
1153     public byte [] getValue() { return this.split[5]; }
1154   }
1155 
1156   public SplitKeyValue split() {
1157     SplitKeyValue split = new SplitKeyValue();
1158     int splitOffset = this.offset;
1159     int keyLen = Bytes.toInt(bytes, splitOffset);
1160     splitOffset += Bytes.SIZEOF_INT;
1161     int valLen = Bytes.toInt(bytes, splitOffset);
1162     splitOffset += Bytes.SIZEOF_INT;
1163     short rowLen = Bytes.toShort(bytes, splitOffset);
1164     splitOffset += Bytes.SIZEOF_SHORT;
1165     byte [] row = new byte[rowLen];
1166     System.arraycopy(bytes, splitOffset, row, 0, rowLen);
1167     splitOffset += rowLen;
1168     split.setRow(row);
1169     byte famLen = bytes[splitOffset];
1170     splitOffset += Bytes.SIZEOF_BYTE;
1171     byte [] family = new byte[famLen];
1172     System.arraycopy(bytes, splitOffset, family, 0, famLen);
1173     splitOffset += famLen;
1174     split.setFamily(family);
1175     int colLen = keyLen -
1176       (rowLen + famLen + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BYTE +
1177       Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE);
1178     byte [] qualifier = new byte[colLen];
1179     System.arraycopy(bytes, splitOffset, qualifier, 0, colLen);
1180     splitOffset += colLen;
1181     split.setQualifier(qualifier);
1182     byte [] timestamp = new byte[Bytes.SIZEOF_LONG];
1183     System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG);
1184     splitOffset += Bytes.SIZEOF_LONG;
1185     split.setTimestamp(timestamp);
1186     byte [] type = new byte[1];
1187     type[0] = bytes[splitOffset];
1188     splitOffset += Bytes.SIZEOF_BYTE;
1189     split.setType(type);
1190     byte [] value = new byte[valLen];
1191     System.arraycopy(bytes, splitOffset, value, 0, valLen);
1192     split.setValue(value);
1193     return split;
1194   }
1195 
1196   //---------------------------------------------------------------------------
1197   //
1198   //  Compare specified fields against those contained in this KeyValue
1199   //
1200   //---------------------------------------------------------------------------
1201 
1202   /**
1203    * @param family
1204    * @return True if matching families.
1205    */
1206   public boolean matchingFamily(final byte [] family) {
1207     return matchingFamily(family, 0, family.length);
1208   }
1209 
1210   public boolean matchingFamily(final byte[] family, int offset, int length) {
1211     if (this.length == 0 || this.bytes.length == 0) {
1212       return false;
1213     }
1214     return Bytes.equals(family, offset, length,
1215         this.bytes, getFamilyOffset(), getFamilyLength());
1216   }
1217 
1218   public boolean matchingFamily(final KeyValue other) {
1219     return matchingFamily(other.getBuffer(), other.getFamilyOffset(),
1220         other.getFamilyLength());
1221   }
1222 
1223   /**
1224    * @param qualifier
1225    * @return True if matching qualifiers.
1226    */
1227   public boolean matchingQualifier(final byte [] qualifier) {
1228     return matchingQualifier(qualifier, 0, qualifier.length);
1229   }
1230 
1231   public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
1232     return Bytes.equals(qualifier, offset, length,
1233         this.bytes, getQualifierOffset(), getQualifierLength());
1234   }
1235 
1236   public boolean matchingQualifier(final KeyValue other) {
1237     return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
1238         other.getQualifierLength());
1239   }
1240 
1241   public boolean matchingRow(final byte [] row) {
1242     return matchingRow(row, 0, row.length);
1243   }
1244 
1245   public boolean matchingRow(final byte[] row, int offset, int length) {
1246     return Bytes.equals(row, offset, length,
1247         this.bytes, getRowOffset(), getRowLength());
1248   }
1249 
1250   public boolean matchingRow(KeyValue other) {
1251     return matchingRow(other.getBuffer(), other.getRowOffset(),
1252         other.getRowLength());
1253   }
1254 
1255   /**
1256    * @param column Column minus its delimiter
1257    * @return True if column matches.
1258    */
1259   public boolean matchingColumnNoDelimiter(final byte [] column) {
1260     int rl = getRowLength();
1261     int o = getFamilyOffset(rl);
1262     int fl = getFamilyLength(o);
1263     int l = fl + getQualifierLength(rl,fl);
1264     return Bytes.equals(column, 0, column.length, this.bytes, o, l);
1265   }
1266 
1267   /**
1268    *
1269    * @param family column family
1270    * @param qualifier column qualifier
1271    * @return True if column matches
1272    */
1273   public boolean matchingColumn(final byte[] family, final byte[] qualifier) {
1274     int rl = getRowLength();
1275     int o = getFamilyOffset(rl);
1276     int fl = getFamilyLength(o);
1277     int ql = getQualifierLength(rl,fl);
1278     if (!Bytes.equals(family, 0, family.length, this.bytes, o, fl)) {
1279       return false;
1280     }
1281     if (qualifier == null || qualifier.length == 0) {
1282       if (ql == 0) {
1283         return true;
1284       }
1285       return false;
1286     }
1287     return Bytes.equals(qualifier, 0, qualifier.length,
1288         this.bytes, o + fl, ql);
1289   }
1290 
1291   /**
1292    * @param left
1293    * @param loffset
1294    * @param llength
1295    * @param lfamilylength Offset of family delimiter in left column.
1296    * @param right
1297    * @param roffset
1298    * @param rlength
1299    * @param rfamilylength Offset of family delimiter in right column.
1300    * @return The result of the comparison.
1301    */
1302   static int compareColumns(final byte [] left, final int loffset,
1303       final int llength, final int lfamilylength,
1304       final byte [] right, final int roffset, final int rlength,
1305       final int rfamilylength) {
1306     // Compare family portion first.
1307     int diff = Bytes.compareTo(left, loffset, lfamilylength,
1308       right, roffset, rfamilylength);
1309     if (diff != 0) {
1310       return diff;
1311     }
1312     // Compare qualifier portion
1313     return Bytes.compareTo(left, loffset + lfamilylength,
1314       llength - lfamilylength,
1315       right, roffset + rfamilylength, rlength - rfamilylength);
1316   }
1317 
1318   /**
1319    * @return True if non-null row and column.
1320    */
1321   public boolean nonNullRowAndColumn() {
1322     return getRowLength() > 0 && !isEmptyColumn();
1323   }
1324 
1325   /**
1326    * @return True if column is empty.
1327    */
1328   public boolean isEmptyColumn() {
1329     return getQualifierLength() == 0;
1330   }
1331 
1332   /**
1333    * Creates a new KeyValue that only contains the key portion (the value is
1334    * set to be null).
1335    * @param lenAsVal replace value with the actual value length (false=empty)
1336    */
1337   public KeyValue createKeyOnly(boolean lenAsVal) {
1338     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1339     // Rebuild as: <keylen:4><0:4><key:keylen>
1340     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1341     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1342     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1343         Math.min(newBuffer.length,this.length));
1344     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1345     if (lenAsVal) {
1346       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1347     }
1348     return new KeyValue(newBuffer);
1349   }
1350 
1351   /**
1352    * Splits a column in family:qualifier form into separate byte arrays.
1353    * <p>
1354    * Not recommend to be used as this is old-style API.
1355    * @param c  The column.
1356    * @return The parsed column.
1357    */
1358   public static byte [][] parseColumn(byte [] c) {
1359     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1360     if (index == -1) {
1361       // If no delimiter, return array of size 1
1362       return new byte [][] { c };
1363     } else if(index == c.length - 1) {
1364       // Only a family, return array size 1
1365       byte [] family = new byte[c.length-1];
1366       System.arraycopy(c, 0, family, 0, family.length);
1367       return new byte [][] { family };
1368     }
1369     // Family and column, return array size 2
1370     final byte [][] result = new byte [2][];
1371     result[0] = new byte [index];
1372     System.arraycopy(c, 0, result[0], 0, index);
1373     final int len = c.length - (index + 1);
1374     result[1] = new byte[len];
1375     System.arraycopy(c, index + 1 /*Skip delimiter*/, result[1], 0,
1376       len);
1377     return result;
1378   }
1379 
1380   /**
1381    * Makes a column in family:qualifier form from separate byte arrays.
1382    * <p>
1383    * Not recommended for usage as this is old-style API.
1384    * @param family
1385    * @param qualifier
1386    * @return family:qualifier
1387    */
1388   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1389     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1390   }
1391 
1392   /**
1393    * @param b
1394    * @return Index of the family-qualifier colon delimiter character in passed
1395    * buffer.
1396    */
1397   public static int getFamilyDelimiterIndex(final byte [] b, final int offset,
1398       final int length) {
1399     return getRequiredDelimiter(b, offset, length, COLUMN_FAMILY_DELIMITER);
1400   }
1401 
1402   private static int getRequiredDelimiter(final byte [] b,
1403       final int offset, final int length, final int delimiter) {
1404     int index = getDelimiter(b, offset, length, delimiter);
1405     if (index < 0) {
1406       throw new IllegalArgumentException("No " + (char)delimiter + " in <" +
1407         Bytes.toString(b) + ">" + ", length=" + length + ", offset=" + offset);
1408     }
1409     return index;
1410   }
1411 
1412   /**
1413    * This function is only used in Meta key comparisons so its error message
1414    * is specific for meta key errors.
1415    */
1416   static int getRequiredDelimiterInReverse(final byte [] b,
1417       final int offset, final int length, final int delimiter) {
1418     int index = getDelimiterInReverse(b, offset, length, delimiter);
1419     if (index < 0) {
1420       throw new IllegalArgumentException(".META. key must have two '" + (char)delimiter + "' "
1421         + "delimiters and have the following format: '<table>,<key>,<etc>'");
1422     }
1423     return index;
1424   }
1425 
1426   /**
1427    * @param b
1428    * @param delimiter
1429    * @return Index of delimiter having started from start of <code>b</code>
1430    * moving rightward.
1431    */
1432   public static int getDelimiter(final byte [] b, int offset, final int length,
1433       final int delimiter) {
1434     if (b == null) {
1435       throw new IllegalArgumentException("Passed buffer is null");
1436     }
1437     int result = -1;
1438     for (int i = offset; i < length + offset; i++) {
1439       if (b[i] == delimiter) {
1440         result = i;
1441         break;
1442       }
1443     }
1444     return result;
1445   }
1446 
1447   /**
1448    * Find index of passed delimiter walking from end of buffer backwards.
1449    * @param b
1450    * @param delimiter
1451    * @return Index of delimiter
1452    */
1453   public static int getDelimiterInReverse(final byte [] b, final int offset,
1454       final int length, final int delimiter) {
1455     if (b == null) {
1456       throw new IllegalArgumentException("Passed buffer is null");
1457     }
1458     int result = -1;
1459     for (int i = (offset + length) - 1; i >= offset; i--) {
1460       if (b[i] == delimiter) {
1461         result = i;
1462         break;
1463       }
1464     }
1465     return result;
1466   }
1467 
1468   /**
1469    * A {@link KVComparator} for <code>-ROOT-</code> catalog table
1470    * {@link KeyValue}s.
1471    */
1472   public static class RootComparator extends MetaComparator {
1473     private final KeyComparator rawcomparator = new RootKeyComparator();
1474 
1475     public KeyComparator getRawComparator() {
1476       return this.rawcomparator;
1477     }
1478 
1479     @Override
1480     protected Object clone() throws CloneNotSupportedException {
1481       return new RootComparator();
1482     }
1483   }
1484 
1485   /**
1486    * A {@link KVComparator} for <code>.META.</code> catalog table
1487    * {@link KeyValue}s.
1488    */
1489   public static class MetaComparator extends KVComparator {
1490     private final KeyComparator rawcomparator = new MetaKeyComparator();
1491 
1492     public KeyComparator getRawComparator() {
1493       return this.rawcomparator;
1494     }
1495 
1496     @Override
1497     protected Object clone() throws CloneNotSupportedException {
1498       return new MetaComparator();
1499     }
1500   }
1501 
1502   /**
1503    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1504    * portion.  This means two KeyValues with same Key but different Values are
1505    * considered the same as far as this Comparator is concerned.
1506    * Hosts a {@link KeyComparator}.
1507    */
1508   public static class KVComparator implements java.util.Comparator<KeyValue> {
1509     private final KeyComparator rawcomparator = new KeyComparator();
1510 
1511     /**
1512      * @return RawComparator that can compare the Key portion of a KeyValue.
1513      * Used in hfile where indices are the Key portion of a KeyValue.
1514      */
1515     public KeyComparator getRawComparator() {
1516       return this.rawcomparator;
1517     }
1518 
1519     public int compare(final KeyValue left, final KeyValue right) {
1520       int ret = getRawComparator().compare(left.getBuffer(),
1521           left.getOffset() + ROW_OFFSET, left.getKeyLength(),
1522           right.getBuffer(), right.getOffset() + ROW_OFFSET,
1523           right.getKeyLength());
1524       if (ret != 0) return ret;
1525       // Negate this comparison so later edits show up first
1526       return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
1527     }
1528 
1529     public int compareTimestamps(final KeyValue left, final KeyValue right) {
1530       return compareTimestamps(left, left.getKeyLength(), right,
1531         right.getKeyLength());
1532     }
1533 
1534     int compareTimestamps(final KeyValue left, final int lkeylength,
1535         final KeyValue right, final int rkeylength) {
1536       // Compare timestamps
1537       long ltimestamp = left.getTimestamp(lkeylength);
1538       long rtimestamp = right.getTimestamp(rkeylength);
1539       return getRawComparator().compareTimestamps(ltimestamp, rtimestamp);
1540     }
1541 
1542     /**
1543      * @param left
1544      * @param right
1545      * @return Result comparing rows.
1546      */
1547     public int compareRows(final KeyValue left, final KeyValue right) {
1548       return compareRows(left, left.getRowLength(), right,
1549           right.getRowLength());
1550     }
1551 
1552     /**
1553      * @param left
1554      * @param lrowlength Length of left row.
1555      * @param right
1556      * @param rrowlength Length of right row.
1557      * @return Result comparing rows.
1558      */
1559     public int compareRows(final KeyValue left, final short lrowlength,
1560         final KeyValue right, final short rrowlength) {
1561       return getRawComparator().compareRows(left.getBuffer(),
1562           left.getRowOffset(), lrowlength,
1563         right.getBuffer(), right.getRowOffset(), rrowlength);
1564     }
1565 
1566     /**
1567      * @param left
1568      * @param row - row key (arbitrary byte array)
1569      * @return RawComparator
1570      */
1571     public int compareRows(final KeyValue left, final byte [] row) {
1572       return getRawComparator().compareRows(left.getBuffer(),
1573           left.getRowOffset(), left.getRowLength(), row, 0, row.length);
1574     }
1575 
1576     public int compareRows(byte [] left, int loffset, int llength,
1577         byte [] right, int roffset, int rlength) {
1578       return getRawComparator().compareRows(left, loffset, llength,
1579         right, roffset, rlength);
1580     }
1581 
1582     public int compareColumns(final KeyValue left, final byte [] right,
1583         final int roffset, final int rlength, final int rfamilyoffset) {
1584       int offset = left.getFamilyOffset();
1585       int length = left.getFamilyLength() + left.getQualifierLength();
1586       return getRawComparator().compareColumns(left.getBuffer(), offset, length,
1587         left.getFamilyLength(offset),
1588         right, roffset, rlength, rfamilyoffset);
1589     }
1590 
1591     int compareColumns(final KeyValue left, final short lrowlength,
1592         final KeyValue right, final short rrowlength) {
1593       int lfoffset = left.getFamilyOffset(lrowlength);
1594       int rfoffset = right.getFamilyOffset(rrowlength);
1595       int lclength = left.getTotalColumnLength(lrowlength,lfoffset);
1596       int rclength = right.getTotalColumnLength(rrowlength, rfoffset);
1597       int lfamilylength = left.getFamilyLength(lfoffset);
1598       int rfamilylength = right.getFamilyLength(rfoffset);
1599       return getRawComparator().compareColumns(left.getBuffer(), lfoffset,
1600           lclength, lfamilylength,
1601         right.getBuffer(), rfoffset, rclength, rfamilylength);
1602     }
1603 
1604     /**
1605      * Compares the row and column of two keyvalues for equality
1606      * @param left
1607      * @param right
1608      * @return True if same row and column.
1609      */
1610     public boolean matchingRowColumn(final KeyValue left,
1611         final KeyValue right) {
1612       short lrowlength = left.getRowLength();
1613       short rrowlength = right.getRowLength();
1614       // TsOffset = end of column data. just comparing Row+CF length of each
1615       return ((left.getTimestampOffset() - left.getOffset()) ==
1616               (right.getTimestampOffset() - right.getOffset())) &&
1617         matchingRows(left, lrowlength, right, rrowlength) &&
1618         compareColumns(left, lrowlength, right, rrowlength) == 0;
1619     }
1620 
1621     /**
1622      * @param left
1623      * @param right
1624      * @return True if rows match.
1625      */
1626     public boolean matchingRows(final KeyValue left, final byte [] right) {
1627       return Bytes.equals(left.getBuffer(), left.getRowOffset(), left.getRowLength(),
1628           right, 0, right.length);
1629     }
1630 
1631     /**
1632      * Compares the row of two keyvalues for equality
1633      * @param left
1634      * @param right
1635      * @return True if rows match.
1636      */
1637     public boolean matchingRows(final KeyValue left, final KeyValue right) {
1638       short lrowlength = left.getRowLength();
1639       short rrowlength = right.getRowLength();
1640       return matchingRows(left, lrowlength, right, rrowlength);
1641     }
1642 
1643     /**
1644      * @param left
1645      * @param lrowlength
1646      * @param right
1647      * @param rrowlength
1648      * @return True if rows match.
1649      */
1650     public boolean matchingRows(final KeyValue left, final short lrowlength,
1651         final KeyValue right, final short rrowlength) {
1652       return lrowlength == rrowlength &&
1653           Bytes.equals(left.getBuffer(), left.getRowOffset(), lrowlength,
1654               right.getBuffer(), right.getRowOffset(), rrowlength);
1655     }
1656 
1657     public boolean matchingRows(final byte [] left, final int loffset,
1658         final int llength,
1659         final byte [] right, final int roffset, final int rlength) {
1660       return Bytes.equals(left, loffset, llength,
1661           right, roffset, rlength);
1662     }
1663 
1664     /**
1665      * Compares the row and timestamp of two keys
1666      * Was called matchesWithoutColumn in HStoreKey.
1667      * @param right Key to compare against.
1668      * @return True if same row and timestamp is greater than the timestamp in
1669      * <code>right</code>
1670      */
1671     public boolean matchingRowsGreaterTimestamp(final KeyValue left,
1672         final KeyValue right) {
1673       short lrowlength = left.getRowLength();
1674       short rrowlength = right.getRowLength();
1675       if (!matchingRows(left, lrowlength, right, rrowlength)) {
1676         return false;
1677       }
1678       return left.getTimestamp() >= right.getTimestamp();
1679     }
1680 
1681     @Override
1682     protected Object clone() throws CloneNotSupportedException {
1683       return new KVComparator();
1684     }
1685 
1686     /**
1687      * @return Comparator that ignores timestamps; useful counting versions.
1688      */
1689     public KVComparator getComparatorIgnoringTimestamps() {
1690       KVComparator c = null;
1691       try {
1692         c = (KVComparator)this.clone();
1693         c.getRawComparator().ignoreTimestamp = true;
1694       } catch (CloneNotSupportedException e) {
1695         LOG.error("Not supported", e);
1696       }
1697       return c;
1698     }
1699 
1700     /**
1701      * @return Comparator that ignores key type; useful checking deletes
1702      */
1703     public KVComparator getComparatorIgnoringType() {
1704       KVComparator c = null;
1705       try {
1706         c = (KVComparator)this.clone();
1707         c.getRawComparator().ignoreType = true;
1708       } catch (CloneNotSupportedException e) {
1709         LOG.error("Not supported", e);
1710       }
1711       return c;
1712     }
1713   }
1714 
1715   /**
1716    * Creates a KeyValue that is last on the specified row id. That is,
1717    * every other possible KeyValue for the given row would compareTo()
1718    * less than the result of this call.
1719    * @param row row key
1720    * @return Last possible KeyValue on passed <code>row</code>
1721    */
1722   public static KeyValue createLastOnRow(final byte[] row) {
1723     return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
1724   }
1725 
1726   /**
1727    * Create a KeyValue that is smaller than all other possible KeyValues
1728    * for the given row. That is any (valid) KeyValue on 'row' would sort
1729    * _after_ the result.
1730    *
1731    * @param row - row key (arbitrary byte array)
1732    * @return First possible KeyValue on passed <code>row</code>
1733    */
1734   public static KeyValue createFirstOnRow(final byte [] row) {
1735     return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
1736   }
1737 
1738   /**
1739    * Create a KeyValue that is smaller than all other possible KeyValues
1740    * for the given row. That is any (valid) KeyValue on 'row' would sort
1741    * _after_ the result.
1742    *
1743    * @param row - row key (arbitrary byte array)
1744    * @return First possible KeyValue on passed <code>row</code>
1745    */
1746   public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
1747     return new KeyValue(row, roffset, rlength,
1748         null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
1749   }
1750 
1751   /**
1752    * Creates a KeyValue that is smaller than all other KeyValues that
1753    * are older than the passed timestamp.
1754    * @param row - row key (arbitrary byte array)
1755    * @param ts - timestamp
1756    * @return First possible key on passed <code>row</code> and timestamp.
1757    */
1758   public static KeyValue createFirstOnRow(final byte [] row,
1759       final long ts) {
1760     return new KeyValue(row, null, null, ts, Type.Maximum);
1761   }
1762 
1763   /**
1764    * Create a KeyValue for the specified row, family and qualifier that would be
1765    * smaller than all other possible KeyValues that have the same row,family,qualifier.
1766    * Used for seeking.
1767    * @param row - row key (arbitrary byte array)
1768    * @param family - family name
1769    * @param qualifier - column qualifier
1770    * @return First possible key on passed <code>row</code>, and column.
1771    */
1772   public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
1773       final byte [] qualifier) {
1774     return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
1775   }
1776 
1777   /**
1778    * Create a Delete Family KeyValue for the specified row and family that would
1779    * be smaller than all other possible Delete Family KeyValues that have the
1780    * same row and family.
1781    * Used for seeking.
1782    * @param row - row key (arbitrary byte array)
1783    * @param family - family name
1784    * @return First Delete Family possible key on passed <code>row</code>.
1785    */
1786   public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
1787       final byte [] family) {
1788     return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
1789         Type.DeleteFamily);
1790   }
1791 
1792   /**
1793    * @param row - row key (arbitrary byte array)
1794    * @param f - family name
1795    * @param q - column qualifier
1796    * @param ts - timestamp
1797    * @return First possible key on passed <code>row</code>, column and timestamp
1798    */
1799   public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
1800       final byte [] q, final long ts) {
1801     return new KeyValue(row, f, q, ts, Type.Maximum);
1802   }
1803 
1804   /**
1805    * Create a KeyValue for the specified row, family and qualifier that would be
1806    * smaller than all other possible KeyValues that have the same row,
1807    * family, qualifier.
1808    * Used for seeking.
1809    * @param row row key
1810    * @param roffset row offset
1811    * @param rlength row length
1812    * @param family family name
1813    * @param foffset family offset
1814    * @param flength family length
1815    * @param qualifier column qualifier
1816    * @param qoffset qualifier offset
1817    * @param qlength qualifier length
1818    * @return First possible key on passed Row, Family, Qualifier.
1819    */
1820   public static KeyValue createFirstOnRow(final byte [] row,
1821       final int roffset, final int rlength, final byte [] family,
1822       final int foffset, final int flength, final byte [] qualifier,
1823       final int qoffset, final int qlength) {
1824     return new KeyValue(row, roffset, rlength, family,
1825         foffset, flength, qualifier, qoffset, qlength,
1826         HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
1827   }
1828 
1829   /**
1830    * Create a KeyValue for the specified row, family and qualifier that would be
1831    * larger than or equal to all other possible KeyValues that have the same
1832    * row, family, qualifier.
1833    * Used for reseeking.
1834    * @param row row key
1835    * @param roffset row offset
1836    * @param rlength row length
1837    * @param family family name
1838    * @param foffset family offset
1839    * @param flength family length
1840    * @param qualifier column qualifier
1841    * @param qoffset qualifier offset
1842    * @param qlength qualifier length
1843    * @return Last possible key on passed row, family, qualifier.
1844    */
1845   public static KeyValue createLastOnRow(final byte [] row,
1846       final int roffset, final int rlength, final byte [] family,
1847       final int foffset, final int flength, final byte [] qualifier,
1848       final int qoffset, final int qlength) {
1849     return new KeyValue(row, roffset, rlength, family,
1850         foffset, flength, qualifier, qoffset, qlength,
1851         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
1852   }
1853 
1854   /**
1855    * Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
1856    * byte[], int, int)} but creates the last key on the row/column of this KV
1857    * (the value part of the returned KV is always empty). Used in creating
1858    * "fake keys" for the multi-column Bloom filter optimization to skip the
1859    * row/column we already know is not in the file.
1860    * @return the last key on the row/column of the given key-value pair
1861    */
1862   public KeyValue createLastOnRowCol() {
1863     return new KeyValue(
1864         bytes, getRowOffset(), getRowLength(),
1865         bytes, getFamilyOffset(), getFamilyLength(),
1866         bytes, getQualifierOffset(), getQualifierLength(),
1867         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
1868   }
1869 
1870   /**
1871    * Creates the first KV with the row/family/qualifier of this KV and the
1872    * given timestamp. Uses the "maximum" KV type that guarantees that the new
1873    * KV is the lowest possible for this combination of row, family, qualifier,
1874    * and timestamp. This KV's own timestamp is ignored. While this function
1875    * copies the value from this KV, it is normally used on key-only KVs.
1876    */
1877   public KeyValue createFirstOnRowColTS(long ts) {
1878     return new KeyValue(
1879         bytes, getRowOffset(), getRowLength(),
1880         bytes, getFamilyOffset(), getFamilyLength(),
1881         bytes, getQualifierOffset(), getQualifierLength(),
1882         ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
1883   }
1884 
1885   /**
1886    * @param b
1887    * @return A KeyValue made of a byte array that holds the key-only part.
1888    * Needed to convert hfile index members to KeyValues.
1889    */
1890   public static KeyValue createKeyValueFromKey(final byte [] b) {
1891     return createKeyValueFromKey(b, 0, b.length);
1892   }
1893 
1894   /**
1895    * @param bb
1896    * @return A KeyValue made of a byte buffer that holds the key-only part.
1897    * Needed to convert hfile index members to KeyValues.
1898    */
1899   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
1900     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
1901   }
1902 
1903   /**
1904    * @param b
1905    * @param o
1906    * @param l
1907    * @return A KeyValue made of a byte array that holds the key-only part.
1908    * Needed to convert hfile index members to KeyValues.
1909    */
1910   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
1911       final int l) {
1912     byte [] newb = new byte[l + ROW_OFFSET];
1913     System.arraycopy(b, o, newb, ROW_OFFSET, l);
1914     Bytes.putInt(newb, 0, l);
1915     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
1916     return new KeyValue(newb);
1917   }
1918 
1919   /**
1920    * Compare key portion of a {@link KeyValue} for keys in <code>-ROOT-<code>
1921    * table.
1922    */
1923   public static class RootKeyComparator extends MetaKeyComparator {
1924     public int compareRows(byte [] left, int loffset, int llength,
1925         byte [] right, int roffset, int rlength) {
1926       // Rows look like this: .META.,ROW_FROM_META,RID
1927       //        LOG.info("ROOT " + Bytes.toString(left, loffset, llength) +
1928       //          "---" + Bytes.toString(right, roffset, rlength));
1929       final int metalength = 7; // '.META.' length
1930       int lmetaOffsetPlusDelimiter = loffset + metalength;
1931       int leftFarDelimiter = getDelimiterInReverse(left,
1932           lmetaOffsetPlusDelimiter,
1933           llength - metalength, HRegionInfo.DELIMITER);
1934       int rmetaOffsetPlusDelimiter = roffset + metalength;
1935       int rightFarDelimiter = getDelimiterInReverse(right,
1936           rmetaOffsetPlusDelimiter, rlength - metalength,
1937           HRegionInfo.DELIMITER);
1938       if (leftFarDelimiter < 0 && rightFarDelimiter >= 0) {
1939         // Nothing between .META. and regionid.  Its first key.
1940         return -1;
1941       } else if (rightFarDelimiter < 0 && leftFarDelimiter >= 0) {
1942         return 1;
1943       } else if (leftFarDelimiter < 0 && rightFarDelimiter < 0) {
1944         return 0;
1945       }
1946       int result = super.compareRows(left, lmetaOffsetPlusDelimiter,
1947           leftFarDelimiter - lmetaOffsetPlusDelimiter,
1948           right, rmetaOffsetPlusDelimiter,
1949           rightFarDelimiter - rmetaOffsetPlusDelimiter);
1950       if (result != 0) {
1951         return result;
1952       }
1953       // Compare last part of row, the rowid.
1954       leftFarDelimiter++;
1955       rightFarDelimiter++;
1956       result = compareRowid(left, leftFarDelimiter,
1957           llength - (leftFarDelimiter - loffset),
1958           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1959       return result;
1960     }
1961   }
1962 
1963   /**
1964    * Comparator that compares row component only of a KeyValue.
1965    */
1966   public static class RowComparator implements Comparator<KeyValue> {
1967     final KVComparator comparator;
1968 
1969     public RowComparator(final KVComparator c) {
1970       this.comparator = c;
1971     }
1972 
1973     public int compare(KeyValue left, KeyValue right) {
1974       return comparator.compareRows(left, right);
1975     }
1976   }
1977 
1978   /**
1979    * Compare key portion of a {@link KeyValue} for keys in <code>.META.</code>
1980    * table.
1981    */
1982   public static class MetaKeyComparator extends KeyComparator {
1983     public int compareRows(byte [] left, int loffset, int llength,
1984         byte [] right, int roffset, int rlength) {
1985       //        LOG.info("META " + Bytes.toString(left, loffset, llength) +
1986       //          "---" + Bytes.toString(right, roffset, rlength));
1987       int leftDelimiter = getDelimiter(left, loffset, llength,
1988           HRegionInfo.DELIMITER);
1989       int rightDelimiter = getDelimiter(right, roffset, rlength,
1990           HRegionInfo.DELIMITER);
1991       if (leftDelimiter < 0 && rightDelimiter >= 0) {
1992         // Nothing between .META. and regionid.  Its first key.
1993         return -1;
1994       } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1995         return 1;
1996       } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1997         return 0;
1998       }
1999       // Compare up to the delimiter
2000       int result = Bytes.compareTo(left, loffset, leftDelimiter - loffset,
2001           right, roffset, rightDelimiter - roffset);
2002       if (result != 0) {
2003         return result;
2004       }
2005       // Compare middle bit of the row.
2006       // Move past delimiter
2007       leftDelimiter++;
2008       rightDelimiter++;
2009       int leftFarDelimiter = getRequiredDelimiterInReverse(left, leftDelimiter,
2010           llength - (leftDelimiter - loffset), HRegionInfo.DELIMITER);
2011       int rightFarDelimiter = getRequiredDelimiterInReverse(right,
2012           rightDelimiter, rlength - (rightDelimiter - roffset),
2013           HRegionInfo.DELIMITER);
2014       // Now compare middlesection of row.
2015       result = super.compareRows(left, leftDelimiter,
2016           leftFarDelimiter - leftDelimiter, right, rightDelimiter,
2017           rightFarDelimiter - rightDelimiter);
2018       if (result != 0) {
2019         return result;
2020       }
2021       // Compare last part of row, the rowid.
2022       leftFarDelimiter++;
2023       rightFarDelimiter++;
2024       result = compareRowid(left, leftFarDelimiter,
2025           llength - (leftFarDelimiter - loffset),
2026           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
2027       return result;
2028     }
2029 
2030     protected int compareRowid(byte[] left, int loffset, int llength,
2031         byte[] right, int roffset, int rlength) {
2032       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2033     }
2034   }
2035 
2036   /**
2037    * Avoids redundant comparisons for better performance.
2038    */
2039   public static interface SamePrefixComparator<T> {
2040     /**
2041      * Compare two keys assuming that the first n bytes are the same.
2042      * @param commonPrefix How many bytes are the same.
2043      */
2044     public int compareIgnoringPrefix(int commonPrefix,
2045         T left, int loffset, int llength,
2046         T right, int roffset, int rlength);
2047   }
2048 
2049   /**
2050    * Compare key portion of a {@link KeyValue}.
2051    */
2052   public static class KeyComparator
2053       implements RawComparator<byte []>, SamePrefixComparator<byte[]> {
2054     volatile boolean ignoreTimestamp = false;
2055     volatile boolean ignoreType = false;
2056 
2057     public int compare(byte[] left, int loffset, int llength, byte[] right,
2058         int roffset, int rlength) {
2059       // Compare row
2060       short lrowlength = Bytes.toShort(left, loffset);
2061       short rrowlength = Bytes.toShort(right, roffset);
2062       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
2063           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
2064       if (compare != 0) {
2065         return compare;
2066       }
2067 
2068       // Compare the rest of the two KVs without making any assumptions about
2069       // the common prefix. This function will not compare rows anyway, so we
2070       // don't need to tell it that the common prefix includes the row.
2071       return compareWithoutRow(0, left, loffset, llength, right, roffset,
2072           rlength, rrowlength);
2073     }
2074 
2075     /**
2076      * Compare the two key-values, ignoring the prefix of the given length
2077      * that is known to be the same between the two.
2078      * @param commonPrefix the prefix length to ignore
2079      */
2080     @Override
2081     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2082         int loffset, int llength, byte[] right, int roffset, int rlength) {
2083       // Compare row
2084       short lrowlength = Bytes.toShort(left, loffset);
2085       short rrowlength;
2086 
2087       int comparisonResult = 0;
2088       if (commonPrefix < ROW_LENGTH_SIZE) {
2089         // almost nothing in common
2090         rrowlength = Bytes.toShort(right, roffset);
2091         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2092             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2093       } else { // the row length is the same
2094         rrowlength = lrowlength;
2095         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2096           // The rows are not the same. Exclude the common prefix and compare
2097           // the rest of the two rows.
2098           int common = commonPrefix - ROW_LENGTH_SIZE;
2099           comparisonResult = compareRows(
2100               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2101               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2102         }
2103       }
2104       if (comparisonResult != 0) {
2105         return comparisonResult;
2106       }
2107 
2108       assert lrowlength == rrowlength;
2109 
2110       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2111           roffset, rlength, lrowlength);
2112     }
2113 
2114     /**
2115      * Compare columnFamily, qualifier, timestamp, and key type (everything
2116      * except the row). This method is used both in the normal comparator and
2117      * the "same-prefix" comparator. Note that we are assuming that row portions
2118      * of both KVs have already been parsed and found identical, and we don't
2119      * validate that assumption here.
2120      * @param commonPrefix
2121      *          the length of the common prefix of the two key-values being
2122      *          compared, including row length and row
2123      */
2124     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2125         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2126       /***
2127        * KeyValue Format and commonLength:
2128        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2129        * ------------------|-------commonLength--------|--------------
2130        */
2131       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2132 
2133       // commonLength + TIMESTAMP_TYPE_SIZE
2134       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2135       // ColumnFamily + Qualifier length.
2136       int lcolumnlength = llength - commonLengthWithTSAndType;
2137       int rcolumnlength = rlength - commonLengthWithTSAndType;
2138 
2139       byte ltype = left[loffset + (llength - 1)];
2140       byte rtype = right[roffset + (rlength - 1)];
2141 
2142       // If the column is not specified, the "minimum" key type appears the
2143       // latest in the sorted order, regardless of the timestamp. This is used
2144       // for specifying the last key/value in a given row, because there is no
2145       // "lexicographically last column" (it would be infinitely long). The
2146       // "maximum" key type does not need this behavior.
2147       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2148         // left is "bigger", i.e. it appears later in the sorted order
2149         return 1;
2150       }
2151       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2152         return -1;
2153       }
2154 
2155       int lfamilyoffset = commonLength + loffset;
2156       int rfamilyoffset = commonLength + roffset;
2157 
2158       // Column family length.
2159       int lfamilylength = left[lfamilyoffset - 1];
2160       int rfamilylength = right[rfamilyoffset - 1];
2161       // If left family size is not equal to right family size, we need not
2162       // compare the qualifiers. 
2163       boolean sameFamilySize = (lfamilylength == rfamilylength);
2164       int common = 0;
2165       if (commonPrefix > 0) {
2166         common = Math.max(0, commonPrefix - commonLength);
2167         if (!sameFamilySize) {
2168           // Common should not be larger than Math.min(lfamilylength,
2169           // rfamilylength).
2170           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2171         } else {
2172           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2173         }
2174       }
2175       if (!sameFamilySize) {
2176         // comparing column family is enough.
2177         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2178             - common, right, rfamilyoffset + common, rfamilylength - common);
2179       }
2180       // Compare family & qualifier together.
2181       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2182           lcolumnlength - common, right, rfamilyoffset + common,
2183           rcolumnlength - common);
2184       if (comparison != 0) {
2185         return comparison;
2186       }
2187       return compareTimestampAndType(left, loffset, llength, right, roffset,
2188           rlength, ltype, rtype);
2189     }
2190 
2191     private int compareTimestampAndType(byte[] left, int loffset, int llength,
2192         byte[] right, int roffset, int rlength, byte ltype, byte rtype) {
2193       int compare;
2194       if (!this.ignoreTimestamp) {
2195         // Get timestamps.
2196         long ltimestamp = Bytes.toLong(left,
2197             loffset + (llength - TIMESTAMP_TYPE_SIZE));
2198         long rtimestamp = Bytes.toLong(right,
2199             roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2200         compare = compareTimestamps(ltimestamp, rtimestamp);
2201         if (compare != 0) {
2202           return compare;
2203         }
2204       }
2205 
2206       if (!this.ignoreType) {
2207         // Compare types. Let the delete types sort ahead of puts; i.e. types
2208         // of higher numbers sort before those of lesser numbers. Maximum (255)
2209         // appears ahead of everything, and minimum (0) appears after
2210         // everything.
2211         return (0xff & rtype) - (0xff & ltype);
2212       }
2213       return 0;
2214     }
2215 
2216     public int compare(byte[] left, byte[] right) {
2217       return compare(left, 0, left.length, right, 0, right.length);
2218     }
2219 
2220     public int compareRows(byte [] left, int loffset, int llength,
2221         byte [] right, int roffset, int rlength) {
2222       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2223     }
2224 
2225     protected int compareColumns(
2226         byte [] left, int loffset, int llength, final int lfamilylength,
2227         byte [] right, int roffset, int rlength, final int rfamilylength) {
2228       return KeyValue.compareColumns(left, loffset, llength, lfamilylength,
2229         right, roffset, rlength, rfamilylength);
2230     }
2231 
2232     int compareTimestamps(final long ltimestamp, final long rtimestamp) {
2233       // The below older timestamps sorting ahead of newer timestamps looks
2234       // wrong but it is intentional. This way, newer timestamps are first
2235       // found when we iterate over a memstore and newer versions are the
2236       // first we trip over when reading from a store file.
2237       if (ltimestamp < rtimestamp) {
2238         return 1;
2239       } else if (ltimestamp > rtimestamp) {
2240         return -1;
2241       }
2242       return 0;
2243     }
2244   }
2245 
2246   // HeapSize
2247   public long heapSize() {
2248     return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE
2249         + ClassSize.align(ClassSize.ARRAY) + ClassSize.align(length)
2250         + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_LONG);
2251   }
2252 
2253   // this overload assumes that the length bytes have already been read,
2254   // and it expects the length of the KeyValue to be explicitly passed
2255   // to it.
2256   public void readFields(int length, final DataInput in) throws IOException {
2257 
2258     if (length <= 0) {
2259       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2260     }
2261 
2262     this.length = length;
2263     this.offset = 0;
2264     this.bytes = new byte[this.length];
2265     in.readFully(this.bytes, 0, this.length);
2266   }
2267 
2268   // Writable
2269   public void readFields(final DataInput in) throws IOException {
2270     int length = in.readInt();
2271     readFields(length, in);
2272   }
2273 
2274   public void write(final DataOutput out) throws IOException {
2275     out.writeInt(this.length);
2276     out.write(this.bytes, this.offset, this.length);
2277   }
2278 }