1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.EOFException;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.BufferOverflowException;
30  import java.nio.ByteBuffer;
31  import java.util.Arrays;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.Map;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.classification.InterfaceAudience;
39  import org.apache.hadoop.classification.InterfaceStability;
40  import org.apache.hadoop.fs.FSDataInputStream;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.ClassSize;
44  import org.apache.hadoop.io.IOUtils;
45  import org.apache.hadoop.io.RawComparator;
46  
47  import com.google.common.primitives.Longs;
48  
49  /**
50   * An HBase Key/Value. This is the fundamental HBase Type.
51   * <p>
52   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
53   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
54   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
55   * <p>
 * Instances of this class are immutable. They do not implement Comparable but Comparators are
 * provided. Comparators change with context, whether user table or a catalog table comparison.
 * It's critical you use the appropriate comparator. There are Comparators for KeyValue instances
 * and then for just the Key portion of a KeyValue used mostly by HFile.
60   * <p>
61   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
62   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
63   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
64   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier> &lt;timestamp> &lt;keytype></code>
 * The <code>rowlength</code> maximum is <code>Short.MAX_VALUE</code>, column family length maximum
 * is <code>Byte.MAX_VALUE</code>, and column qualifier + key length must be &lt;
 * <code>Integer.MAX_VALUE</code>. The column does not contain the family/qualifier delimiter,
 * {@link #COLUMN_FAMILY_DELIMITER}
69   */
70  @InterfaceAudience.Public
71  @InterfaceStability.Evolving
72  public class KeyValue implements Cell, HeapSize, Cloneable {
  /** Logger shared by this class. */
  static final Log LOG = LogFactory.getLog(KeyValue.class);
  // TODO: Group Key-only comparators and operations into a Key class, just
  // for neatness sake, if can figure what to call it.

  /**
   * Colon character in UTF-8
   */
  public static final char COLUMN_FAMILY_DELIMITER = ':';

  /** {@link #COLUMN_FAMILY_DELIMITER} as a single-element byte array. */
  public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
    new byte[]{COLUMN_FAMILY_DELIMITER};

  /**
   * Comparator for plain key/values; i.e. non-catalog table key/values.
   */
  public static final KVComparator COMPARATOR = new KVComparator();

  /**
   * Comparator for plain key; i.e. non-catalog table key.  Works on Key portion
   * of KeyValue only.
   */
  public static final KeyComparator KEY_COMPARATOR = new KeyComparator();

  /**
   * A {@link KVComparator} for <code>.META.</code> catalog table
   * {@link KeyValue}s.
   */
  public static final KVComparator META_COMPARATOR = new MetaComparator();

  /**
   * A {@link KeyComparator} for <code>.META.</code> catalog table
   * {@link KeyValue} keys.
   */
  public static final KeyComparator META_KEY_COMPARATOR = new MetaKeyComparator();

  /**
   * A {@link KVComparator} for <code>-ROOT-</code> catalog table
   * {@link KeyValue}s.
   */
  public static final KVComparator ROOT_COMPARATOR = new RootComparator();

  /**
   * A {@link KeyComparator} for <code>-ROOT-</code> catalog table
   * {@link KeyValue} keys.
   */
  public static final KeyComparator ROOT_KEY_COMPARATOR = new RootKeyComparator();
119 
120   /**
121    * Get the appropriate row comparator for the specified table.
122    *
123    * Hopefully we can get rid of this, I added this here because it's replacing
124    * something in HSK.  We should move completely off of that.
125    *
126    * @param tableName  The table name.
127    * @return The comparator.
128    */
129   public static KeyComparator getRowComparator(byte [] tableName) {
130     if(Bytes.equals(HConstants.ROOT_TABLE_NAME,tableName)) {
131       return ROOT_COMPARATOR.getRawComparator();
132     }
133     if(Bytes.equals(HConstants.META_TABLE_NAME, tableName)) {
134       return META_COMPARATOR.getRawComparator();
135     }
136     return COMPARATOR.getRawComparator();
137   }
138 
  /** Size of the key length field in bytes */
  public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;

  /** Size of the key type field in bytes */
  public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;

  /** Size of the row length field in bytes */
  public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;

  /** Size of the family length field in bytes */
  public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;

  /** Size of the timestamp field in bytes */
  public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;

  /** Size of the timestamp and type byte on end of a key -- a long + a byte. */
  public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;

  /**
   * Size of the fixed-width parts of a key: the row length short, the family length byte
   * and the trailing timestamp + type.
   */
  public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
      + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;

  /**
   * How far into the KeyValue the key starts: past the key length int and the value length
   * int. The first thing to read there is the short that says how long the row is.
   */
  public static final int ROW_OFFSET =
    Bytes.SIZEOF_INT /*keylength*/ +
    Bytes.SIZEOF_INT /*valuelength*/;

  /** Size of the length ints in a KeyValue datastructure. */
  public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
169 
170   /**
171    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
172    * characteristics would take up for its underlying data structure.
173    *
174    * @param rlength row length
175    * @param flength family length
176    * @param qlength qualifier length
177    * @param vlength value length
178    *
179    * @return the <code>KeyValue</code> data structure length
180    */
181   public static long getKeyValueDataStructureSize(int rlength,
182       int flength, int qlength, int vlength) {
183     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE +
184             getKeyDataStructureSize(rlength, flength, qlength) + vlength;
185   }
186 
187   /**
188    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
189    * characteristics would take up in its underlying data structure for the key.
190    *
191    * @param rlength row length
192    * @param flength family length
193    * @param qlength qualifier length
194    *
195    * @return the key data structure length
196    */
197   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
198     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
199   }
200 
201   /**
202    * Key type.
203    * Has space for other key types to be added later.  Cannot rely on
204    * enum ordinals . They change if item is removed or moved.  Do our own codes.
205    */
206   public static enum Type {
207     Minimum((byte)0),
208     Put((byte)4),
209 
210     Delete((byte)8),
211     DeleteColumn((byte)12),
212     DeleteFamily((byte)14),
213 
214     // Maximum is used when searching; you look from maximum on down.
215     Maximum((byte)255);
216 
217     private final byte code;
218 
219     Type(final byte c) {
220       this.code = c;
221     }
222 
223     public byte getCode() {
224       return this.code;
225     }
226 
227     /**
228      * Cannot rely on enum ordinals . They change if item is removed or moved.
229      * Do our own codes.
230      * @param b
231      * @return Type associated with passed code.
232      */
233     public static Type codeToType(final byte b) {
234       for (Type t : Type.values()) {
235         if (t.getCode() == b) {
236           return t;
237         }
238       }
239       throw new RuntimeException("Unknown code " + b);
240     }
241   }
242 
  /**
   * Lowest possible key.
   * Makes a Key with highest possible Timestamp, empty row and column.  No
   * key can be equal or lower than this one in memstore or in store file.
   */
  public static final KeyValue LOWESTKEY =
    new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);

  // Backing array holding the serialized KeyValue.
  private byte [] bytes = null;
  // Position in the backing array at which this KeyValue starts.
  private int offset = 0;
  // Number of bytes of the backing array that belong to this KeyValue.
  private int length = 0;
254 
255   /**
256    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
257    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
258    * KeyValue type.
259    */
260   public static boolean isDelete(byte t) {
261     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
262   }
263 
264   /** Here be dragons **/
265 
266   // used to achieve atomic operations in the memstore.
267   @Override
268   public long getMvccVersion() {
269     return memstoreTS;
270   }
271 
272   public void setMvccVersion(long mvccVersion){
273     this.memstoreTS = mvccVersion;
274   }
275 
276   @Deprecated
277   public long getMemstoreTS() {
278     return getMvccVersion();
279   }
280 
281   @Deprecated
282   public void setMemstoreTS(long memstoreTS) {
283     setMvccVersion(memstoreTS);
284   }
285 
  // Value read and written by getMvccVersion()/setMvccVersion().
  // default value is 0, aka DNC
  private long memstoreTS = 0;
288 
289   /** Dragon time over, return to normal business */
290 
291 
  /**
   * Writable Constructor -- DO NOT USE.
   * Leaves the backing array <code>null</code> and the offset and length at 0.
   */
  public KeyValue() {}
294 
  /**
   * Creates a KeyValue from the start of the specified byte array.
   * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
   * Equivalent to {@link #KeyValue(byte[], int)} with an offset of 0.
   * @param bytes byte array
   */
  public KeyValue(final byte [] bytes) {
    this(bytes, 0);
  }
303 
  /**
   * Creates a KeyValue from the specified byte array and offset.
   * Presumes <code>bytes</code> content starting at <code>offset</code> is
   * formatted as a KeyValue blob. The length of the KeyValue is obtained by calling
   * <code>getLength(bytes, offset)</code>.
   * @param bytes byte array
   * @param offset offset to start of KeyValue
   */
  public KeyValue(final byte [] bytes, final int offset) {
    this(bytes, offset, getLength(bytes, offset));
  }
314 
315   /**
316    * Creates a KeyValue from the specified byte array, starting at offset, and
317    * for length <code>length</code>.
318    * @param bytes byte array
319    * @param offset offset to start of the KeyValue
320    * @param length length of the KeyValue
321    */
322   public KeyValue(final byte [] bytes, final int offset, final int length) {
323     this.bytes = bytes;
324     this.offset = offset;
325     this.length = length;
326   }
327 
328   /**
329    * Creates a KeyValue from the specified byte array, starting at offset,
330    * for length <code>length</code>, and a known <code>keyLength</code>.
331    * @param bytes byte array
332    * @param offset offset to start of the KeyValue
333    * @param length length of the KeyValue
334    * @param keyLength length of the key portion of the KeyValue
335    */
336   public KeyValue(final byte [] bytes, final int offset, final int length, final int keyLength) {
337     this.bytes = bytes;
338     this.offset = offset;
339     this.length = length;
340     this.keyLength = keyLength;
341   }
342 
343   /** Constructors that build a new backing byte array from fields */
344 
  /**
   * Constructs KeyValue structure with only a row and a timestamp; family, qualifier and
   * value are passed on as <code>null</code>.
   * Sets type to {@link KeyValue.Type#Maximum}
   * @param row - row key (arbitrary byte array)
   * @param timestamp version timestamp
   */
  public KeyValue(final byte [] row, final long timestamp) {
    this(row, timestamp, Type.Maximum);
  }
354 
  /**
   * Constructs KeyValue structure with only a row, a timestamp and a type; family,
   * qualifier and value are passed on as <code>null</code>.
   * @param row - row key (arbitrary byte array)
   * @param timestamp version timestamp
   * @param type key type
   */
  public KeyValue(final byte [] row, final long timestamp, Type type) {
    this(row, null, null, timestamp, type, null);
  }
363 
  /**
   * Constructs KeyValue structure filled with null value.
   * Sets the timestamp to {@link HConstants#LATEST_TIMESTAMP} and the type to
   * {@link KeyValue.Type#Maximum}
   * @param row - row key (arbitrary byte array)
   * @param family family name
   * @param qualifier column qualifier
   */
  public KeyValue(final byte [] row, final byte [] family,
      final byte [] qualifier) {
    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
  }
375 
  /**
   * Constructs KeyValue structure filled with the specified value.
   * Sets the timestamp to {@link HConstants#LATEST_TIMESTAMP} and the type to
   * {@link KeyValue.Type#Put}
   * @param row - row key (arbitrary byte array)
   * @param family family name
   * @param qualifier column qualifier
   * @param value column value
   */
  public KeyValue(final byte [] row, final byte [] family,
      final byte [] qualifier, final byte [] value) {
    this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
  }
386 
  /**
   * Constructs KeyValue structure filled with specified values and a <code>null</code>
   * value.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param type key type
   * @throws IllegalArgumentException
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, Type type) {
    this(row, family, qualifier, timestamp, type, null);
  }
400 
  /**
   * Constructs KeyValue structure filled with specified values.
   * Sets type to {@link KeyValue.Type#Put}
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param value column value
   * @throws IllegalArgumentException
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, final byte[] value) {
    this(row, family, qualifier, timestamp, Type.Put, value);
  }
414 
  /**
   * Constructs KeyValue structure filled with specified values.
   * The whole of <code>qualifier</code> and <code>value</code> is used; a
   * <code>null</code> array is passed on with offset 0 and length 0.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param timestamp version timestamp
   * @param type key type
   * @param value column value
   * @throws IllegalArgumentException
   */
  public KeyValue(final byte[] row, final byte[] family,
      final byte[] qualifier, final long timestamp, Type type,
      final byte[] value) {
    this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
        timestamp, type, value, 0, value==null ? 0 : value.length);
  }
431 
  /**
   * Constructs KeyValue structure filled with specified values.
   * The whole of <code>row</code> and <code>family</code> is used; a <code>null</code>
   * array is passed on with offset 0 and length 0. The qualifier and value are taken
   * from the given offsets and lengths.
   * @param row row key
   * @param family family name
   * @param qualifier column qualifier
   * @param qoffset qualifier offset
   * @param qlength qualifier length
   * @param timestamp version timestamp
   * @param type key type
   * @param value column value
   * @param voffset value offset
   * @param vlength value length
   * @throws IllegalArgumentException
   */
  public KeyValue(byte [] row, byte [] family,
      byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
      byte [] value, int voffset, int vlength) {
    this(row, 0, row==null ? 0 : row.length,
        family, 0, family==null ? 0 : family.length,
        qualifier, qoffset, qlength, timestamp, type,
        value, voffset, vlength);
  }
454 
455   /**
456    * Constructs KeyValue structure filled with specified values.
457    * <p>
458    * Column is split into two fields, family and qualifier.
459    * @param row row key
460    * @param roffset row offset
461    * @param rlength row length
462    * @param family family name
463    * @param foffset family offset
464    * @param flength family length
465    * @param qualifier column qualifier
466    * @param qoffset qualifier offset
467    * @param qlength qualifier length
468    * @param timestamp version timestamp
469    * @param type key type
470    * @param value column value
471    * @param voffset value offset
472    * @param vlength value length
473    * @throws IllegalArgumentException
474    */
475   public KeyValue(final byte [] row, final int roffset, final int rlength,
476       final byte [] family, final int foffset, final int flength,
477       final byte [] qualifier, final int qoffset, final int qlength,
478       final long timestamp, final Type type,
479       final byte [] value, final int voffset, final int vlength) {
480     this.bytes = createByteArray(row, roffset, rlength,
481         family, foffset, flength, qualifier, qoffset, qlength,
482         timestamp, type, value, voffset, vlength);
483     this.length = bytes.length;
484     this.offset = 0;
485   }
486 
487   /**
488    * Constructs an empty KeyValue structure, with specified sizes.
489    * This can be used to partially fill up KeyValues.
490    * <p>
491    * Column is split into two fields, family and qualifier.
492    * @param rlength row length
493    * @param flength family length
494    * @param qlength qualifier length
495    * @param timestamp version timestamp
496    * @param type key type
497    * @param vlength value length
498    * @throws IllegalArgumentException
499    */
500   public KeyValue(final int rlength,
501       final int flength,
502       final int qlength,
503       final long timestamp, final Type type,
504       final int vlength) {
505     this.bytes = createEmptyByteArray(rlength,
506         flength, qlength,
507         timestamp, type, vlength);
508     this.length = bytes.length;
509     this.offset = 0;
510   }
511 
512   /**
513    * Create an empty byte[] representing a KeyValue
514    * All lengths are preset and can be filled in later.
515    * @param rlength
516    * @param flength
517    * @param qlength
518    * @param timestamp
519    * @param type
520    * @param vlength
521    * @return The newly created byte array.
522    */
523   static byte[] createEmptyByteArray(final int rlength, int flength,
524       int qlength, final long timestamp, final Type type, int vlength) {
525     if (rlength > Short.MAX_VALUE) {
526       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
527     }
528     if (flength > Byte.MAX_VALUE) {
529       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
530     }
531     // Qualifier length
532     if (qlength > Integer.MAX_VALUE - rlength - flength) {
533       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
534     }
535     // Key length
536     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
537     if (longkeylength > Integer.MAX_VALUE) {
538       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
539         Integer.MAX_VALUE);
540     }
541     int keylength = (int)longkeylength;
542     // Value length
543     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
544       throw new IllegalArgumentException("Valuer > " +
545           HConstants.MAXIMUM_VALUE_LENGTH);
546     }
547 
548     // Allocate right-sized byte array.
549     byte [] bytes =
550         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength)];
551     // Write the correct size markers
552     int pos = 0;
553     pos = Bytes.putInt(bytes, pos, keylength);
554     pos = Bytes.putInt(bytes, pos, vlength);
555     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
556     pos += rlength;
557     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
558     pos += flength + qlength;
559     pos = Bytes.putLong(bytes, pos, timestamp);
560     pos = Bytes.putByte(bytes, pos, type.getCode());
561     return bytes;
562   }
563 
  /**
   * Constructs KeyValue structure filled with specified values. Uses the provided buffer as its
   * backing data buffer, writing from position 0 of the buffer.
   * <p>
   * Column is split into two fields, family and qualifier.
   *
   * @param buffer the bytes buffer to use
   * @param row row key
   * @param roffset row offset
   * @param rlength row length
   * @param family family name
   * @param foffset family offset
   * @param flength family length
   * @param qualifier column qualifier
   * @param qoffset qualifier offset
   * @param qlength qualifier length
   * @param timestamp version timestamp
   * @param type key type
   * @param value column value
   * @param voffset value offset
   * @param vlength value length
   * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
   * remaining in the buffer
   */
  public KeyValue(byte [] buffer,
      final byte [] row, final int roffset, final int rlength,
      final byte [] family, final int foffset, final int flength,
      final byte [] qualifier, final int qoffset, final int qlength,
      final long timestamp, final Type type,
      final byte [] value, final int voffset, final int vlength) {

    // Same as the buffer + offset constructor, with a buffer offset of 0.
    this(buffer, 0,
        row, roffset, rlength,
        family, foffset, flength,
        qualifier, qoffset, qlength,
        timestamp, type,
        value, voffset, vlength);
  }
602 
603   /**
604    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
605    * data buffer.
606    * <p>
607    * Column is split into two fields, family and qualifier.
608    *
609    * @param buffer the bytes buffer to use
610    * @param boffset buffer offset
611    * @param row row key
612    * @param roffset row offset
613    * @param rlength row length
614    * @param family family name
615    * @param foffset family offset
616    * @param flength family length
617    * @param qualifier column qualifier
618    * @param qoffset qualifier offset
619    * @param qlength qualifier length
620    * @param timestamp version timestamp
621    * @param type key type
622    * @param value column value
623    * @param voffset value offset
624    * @param vlength value length
625    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
626    * remaining in the buffer
627    */
628   public KeyValue(byte [] buffer, final int boffset,
629       final byte [] row, final int roffset, final int rlength,
630       final byte [] family, final int foffset, final int flength,
631       final byte [] qualifier, final int qoffset, final int qlength,
632       final long timestamp, final Type type,
633       final byte [] value, final int voffset, final int vlength) {
634 
635     this.bytes  = buffer;
636     this.length = writeByteArray(buffer, boffset,
637         row, roffset, rlength,
638         family, foffset, flength, qualifier, qoffset, qlength,
639         timestamp, type, value, voffset, vlength);
640     this.offset = boffset;
641   }
642 
643   /**
644    * Checks the parameters passed to a constructor.
645    *
646    * @param row row key
647    * @param rlength row length
648    * @param family family name
649    * @param flength family length
650    * @param qualifier column qualifier
651    * @param qlength qualifier length
652    * @param value column value
653    * @param vlength value length
654    *
655    * @throws IllegalArgumentException an illegal value was passed
656    */
657   private static void checkParameters(final byte [] row, final int rlength,
658       final byte [] family, int flength,
659       final byte [] qualifier, int qlength,
660       final byte [] value, int vlength)
661           throws IllegalArgumentException {
662 
663     if (rlength > Short.MAX_VALUE) {
664       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
665     }
666     if (row == null) {
667       throw new IllegalArgumentException("Row is null");
668     }
669     // Family length
670     flength = family == null ? 0 : flength;
671     if (flength > Byte.MAX_VALUE) {
672       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
673     }
674     // Qualifier length
675     qlength = qualifier == null ? 0 : qlength;
676     if (qlength > Integer.MAX_VALUE - rlength - flength) {
677       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
678     }
679     // Key length
680     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
681     if (longKeyLength > Integer.MAX_VALUE) {
682       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
683           Integer.MAX_VALUE);
684     }
685     // Value length
686     vlength = value == null? 0 : vlength;
687     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
688       throw new IllegalArgumentException("Value length " + vlength + " > " +
689           HConstants.MAXIMUM_VALUE_LENGTH);
690     }
691   }
692 
693   /**
694    * Write KeyValue format into the provided byte array.
695    *
696    * @param buffer the bytes buffer to use
697    * @param boffset buffer offset
698    * @param row row key
699    * @param roffset row offset
700    * @param rlength row length
701    * @param family family name
702    * @param foffset family offset
703    * @param flength family length
704    * @param qualifier column qualifier
705    * @param qoffset qualifier offset
706    * @param qlength qualifier length
707    * @param timestamp version timestamp
708    * @param type key type
709    * @param value column value
710    * @param voffset value offset
711    * @param vlength value length
712    *
713    * @return The number of useful bytes in the buffer.
714    *
715    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
716    * remaining in the buffer
717    */
718   static int writeByteArray(byte [] buffer, final int boffset,
719       final byte [] row, final int roffset, final int rlength,
720       final byte [] family, final int foffset, int flength,
721       final byte [] qualifier, final int qoffset, int qlength,
722       final long timestamp, final Type type,
723       final byte [] value, final int voffset, int vlength) {
724 
725     checkParameters(row, rlength, family, flength, qualifier, qlength, value, vlength);
726 
727     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
728     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
729     if (keyValueLength > buffer.length - boffset) {
730       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
731           keyValueLength);
732     }
733 
734     // Write key, value and key row length.
735     int pos = boffset;
736     pos = Bytes.putInt(buffer, pos, keyLength);
737     pos = Bytes.putInt(buffer, pos, vlength);
738     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
739     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
740     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
741     if (flength != 0) {
742       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
743     }
744     if (qlength != 0) {
745       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
746     }
747     pos = Bytes.putLong(buffer, pos, timestamp);
748     pos = Bytes.putByte(buffer, pos, type.getCode());
749     if (value != null && value.length > 0) {
750       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
751     }
752 
753     return keyValueLength;
754   }
755 
756   /**
757    * Write KeyValue format into a byte array.
758    *
759    * @param row row key
760    * @param roffset row offset
761    * @param rlength row length
762    * @param family family name
763    * @param foffset family offset
764    * @param flength family length
765    * @param qualifier column qualifier
766    * @param qoffset qualifier offset
767    * @param qlength qualifier length
768    * @param timestamp version timestamp
769    * @param type key type
770    * @param value column value
771    * @param voffset value offset
772    * @param vlength value length
773    * @return The newly created byte array.
774    */
775   static byte [] createByteArray(final byte [] row, final int roffset,
776       final int rlength, final byte [] family, final int foffset, int flength,
777       final byte [] qualifier, final int qoffset, int qlength,
778       final long timestamp, final Type type,
779       final byte [] value, final int voffset, int vlength) {
780 
781     checkParameters(row, rlength, family, flength, qualifier, qlength, value, vlength);
782 
783     // Allocate right-sized byte array.
784     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
785     byte [] bytes =
786         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength)];
787     // Write key, value and key row length.
788     int pos = 0;
789     pos = Bytes.putInt(bytes, pos, keyLength);
790     pos = Bytes.putInt(bytes, pos, vlength);
791     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
792     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
793     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
794     if(flength != 0) {
795       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
796     }
797     if(qlength != 0) {
798       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
799     }
800     pos = Bytes.putLong(bytes, pos, timestamp);
801     pos = Bytes.putByte(bytes, pos, type.getCode());
802     if (value != null && value.length > 0) {
803       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
804     }
805     return bytes;
806   }
807 
808   /**
809    * Write KeyValue format into a byte array.
810    * <p>
811    * Takes column in the form <code>family:qualifier</code>
812    * @param row - row key (arbitrary byte array)
813    * @param roffset
814    * @param rlength
815    * @param column
816    * @param coffset
817    * @param clength
818    * @param timestamp
819    * @param type
820    * @param value
821    * @param voffset
822    * @param vlength
823    * @return The newly created byte array.
824    */
825   static byte [] createByteArray(final byte [] row, final int roffset,
826         final int rlength,
827       final byte [] column, final int coffset, int clength,
828       final long timestamp, final Type type,
829       final byte [] value, final int voffset, int vlength) {
830     // If column is non-null, figure where the delimiter is at.
831     int delimiteroffset = 0;
832     if (column != null && column.length > 0) {
833       delimiteroffset = getFamilyDelimiterIndex(column, coffset, clength);
834       if (delimiteroffset > Byte.MAX_VALUE) {
835         throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
836       }
837     } else {
838       return createByteArray(row,roffset,rlength,null,0,0,null,0,0,timestamp,
839           type,value,voffset,vlength);
840     }
841     int flength = delimiteroffset-coffset;
842     int qlength = clength - flength - 1;
843     return createByteArray(row, roffset, rlength, column, coffset,
844         flength, column, delimiteroffset+1, qlength, timestamp, type,
845         value, voffset, vlength);
846   }
847 
848   /**
849    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
850    */
851   @Override
852   public boolean equals(Object other) {
853     if (!(other instanceof Cell)) {
854       return false;
855     }
856     return CellComparator.equals(this, (Cell)other);
857   }
858 
859   @Override
860   public int hashCode() {
861     byte[] b = getBuffer();
862     int start = getOffset(), end = getOffset() + getLength();
863     int h = b[start++];
864     for (int i = start; i < end; i++) {
865       h = (h * 13) ^ b[i];
866     }
867     return h;
868   }
869 
870   //---------------------------------------------------------------------------
871   //
872   //  KeyValue cloning
873   //
874   //---------------------------------------------------------------------------
875 
876   /**
877    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
878    * @return Fully copied clone of this KeyValue
879    * @throws CloneNotSupportedException
880    */
881   @Override
882   public KeyValue clone() throws CloneNotSupportedException {
883     super.clone();
884     byte [] b = new byte[this.length];
885     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
886     KeyValue ret = new KeyValue(b, 0, b.length);
887     // Important to clone the memstoreTS as well - otherwise memstore's
888     // update-in-place methods (eg increment) will end up creating
889     // new entries
890     ret.setMemstoreTS(memstoreTS);
891     return ret;
892   }
893 
894   /**
895    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
896    * http://en.wikipedia.org/wiki/Object_copy
897    * @return Shallow copy of this KeyValue
898    */
899   public KeyValue shallowCopy() {
900     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
901     shallowCopy.setMemstoreTS(this.memstoreTS);
902     return shallowCopy;
903   }
904 
905   //---------------------------------------------------------------------------
906   //
907   //  String representation
908   //
909   //---------------------------------------------------------------------------
910 
911   public String toString() {
912     if (this.bytes == null || this.bytes.length == 0) {
913       return "empty";
914     }
915     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
916       "/vlen=" + getValueLength() + "/mvcc=" + memstoreTS;
917   }
918 
919   /**
920    * @param k Key portion of a KeyValue.
921    * @return Key as a String.
922    */
923   public static String keyToString(final byte [] k) {
924     return keyToString(k, 0, k.length);
925   }
926 
927   /**
928    * Use for logging.
929    * @param b Key portion of a KeyValue.
930    * @param o Offset to start of key
931    * @param l Length of key.
932    * @return Key as a String.
933    */
934   /**
935    * Produces a string map for this key/value pair. Useful for programmatic use
936    * and manipulation of the data stored in an HLogKey, for example, printing
937    * as JSON. Values are left out due to their tendency to be large. If needed,
938    * they can be added manually.
939    *
940    * @return the Map<String,?> containing data from this key
941    */
942   public Map<String, Object> toStringMap() {
943     Map<String, Object> stringMap = new HashMap<String, Object>();
944     stringMap.put("row", Bytes.toStringBinary(getRow()));
945     stringMap.put("family", Bytes.toStringBinary(getFamily()));
946     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
947     stringMap.put("timestamp", getTimestamp());
948     stringMap.put("vlen", getValueLength());
949     return stringMap;
950   }
951 
952   public static String keyToString(final byte [] b, final int o, final int l) {
953     if (b == null) return "";
954     int rowlength = Bytes.toShort(b, o);
955     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
956     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
957     int familylength = b[columnoffset - 1];
958     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
959     String family = familylength == 0? "":
960       Bytes.toStringBinary(b, columnoffset, familylength);
961     String qualifier = columnlength == 0? "":
962       Bytes.toStringBinary(b, columnoffset + familylength,
963       columnlength - familylength);
964     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
965     String timestampStr = humanReadableTimestamp(timestamp);
966     byte type = b[o + l - 1];
967     return row + "/" + family +
968       (family != null && family.length() > 0? ":" :"") +
969       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
970   }
971 
972   public static String humanReadableTimestamp(final long timestamp) {
973     if (timestamp == HConstants.LATEST_TIMESTAMP) {
974       return "LATEST_TIMESTAMP";
975     }
976     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
977       return "OLDEST_TIMESTAMP";
978     }
979     return String.valueOf(timestamp);
980   }
981 
982   //---------------------------------------------------------------------------
983   //
984   //  Public Member Accessors
985   //
986   //---------------------------------------------------------------------------
987 
988   /**
989    * @return The byte array backing this KeyValue.
990    */
991   public byte [] getBuffer() {
992     return this.bytes;
993   }
994 
995   /**
996    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
997    */
998   public int getOffset() {
999     return this.offset;
1000   }
1001 
1002   /**
1003    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1004    */
1005   public int getLength() {
1006     return length;
1007   }
1008 
1009   //---------------------------------------------------------------------------
1010   //
1011   //  Length and Offset Calculators
1012   //
1013   //---------------------------------------------------------------------------
1014 
1015   /**
1016    * Determines the total length of the KeyValue stored in the specified
1017    * byte array and offset.  Includes all headers.
1018    * @param bytes byte array
1019    * @param offset offset to start of the KeyValue
1020    * @return length of entire KeyValue, in bytes
1021    */
1022   private static int getLength(byte [] bytes, int offset) {
1023     return ROW_OFFSET +
1024         Bytes.toInt(bytes, offset) +
1025         Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1026   }
1027 
1028   /**
1029    * @return Key offset in backing buffer..
1030    */
1031   public int getKeyOffset() {
1032     return this.offset + ROW_OFFSET;
1033   }
1034 
1035   public String getKeyString() {
1036     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1037   }
1038 
1039   /**
1040    * @return Length of key portion.
1041    */
1042   private int keyLength = 0;
1043 
1044   public int getKeyLength() {
1045     if (keyLength == 0) {
1046       keyLength = Bytes.toInt(this.bytes, this.offset);
1047     }
1048     return keyLength;
1049   }
1050 
1051   /**
1052    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1053    */
1054   @Override
1055   public byte[] getValueArray() {
1056     return bytes;
1057   }
1058 
1059   /**
1060    * @return Value offset
1061    */
1062   @Override
1063   public int getValueOffset() {
1064     return getKeyOffset() + getKeyLength();
1065   }
1066 
1067   /**
1068    * @return Value length
1069    */
1070   @Override
1071   public int getValueLength() {
1072     return Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1073   }
1074 
1075   /**
1076    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1077    */
1078   @Override
1079   public byte[] getRowArray() {
1080     return bytes;
1081   }
1082 
1083   /**
1084    * @return Row offset
1085    */
1086   @Override
1087   public int getRowOffset() {
1088     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1089   }
1090 
1091   /**
1092    * @return Row length
1093    */
1094   @Override
1095   public short getRowLength() {
1096     return Bytes.toShort(this.bytes, getKeyOffset());
1097   }
1098 
1099   /**
1100    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1101    */
1102   @Override
1103   public byte[] getFamilyArray() {
1104     return bytes;
1105   }
1106 
1107   /**
1108    * @return Family offset
1109    */
1110   @Override
1111   public int getFamilyOffset() {
1112     return getFamilyOffset(getRowLength());
1113   }
1114 
1115   /**
1116    * @return Family offset
1117    */
1118   public int getFamilyOffset(int rlength) {
1119     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1120   }
1121 
1122   /**
1123    * @return Family length
1124    */
1125   @Override
1126   public byte getFamilyLength() {
1127     return getFamilyLength(getFamilyOffset());
1128   }
1129 
1130   /**
1131    * @return Family length
1132    */
1133   public byte getFamilyLength(int foffset) {
1134     return this.bytes[foffset-1];
1135   }
1136 
1137   /**
1138    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1139    */
1140   @Override
1141   public byte[] getQualifierArray() {
1142     return bytes;
1143   }
1144 
1145   /**
1146    * @return Qualifier offset
1147    */
1148   @Override
1149   public int getQualifierOffset() {
1150     return getQualifierOffset(getFamilyOffset());
1151   }
1152 
1153   /**
1154    * @return Qualifier offset
1155    */
1156   public int getQualifierOffset(int foffset) {
1157     return foffset + getFamilyLength(foffset);
1158   }
1159 
1160   /**
1161    * @return Qualifier length
1162    */
1163   @Override
1164   public int getQualifierLength() {
1165     return getQualifierLength(getRowLength(),getFamilyLength());
1166   }
1167 
1168   /**
1169    * @return Qualifier length
1170    */
1171   public int getQualifierLength(int rlength, int flength) {
1172     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1173   }
1174 
1175   /**
1176    * @return Column (family + qualifier) length
1177    */
1178   public int getTotalColumnLength() {
1179     int rlength = getRowLength();
1180     int foffset = getFamilyOffset(rlength);
1181     return getTotalColumnLength(rlength,foffset);
1182   }
1183 
1184   /**
1185    * @return Column (family + qualifier) length
1186    */
1187   public int getTotalColumnLength(int rlength, int foffset) {
1188     int flength = getFamilyLength(foffset);
1189     int qlength = getQualifierLength(rlength,flength);
1190     return flength + qlength;
1191   }
1192 
1193   /**
1194    * @return Timestamp offset
1195    */
1196   public int getTimestampOffset() {
1197     return getTimestampOffset(getKeyLength());
1198   }
1199 
1200   /**
1201    * @param keylength Pass if you have it to save on a int creation.
1202    * @return Timestamp offset
1203    */
1204   public int getTimestampOffset(final int keylength) {
1205     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1206   }
1207 
1208   /**
1209    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1210    */
1211   public boolean isLatestTimestamp() {
1212     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1213       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1214   }
1215 
1216   /**
1217    * @return True if this is a "fake" KV created for internal seeking purposes,
1218    * which should not be seen by user code
1219    */
1220   public boolean isInternal() {
1221     byte type = getType();
1222     return type == Type.Minimum.code || type == Type.Maximum.code;
1223   }
1224   /**
1225    * @param now Time to set into <code>this</code> IFF timestamp ==
1226    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1227    * @return True is we modified this.
1228    */
1229   public boolean updateLatestStamp(final byte [] now) {
1230     if (this.isLatestTimestamp()) {
1231       int tsOffset = getTimestampOffset();
1232       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1233       // clear cache or else getTimestamp() possibly returns an old value
1234       return true;
1235     }
1236     return false;
1237   }
1238 
1239   //---------------------------------------------------------------------------
1240   //
1241   //  Methods that return copies of fields
1242   //
1243   //---------------------------------------------------------------------------
1244 
1245   /**
1246    * Do not use unless you have to.  Used internally for compacting and testing.
1247    *
1248    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1249    * {@link #getValue()} if accessing a KeyValue client-side.
1250    * @return Copy of the key portion only.
1251    */
1252   public byte [] getKey() {
1253     int keylength = getKeyLength();
1254     byte [] key = new byte[keylength];
1255     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1256     return key;
1257   }
1258 
1259   /**
1260    * Returns value in a new byte array.
1261    * Primarily for use client-side. If server-side, use
1262    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1263    * save on allocations.
1264    * @return Value in a new byte array.
1265    */
1266   public byte [] getValue() {
1267     int o = getValueOffset();
1268     int l = getValueLength();
1269     byte [] result = new byte[l];
1270     System.arraycopy(getBuffer(), o, result, 0, l);
1271     return result;
1272   }
1273 
1274   /**
1275    * Returns the value wrapped in a new <code>ByteBuffer</code>.
1276    *
1277    * @return the value
1278    */
1279   public ByteBuffer getValueAsByteBuffer() {
1280     return ByteBuffer.wrap(getBuffer(), getValueOffset(), getValueLength());
1281   }
1282 
1283   /**
1284    * Loads this object's value into the provided <code>ByteBuffer</code>.
1285    * <p>
1286    * Does not clear or flip the buffer.
1287    *
1288    * @param dst the buffer where to write the value
1289    *
1290    * @throws BufferOverflowException if there is insufficient space remaining in the buffer
1291    */
1292   public void loadValue(ByteBuffer dst) throws BufferOverflowException {
1293     dst.put(getBuffer(), getValueOffset(), getValueLength());
1294   }
1295 
1296   /**
1297    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1298    * byte array.<p>
1299    *
1300    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1301    * lengths instead.
1302    * @return Row in a new byte array.
1303    */
1304   public byte [] getRow() {
1305     int o = getRowOffset();
1306     short l = getRowLength();
1307     byte result[] = new byte[l];
1308     System.arraycopy(getBuffer(), o, result, 0, l);
1309     return result;
1310   }
1311 
1312   /**
1313    *
1314    * @return Timestamp
1315    */
1316   @Override
1317   public long getTimestamp() {
1318     return getTimestamp(getKeyLength());
1319   }
1320 
1321   /**
1322    * @param keylength Pass if you have it to save on a int creation.
1323    * @return Timestamp
1324    */
1325   long getTimestamp(final int keylength) {
1326     int tsOffset = getTimestampOffset(keylength);
1327     return Bytes.toLong(this.bytes, tsOffset);
1328   }
1329 
1330   /**
1331    * @return Type of this KeyValue.
1332    */
1333   public byte getType() {
1334     return getType(getKeyLength());
1335   }
1336 
1337   /**
1338    * @return KeyValue.TYPE byte representation
1339    */
1340   @Override
1341   public byte getTypeByte() {
1342     return getType(getKeyLength());
1343   }
1344 
1345   /**
1346    * @param keylength Pass if you have it to save on a int creation.
1347    * @return Type of this KeyValue.
1348    */
1349   byte getType(final int keylength) {
1350     return this.bytes[this.offset + keylength - 1 + ROW_OFFSET];
1351   }
1352 
1353   /**
1354    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1355    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1356    * KeyValue type.
1357    */
1358   public boolean isDelete() {
1359     return KeyValue.isDelete(getType());
1360   }
1361 
1362   /**
1363    * @return True if this KV is a {@link KeyValue.Type#Delete} type.
1364    */
1365   public boolean isDeleteType() {
1366     // TODO: Fix this method name vis-a-vis isDelete!
1367     return getType() == Type.Delete.getCode();
1368   }
1369 
1370   /**
1371    * @return True if this KV is a delete family type.
1372    */
1373   public boolean isDeleteFamily() {
1374     return getType() == Type.DeleteFamily.getCode();
1375   }
1376 
1377   /**
1378    *
1379    * @return True if this KV is a delete family or column type.
1380    */
1381   public boolean isDeleteColumnOrFamily() {
1382     int t = getType();
1383     return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
1384   }
1385 
1386   /**
1387    * Primarily for use client-side.  Returns the family of this KeyValue in a
1388    * new byte array.<p>
1389    *
1390    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1391    * lengths instead.
1392    * @return Returns family. Makes a copy.
1393    */
1394   public byte [] getFamily() {
1395     int o = getFamilyOffset();
1396     int l = getFamilyLength(o);
1397     byte [] result = new byte[l];
1398     System.arraycopy(this.bytes, o, result, 0, l);
1399     return result;
1400   }
1401 
1402   /**
1403    * Primarily for use client-side.  Returns the column qualifier of this
1404    * KeyValue in a new byte array.<p>
1405    *
1406    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1407    * lengths instead.
1408    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1409    * @return Returns qualifier. Makes a copy.
1410    */
1411   public byte [] getQualifier() {
1412     int o = getQualifierOffset();
1413     int l = getQualifierLength();
1414     byte [] result = new byte[l];
1415     System.arraycopy(this.bytes, o, result, 0, l);
1416     return result;
1417   }
1418 
1419   //---------------------------------------------------------------------------
1420   //
1421   //  KeyValue splitter
1422   //
1423   //---------------------------------------------------------------------------
1424 
1425   /**
1426    * Utility class that splits a KeyValue buffer into separate byte arrays.
1427    * <p>
1428    * Should get rid of this if we can, but is very useful for debugging.
1429    */
1430   public static class SplitKeyValue {
1431     private byte [][] split;
1432     SplitKeyValue() {
1433       this.split = new byte[6][];
1434     }
1435     public void setRow(byte [] value) { this.split[0] = value; }
1436     public void setFamily(byte [] value) { this.split[1] = value; }
1437     public void setQualifier(byte [] value) { this.split[2] = value; }
1438     public void setTimestamp(byte [] value) { this.split[3] = value; }
1439     public void setType(byte [] value) { this.split[4] = value; }
1440     public void setValue(byte [] value) { this.split[5] = value; }
1441     public byte [] getRow() { return this.split[0]; }
1442     public byte [] getFamily() { return this.split[1]; }
1443     public byte [] getQualifier() { return this.split[2]; }
1444     public byte [] getTimestamp() { return this.split[3]; }
1445     public byte [] getType() { return this.split[4]; }
1446     public byte [] getValue() { return this.split[5]; }
1447   }
1448 
1449   public SplitKeyValue split() {
1450     SplitKeyValue split = new SplitKeyValue();
1451     int splitOffset = this.offset;
1452     int keyLen = Bytes.toInt(bytes, splitOffset);
1453     splitOffset += Bytes.SIZEOF_INT;
1454     int valLen = Bytes.toInt(bytes, splitOffset);
1455     splitOffset += Bytes.SIZEOF_INT;
1456     short rowLen = Bytes.toShort(bytes, splitOffset);
1457     splitOffset += Bytes.SIZEOF_SHORT;
1458     byte [] row = new byte[rowLen];
1459     System.arraycopy(bytes, splitOffset, row, 0, rowLen);
1460     splitOffset += rowLen;
1461     split.setRow(row);
1462     byte famLen = bytes[splitOffset];
1463     splitOffset += Bytes.SIZEOF_BYTE;
1464     byte [] family = new byte[famLen];
1465     System.arraycopy(bytes, splitOffset, family, 0, famLen);
1466     splitOffset += famLen;
1467     split.setFamily(family);
1468     int colLen = keyLen -
1469       (rowLen + famLen + Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BYTE +
1470       Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE);
1471     byte [] qualifier = new byte[colLen];
1472     System.arraycopy(bytes, splitOffset, qualifier, 0, colLen);
1473     splitOffset += colLen;
1474     split.setQualifier(qualifier);
1475     byte [] timestamp = new byte[Bytes.SIZEOF_LONG];
1476     System.arraycopy(bytes, splitOffset, timestamp, 0, Bytes.SIZEOF_LONG);
1477     splitOffset += Bytes.SIZEOF_LONG;
1478     split.setTimestamp(timestamp);
1479     byte [] type = new byte[1];
1480     type[0] = bytes[splitOffset];
1481     splitOffset += Bytes.SIZEOF_BYTE;
1482     split.setType(type);
1483     byte [] value = new byte[valLen];
1484     System.arraycopy(bytes, splitOffset, value, 0, valLen);
1485     split.setValue(value);
1486     return split;
1487   }
1488 
1489   //---------------------------------------------------------------------------
1490   //
1491   //  Compare specified fields against those contained in this KeyValue
1492   //
1493   //---------------------------------------------------------------------------
1494 
1495   /**
1496    * @param family
1497    * @return True if matching families.
1498    */
1499   public boolean matchingFamily(final byte [] family) {
1500     return matchingFamily(family, 0, family.length);
1501   }
1502 
1503   public boolean matchingFamily(final byte[] family, int offset, int length) {
1504     if (this.length == 0 || this.bytes.length == 0) {
1505       return false;
1506     }
1507     return Bytes.equals(family, offset, length,
1508         this.bytes, getFamilyOffset(), getFamilyLength());
1509   }
1510 
1511   public boolean matchingFamily(final KeyValue other) {
1512     return matchingFamily(other.getBuffer(), other.getFamilyOffset(),
1513         other.getFamilyLength());
1514   }
1515 
1516   /**
1517    * @param qualifier
1518    * @return True if matching qualifiers.
1519    */
1520   public boolean matchingQualifier(final byte [] qualifier) {
1521     return matchingQualifier(qualifier, 0, qualifier.length);
1522   }
1523 
1524   public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
1525     return Bytes.equals(qualifier, offset, length,
1526         this.bytes, getQualifierOffset(), getQualifierLength());
1527   }
1528 
1529   public boolean matchingQualifier(final KeyValue other) {
1530     return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
1531         other.getQualifierLength());
1532   }
1533 
1534   public boolean matchingRow(final byte [] row) {
1535     return matchingRow(row, 0, row.length);
1536   }
1537 
1538   public boolean matchingRow(final byte[] row, int offset, int length) {
1539     return Bytes.equals(row, offset, length,
1540         this.bytes, getRowOffset(), getRowLength());
1541   }
1542 
1543   public boolean matchingRow(KeyValue other) {
1544     return matchingRow(other.getBuffer(), other.getRowOffset(),
1545         other.getRowLength());
1546   }
1547 
1548   /**
1549    * @param column Column minus its delimiter
1550    * @return True if column matches.
1551    */
1552   public boolean matchingColumnNoDelimiter(final byte [] column) {
1553     int rl = getRowLength();
1554     int o = getFamilyOffset(rl);
1555     int fl = getFamilyLength(o);
1556     int l = fl + getQualifierLength(rl,fl);
1557     return Bytes.equals(column, 0, column.length, this.bytes, o, l);
1558   }
1559 
1560   /**
1561    *
1562    * @param family column family
1563    * @param qualifier column qualifier
1564    * @return True if column matches
1565    */
1566   public boolean matchingColumn(final byte[] family, final byte[] qualifier) {
1567     return matchingColumn(family, 0, family == null ? 0 : family.length,
1568         qualifier, 0, qualifier == null ? 0 : qualifier.length);
1569   }
1570 
1571   /**
1572    * Checks if column matches.
1573    *
1574    * @param family family name
1575    * @param foffset family offset
1576    * @param flength family length
1577    * @param qualifier column qualifier
1578    * @param qoffset qualifier offset
1579    * @param qlength qualifier length
1580    *
1581    * @return True if column matches
1582    */
1583   public boolean matchingColumn(final byte [] family, final int foffset, final int flength,
1584       final byte [] qualifier, final int qoffset, final int qlength) {
1585     int rl = getRowLength();
1586     int o = getFamilyOffset(rl);
1587     int fl = getFamilyLength(o);
1588     if (!Bytes.equals(family, foffset, flength, this.bytes, o, fl)) {
1589       return false;
1590     }
1591 
1592     int ql = getQualifierLength(rl, fl);
1593     if (qualifier == null || qlength == 0) {
1594       return (ql == 0);
1595     }
1596     return Bytes.equals(qualifier, qoffset, qlength, this.bytes, o + fl, ql);
1597   }
1598 
1599   /**
1600    * @param left
1601    * @param loffset
1602    * @param llength
1603    * @param lfamilylength Offset of family delimiter in left column.
1604    * @param right
1605    * @param roffset
1606    * @param rlength
1607    * @param rfamilylength Offset of family delimiter in right column.
1608    * @return The result of the comparison.
1609    */
1610   static int compareColumns(final byte [] left, final int loffset,
1611       final int llength, final int lfamilylength,
1612       final byte [] right, final int roffset, final int rlength,
1613       final int rfamilylength) {
1614     // Compare family portion first.
1615     int diff = Bytes.compareTo(left, loffset, lfamilylength,
1616       right, roffset, rfamilylength);
1617     if (diff != 0) {
1618       return diff;
1619     }
1620     // Compare qualifier portion
1621     return Bytes.compareTo(left, loffset + lfamilylength,
1622       llength - lfamilylength,
1623       right, roffset + rfamilylength, rlength - rfamilylength);
1624   }
1625 
1626   /**
1627    * @return True if non-null row and column.
1628    */
1629   public boolean nonNullRowAndColumn() {
1630     return getRowLength() > 0 && !isEmptyColumn();
1631   }
1632 
1633   /**
1634    * @return True if column is empty.
1635    */
1636   public boolean isEmptyColumn() {
1637     return getQualifierLength() == 0;
1638   }
1639 
1640   /**
1641    * Creates a new KeyValue that only contains the key portion (the value is
1642    * set to be null).
1643    * @param lenAsVal replace value with the actual value length (false=empty)
1644    */
1645   public KeyValue createKeyOnly(boolean lenAsVal) {
1646     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1647     // Rebuild as: <keylen:4><0:4><key:keylen>
1648     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1649     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1650     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1651         Math.min(newBuffer.length,this.length));
1652     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1653     if (lenAsVal) {
1654       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1655     }
1656     return new KeyValue(newBuffer);
1657   }
1658 
1659   /**
1660    * Splits a column in family:qualifier form into separate byte arrays.
1661    * <p>
1662    * Not recommend to be used as this is old-style API.
1663    * @param c  The column.
1664    * @return The parsed column.
1665    */
1666   public static byte [][] parseColumn(byte [] c) {
1667     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1668     if (index == -1) {
1669       // If no delimiter, return array of size 1
1670       return new byte [][] { c };
1671     } else if(index == c.length - 1) {
1672       // Only a family, return array size 1
1673       byte [] family = new byte[c.length-1];
1674       System.arraycopy(c, 0, family, 0, family.length);
1675       return new byte [][] { family };
1676     }
1677     // Family and column, return array size 2
1678     final byte [][] result = new byte [2][];
1679     result[0] = new byte [index];
1680     System.arraycopy(c, 0, result[0], 0, index);
1681     final int len = c.length - (index + 1);
1682     result[1] = new byte[len];
1683     System.arraycopy(c, index + 1 /*Skip delimiter*/, result[1], 0,
1684       len);
1685     return result;
1686   }
1687 
1688   /**
1689    * Makes a column in family:qualifier form from separate byte arrays.
1690    * <p>
1691    * Not recommended for usage as this is old-style API.
1692    * @param family
1693    * @param qualifier
1694    * @return family:qualifier
1695    */
1696   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1697     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1698   }
1699 
1700   /**
1701    * @param b
1702    * @return Index of the family-qualifier colon delimiter character in passed
1703    * buffer.
1704    */
1705   public static int getFamilyDelimiterIndex(final byte [] b, final int offset,
1706       final int length) {
1707     return getRequiredDelimiter(b, offset, length, COLUMN_FAMILY_DELIMITER);
1708   }
1709 
1710   private static int getRequiredDelimiter(final byte [] b,
1711       final int offset, final int length, final int delimiter) {
1712     int index = getDelimiter(b, offset, length, delimiter);
1713     if (index < 0) {
1714       throw new IllegalArgumentException("No " + (char)delimiter + " in <" +
1715         Bytes.toString(b) + ">" + ", length=" + length + ", offset=" + offset);
1716     }
1717     return index;
1718   }
1719 
1720   /**
1721    * This function is only used in Meta key comparisons so its error message
1722    * is specific for meta key errors.
1723    */
1724   static int getRequiredDelimiterInReverse(final byte [] b,
1725       final int offset, final int length, final int delimiter) {
1726     int index = getDelimiterInReverse(b, offset, length, delimiter);
1727     if (index < 0) {
1728       throw new IllegalArgumentException(".META. key must have two '" + (char)delimiter + "' "
1729         + "delimiters and have the following format: '<table>,<key>,<etc>'");
1730     }
1731     return index;
1732   }
1733 
1734   /**
1735    * @param b
1736    * @param delimiter
1737    * @return Index of delimiter having started from start of <code>b</code>
1738    * moving rightward.
1739    */
1740   public static int getDelimiter(final byte [] b, int offset, final int length,
1741       final int delimiter) {
1742     if (b == null) {
1743       throw new IllegalArgumentException("Passed buffer is null");
1744     }
1745     int result = -1;
1746     for (int i = offset; i < length + offset; i++) {
1747       if (b[i] == delimiter) {
1748         result = i;
1749         break;
1750       }
1751     }
1752     return result;
1753   }
1754 
1755   /**
1756    * Find index of passed delimiter walking from end of buffer backwards.
1757    * @param b
1758    * @param delimiter
1759    * @return Index of delimiter
1760    */
1761   public static int getDelimiterInReverse(final byte [] b, final int offset,
1762       final int length, final int delimiter) {
1763     if (b == null) {
1764       throw new IllegalArgumentException("Passed buffer is null");
1765     }
1766     int result = -1;
1767     for (int i = (offset + length) - 1; i >= offset; i--) {
1768       if (b[i] == delimiter) {
1769         result = i;
1770         break;
1771       }
1772     }
1773     return result;
1774   }
1775 
1776   /**
1777    * A {@link KVComparator} for <code>-ROOT-</code> catalog table
1778    * {@link KeyValue}s.
1779    */
1780   public static class RootComparator extends MetaComparator {
1781     private final KeyComparator rawcomparator = new RootKeyComparator();
1782 
1783     public KeyComparator getRawComparator() {
1784       return this.rawcomparator;
1785     }
1786 
1787     @Override
1788     protected Object clone() throws CloneNotSupportedException {
1789       return new RootComparator();
1790     }
1791   }
1792 
1793   /**
1794    * A {@link KVComparator} for <code>.META.</code> catalog table
1795    * {@link KeyValue}s.
1796    */
1797   public static class MetaComparator extends KVComparator {
1798     private final KeyComparator rawcomparator = new MetaKeyComparator();
1799 
1800     public KeyComparator getRawComparator() {
1801       return this.rawcomparator;
1802     }
1803 
1804     @Override
1805     protected Object clone() throws CloneNotSupportedException {
1806       return new MetaComparator();
1807     }
1808   }
1809 
1810   /**
1811    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1812    * portion.  This means two KeyValues with same Key but different Values are
1813    * considered the same as far as this Comparator is concerned.
1814    * Hosts a {@link KeyComparator}.
1815    */
1816   public static class KVComparator implements java.util.Comparator<KeyValue> {
1817     private final KeyComparator rawcomparator = new KeyComparator();
1818 
1819     /**
1820      * @return RawComparator that can compare the Key portion of a KeyValue.
1821      * Used in hfile where indices are the Key portion of a KeyValue.
1822      */
1823     public KeyComparator getRawComparator() {
1824       return this.rawcomparator;
1825     }
1826 
1827     public int compare(final KeyValue left, final KeyValue right) {
1828       int ret = getRawComparator().compare(left.getBuffer(),
1829           left.getOffset() + ROW_OFFSET, left.getKeyLength(),
1830           right.getBuffer(), right.getOffset() + ROW_OFFSET,
1831           right.getKeyLength());
1832       if (ret != 0) return ret;
1833       // Negate this comparison so later edits show up first
1834       return -Longs.compare(left.getMemstoreTS(), right.getMemstoreTS());
1835     }
1836 
1837     public int compareTimestamps(final KeyValue left, final KeyValue right) {
1838       return compareTimestamps(left, left.getKeyLength(), right,
1839         right.getKeyLength());
1840     }
1841 
1842     int compareTimestamps(final KeyValue left, final int lkeylength,
1843         final KeyValue right, final int rkeylength) {
1844       // Compare timestamps
1845       long ltimestamp = left.getTimestamp(lkeylength);
1846       long rtimestamp = right.getTimestamp(rkeylength);
1847       return getRawComparator().compareTimestamps(ltimestamp, rtimestamp);
1848     }
1849 
1850     /**
1851      * @param left
1852      * @param right
1853      * @return Result comparing rows.
1854      */
1855     public int compareRows(final KeyValue left, final KeyValue right) {
1856       return compareRows(left, left.getRowLength(), right,
1857           right.getRowLength());
1858     }
1859 
1860     /**
1861      * @param left
1862      * @param lrowlength Length of left row.
1863      * @param right
1864      * @param rrowlength Length of right row.
1865      * @return Result comparing rows.
1866      */
1867     public int compareRows(final KeyValue left, final short lrowlength,
1868         final KeyValue right, final short rrowlength) {
1869       return getRawComparator().compareRows(left.getBuffer(),
1870           left.getRowOffset(), lrowlength,
1871         right.getBuffer(), right.getRowOffset(), rrowlength);
1872     }
1873 
1874     /**
1875      * @param left
1876      * @param row - row key (arbitrary byte array)
1877      * @return RawComparator
1878      */
1879     public int compareRows(final KeyValue left, final byte [] row) {
1880       return getRawComparator().compareRows(left.getBuffer(),
1881           left.getRowOffset(), left.getRowLength(), row, 0, row.length);
1882     }
1883 
1884     public int compareRows(byte [] left, int loffset, int llength,
1885         byte [] right, int roffset, int rlength) {
1886       return getRawComparator().compareRows(left, loffset, llength,
1887         right, roffset, rlength);
1888     }
1889 
1890     public int compareColumns(final KeyValue left, final byte [] right,
1891         final int roffset, final int rlength, final int rfamilyoffset) {
1892       int offset = left.getFamilyOffset();
1893       int length = left.getFamilyLength() + left.getQualifierLength();
1894       return getRawComparator().compareColumns(left.getBuffer(), offset, length,
1895         left.getFamilyLength(offset),
1896         right, roffset, rlength, rfamilyoffset);
1897     }
1898 
1899     int compareColumns(final KeyValue left, final short lrowlength,
1900         final KeyValue right, final short rrowlength) {
1901       int lfoffset = left.getFamilyOffset(lrowlength);
1902       int rfoffset = right.getFamilyOffset(rrowlength);
1903       int lclength = left.getTotalColumnLength(lrowlength,lfoffset);
1904       int rclength = right.getTotalColumnLength(rrowlength, rfoffset);
1905       int lfamilylength = left.getFamilyLength(lfoffset);
1906       int rfamilylength = right.getFamilyLength(rfoffset);
1907       return getRawComparator().compareColumns(left.getBuffer(), lfoffset,
1908           lclength, lfamilylength,
1909         right.getBuffer(), rfoffset, rclength, rfamilylength);
1910     }
1911 
1912     /**
1913      * Compares the row and column of two keyvalues for equality
1914      * @param left
1915      * @param right
1916      * @return True if same row and column.
1917      */
1918     public boolean matchingRowColumn(final KeyValue left,
1919         final KeyValue right) {
1920       short lrowlength = left.getRowLength();
1921       short rrowlength = right.getRowLength();
1922       // TsOffset = end of column data. just comparing Row+CF length of each
1923       return ((left.getTimestampOffset() - left.getOffset()) ==
1924               (right.getTimestampOffset() - right.getOffset())) &&
1925         matchingRows(left, lrowlength, right, rrowlength) &&
1926         compareColumns(left, lrowlength, right, rrowlength) == 0;
1927     }
1928 
1929     /**
1930      * @param left
1931      * @param right
1932      * @return True if rows match.
1933      */
1934     public boolean matchingRows(final KeyValue left, final byte [] right) {
1935       return Bytes.equals(left.getBuffer(), left.getRowOffset(), left.getRowLength(),
1936           right, 0, right.length);
1937     }
1938 
1939     /**
1940      * Compares the row of two keyvalues for equality
1941      * @param left
1942      * @param right
1943      * @return True if rows match.
1944      */
1945     public boolean matchingRows(final KeyValue left, final KeyValue right) {
1946       short lrowlength = left.getRowLength();
1947       short rrowlength = right.getRowLength();
1948       return matchingRows(left, lrowlength, right, rrowlength);
1949     }
1950 
1951     /**
1952      * @param left
1953      * @param lrowlength
1954      * @param right
1955      * @param rrowlength
1956      * @return True if rows match.
1957      */
1958     public boolean matchingRows(final KeyValue left, final short lrowlength,
1959         final KeyValue right, final short rrowlength) {
1960       return lrowlength == rrowlength &&
1961           Bytes.equals(left.getBuffer(), left.getRowOffset(), lrowlength,
1962               right.getBuffer(), right.getRowOffset(), rrowlength);
1963     }
1964 
1965     public boolean matchingRows(final byte [] left, final int loffset,
1966         final int llength,
1967         final byte [] right, final int roffset, final int rlength) {
1968       return Bytes.equals(left, loffset, llength,
1969           right, roffset, rlength);
1970     }
1971 
1972     /**
1973      * Compares the row and timestamp of two keys
1974      * Was called matchesWithoutColumn in HStoreKey.
1975      * @param right Key to compare against.
1976      * @return True if same row and timestamp is greater than the timestamp in
1977      * <code>right</code>
1978      */
1979     public boolean matchingRowsGreaterTimestamp(final KeyValue left,
1980         final KeyValue right) {
1981       short lrowlength = left.getRowLength();
1982       short rrowlength = right.getRowLength();
1983       if (!matchingRows(left, lrowlength, right, rrowlength)) {
1984         return false;
1985       }
1986       return left.getTimestamp() >= right.getTimestamp();
1987     }
1988 
1989     @Override
1990     protected Object clone() throws CloneNotSupportedException {
1991       return new KVComparator();
1992     }
1993 
1994     /**
1995      * @return Comparator that ignores timestamps; useful counting versions.
1996      */
1997     public KVComparator getComparatorIgnoringTimestamps() {
1998       KVComparator c = null;
1999       try {
2000         c = (KVComparator)this.clone();
2001         c.getRawComparator().ignoreTimestamp = true;
2002       } catch (CloneNotSupportedException e) {
2003         LOG.error("Not supported", e);
2004       }
2005       return c;
2006     }
2007 
2008     /**
2009      * @return Comparator that ignores key type; useful checking deletes
2010      */
2011     public KVComparator getComparatorIgnoringType() {
2012       KVComparator c = null;
2013       try {
2014         c = (KVComparator)this.clone();
2015         c.getRawComparator().ignoreType = true;
2016       } catch (CloneNotSupportedException e) {
2017         LOG.error("Not supported", e);
2018       }
2019       return c;
2020     }
2021   }
2022 
2023   /**
2024    * Creates a KeyValue that is last on the specified row id. That is,
2025    * every other possible KeyValue for the given row would compareTo()
2026    * less than the result of this call.
2027    * @param row row key
2028    * @return Last possible KeyValue on passed <code>row</code>
2029    */
2030   public static KeyValue createLastOnRow(final byte[] row) {
2031     return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
2032   }
2033 
2034   /**
2035    * Create a KeyValue that is smaller than all other possible KeyValues
2036    * for the given row. That is any (valid) KeyValue on 'row' would sort
2037    * _after_ the result.
2038    *
2039    * @param row - row key (arbitrary byte array)
2040    * @return First possible KeyValue on passed <code>row</code>
2041    */
2042   public static KeyValue createFirstOnRow(final byte [] row) {
2043     return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
2044   }
2045 
2046   /**
2047    * Create a KeyValue that is smaller than all other possible KeyValues
2048    * for the given row. That is any (valid) KeyValue on 'row' would sort
2049    * _after_ the result.
2050    *
2051    * @param row - row key (arbitrary byte array)
2052    * @return First possible KeyValue on passed <code>row</code>
2053    */
2054   public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
2055     return new KeyValue(row, roffset, rlength,
2056         null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
2057   }
2058 
2059   /**
2060    * Creates a KeyValue that is smaller than all other KeyValues that
2061    * are older than the passed timestamp.
2062    * @param row - row key (arbitrary byte array)
2063    * @param ts - timestamp
2064    * @return First possible key on passed <code>row</code> and timestamp.
2065    */
2066   public static KeyValue createFirstOnRow(final byte [] row,
2067       final long ts) {
2068     return new KeyValue(row, null, null, ts, Type.Maximum);
2069   }
2070 
2071   /**
2072    * Create a KeyValue for the specified row, family and qualifier that would be
2073    * smaller than all other possible KeyValues that have the same row,family,qualifier.
2074    * Used for seeking.
2075    * @param row - row key (arbitrary byte array)
2076    * @param family - family name
2077    * @param qualifier - column qualifier
2078    * @return First possible key on passed <code>row</code>, and column.
2079    */
2080   public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
2081       final byte [] qualifier) {
2082     return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
2083   }
2084 
2085   /**
2086    * Create a Delete Family KeyValue for the specified row and family that would
2087    * be smaller than all other possible Delete Family KeyValues that have the
2088    * same row and family.
2089    * Used for seeking.
2090    * @param row - row key (arbitrary byte array)
2091    * @param family - family name
2092    * @return First Delete Family possible key on passed <code>row</code>.
2093    */
2094   public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
2095       final byte [] family) {
2096     return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
2097         Type.DeleteFamily);
2098   }
2099 
2100   /**
2101    * @param row - row key (arbitrary byte array)
2102    * @param f - family name
2103    * @param q - column qualifier
2104    * @param ts - timestamp
2105    * @return First possible key on passed <code>row</code>, column and timestamp
2106    */
2107   public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
2108       final byte [] q, final long ts) {
2109     return new KeyValue(row, f, q, ts, Type.Maximum);
2110   }
2111 
2112   /**
2113    * Create a KeyValue for the specified row, family and qualifier that would be
2114    * smaller than all other possible KeyValues that have the same row,
2115    * family, qualifier.
2116    * Used for seeking.
2117    * @param row row key
2118    * @param roffset row offset
2119    * @param rlength row length
2120    * @param family family name
2121    * @param foffset family offset
2122    * @param flength family length
2123    * @param qualifier column qualifier
2124    * @param qoffset qualifier offset
2125    * @param qlength qualifier length
2126    * @return First possible key on passed Row, Family, Qualifier.
2127    */
2128   public static KeyValue createFirstOnRow(final byte [] row,
2129       final int roffset, final int rlength, final byte [] family,
2130       final int foffset, final int flength, final byte [] qualifier,
2131       final int qoffset, final int qlength) {
2132     return new KeyValue(row, roffset, rlength, family,
2133         foffset, flength, qualifier, qoffset, qlength,
2134         HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
2135   }
2136 
2137   /**
2138    * Create a KeyValue for the specified row, family and qualifier that would be
2139    * smaller than all other possible KeyValues that have the same row,
2140    * family, qualifier.
2141    * Used for seeking.
2142    *
2143    * @param buffer the buffer to use for the new <code>KeyValue</code> object
2144    * @param row the value key
2145    * @param family family name
2146    * @param qualifier column qualifier
2147    *
2148    * @return First possible key on passed Row, Family, Qualifier.
2149    *
2150    * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
2151    * than the provided buffer or than <code>Integer.MAX_VALUE</code>
2152    */
2153   public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
2154       final byte [] family, final byte [] qualifier)
2155           throws IllegalArgumentException {
2156 
2157     return createFirstOnRow(buffer, 0, row, 0, row.length,
2158         family, 0, family.length,
2159         qualifier, 0, qualifier.length);
2160   }
2161 
2162   /**
2163    * Create a KeyValue for the specified row, family and qualifier that would be
2164    * smaller than all other possible KeyValues that have the same row,
2165    * family, qualifier.
2166    * Used for seeking.
2167    *
2168    * @param buffer the buffer to use for the new <code>KeyValue</code> object
2169    * @param boffset buffer offset
2170    * @param row the value key
2171    * @param roffset row offset
2172    * @param rlength row length
2173    * @param family family name
2174    * @param foffset family offset
2175    * @param flength family length
2176    * @param qualifier column qualifier
2177    * @param qoffset qualifier offset
2178    * @param qlength qualifier length
2179    *
2180    * @return First possible key on passed Row, Family, Qualifier.
2181    *
2182    * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
2183    * than the provided buffer or than <code>Integer.MAX_VALUE</code>
2184    */
2185   public static KeyValue createFirstOnRow(byte [] buffer, final int boffset,
2186       final byte [] row, final int roffset, final int rlength,
2187       final byte [] family, final int foffset, final int flength,
2188       final byte [] qualifier, final int qoffset, final int qlength)
2189           throws IllegalArgumentException {
2190 
2191     long lLength = getKeyValueDataStructureSize(rlength, flength, qlength, 0);
2192 
2193     if (lLength > Integer.MAX_VALUE) {
2194       throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
2195     }
2196     int iLength = (int) lLength;
2197     if (buffer.length - boffset < iLength) {
2198       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
2199           iLength);
2200     }
2201     return new KeyValue(buffer, boffset,
2202         row, roffset, rlength,
2203         family, foffset, flength,
2204         qualifier, qoffset, qlength,
2205         HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
2206         null, 0, 0);
2207   }
2208 
2209   /**
2210    * Create a KeyValue for the specified row, family and qualifier that would be
2211    * larger than or equal to all other possible KeyValues that have the same
2212    * row, family, qualifier.
2213    * Used for reseeking.
2214    * @param row row key
2215    * @param roffset row offset
2216    * @param rlength row length
2217    * @param family family name
2218    * @param foffset family offset
2219    * @param flength family length
2220    * @param qualifier column qualifier
2221    * @param qoffset qualifier offset
2222    * @param qlength qualifier length
2223    * @return Last possible key on passed row, family, qualifier.
2224    */
2225   public static KeyValue createLastOnRow(final byte [] row,
2226       final int roffset, final int rlength, final byte [] family,
2227       final int foffset, final int flength, final byte [] qualifier,
2228       final int qoffset, final int qlength) {
2229     return new KeyValue(row, roffset, rlength, family,
2230         foffset, flength, qualifier, qoffset, qlength,
2231         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
2232   }
2233 
2234   /**
2235    * Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
2236    * byte[], int, int)} but creates the last key on the row/column of this KV
2237    * (the value part of the returned KV is always empty). Used in creating
2238    * "fake keys" for the multi-column Bloom filter optimization to skip the
2239    * row/column we already know is not in the file.
2240    * @return the last key on the row/column of the given key-value pair
2241    */
2242   public KeyValue createLastOnRowCol() {
2243     return new KeyValue(
2244         bytes, getRowOffset(), getRowLength(),
2245         bytes, getFamilyOffset(), getFamilyLength(),
2246         bytes, getQualifierOffset(), getQualifierLength(),
2247         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
2248   }
2249 
2250   /**
2251    * Creates the first KV with the row/family/qualifier of this KV and the
2252    * given timestamp. Uses the "maximum" KV type that guarantees that the new
2253    * KV is the lowest possible for this combination of row, family, qualifier,
2254    * and timestamp. This KV's own timestamp is ignored. While this function
2255    * copies the value from this KV, it is normally used on key-only KVs.
2256    */
2257   public KeyValue createFirstOnRowColTS(long ts) {
2258     return new KeyValue(
2259         bytes, getRowOffset(), getRowLength(),
2260         bytes, getFamilyOffset(), getFamilyLength(),
2261         bytes, getQualifierOffset(), getQualifierLength(),
2262         ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
2263   }
2264 
2265   /**
2266    * @param b
2267    * @return A KeyValue made of a byte array that holds the key-only part.
2268    * Needed to convert hfile index members to KeyValues.
2269    */
2270   public static KeyValue createKeyValueFromKey(final byte [] b) {
2271     return createKeyValueFromKey(b, 0, b.length);
2272   }
2273 
2274   /**
2275    * @param bb
2276    * @return A KeyValue made of a byte buffer that holds the key-only part.
2277    * Needed to convert hfile index members to KeyValues.
2278    */
2279   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2280     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2281   }
2282 
2283   /**
2284    * @param b
2285    * @param o
2286    * @param l
2287    * @return A KeyValue made of a byte array that holds the key-only part.
2288    * Needed to convert hfile index members to KeyValues.
2289    */
2290   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2291       final int l) {
2292     byte [] newb = new byte[l + ROW_OFFSET];
2293     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2294     Bytes.putInt(newb, 0, l);
2295     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2296     return new KeyValue(newb);
2297   }
2298 
2299   /**
2300    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2301    * backing bytes copied from the steam.
2302    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2303    * of zero, we will return null which can be useful marking a stream as done.
2304    * @throws IOException
2305    */
2306   public static KeyValue create(final DataInput in) throws IOException {
2307     return create(in.readInt(), in);
2308   }
2309 
2310   /**
2311    * Create a KeyValue reading <code>length</code> from <code>in</code>
2312    * @param length
2313    * @param in
2314    * @return Created KeyValue OR if we find a length of zero, we will return null which
2315    * can be useful marking a stream as done.
2316    * @throws IOException
2317    */
2318   public static KeyValue create(int length, final DataInput in) throws IOException {
2319     if (length == 0) return null;
2320     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2321     byte [] bytes = new byte[length];
2322     in.readFully(bytes);
2323     return new KeyValue(bytes, 0, length);
2324   }
2325 
2326   /**
2327    * Create a KeyValue reading from the raw InputStream.
2328    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2329    * @param in
2330    * @return Created KeyValue OR if we find a length of zero, we will return null which
2331    * can be useful marking a stream as done.
2332    * @throws IOException
2333    */
2334   public static KeyValue iscreate(final InputStream in) throws IOException {
2335     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2336     int bytesRead = 0;
2337     while (bytesRead < intBytes.length) {
2338       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2339       if (n < 0) {
2340         if (bytesRead == 0) return null; // EOF at start is ok
2341         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2342       }
2343       bytesRead += n;
2344     }
2345     // TODO: perhaps some sanity check is needed here.
2346     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2347     IOUtils.readFully(in, bytes, 0, bytes.length);
2348     return new KeyValue(bytes, 0, bytes.length);
2349   }
2350 
2351   /**
2352    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2353    * @param kv
2354    * @param out
2355    * @return Length written on stream
2356    * @throws IOException
2357    * @see #create(DataInput) for the inverse function
2358    */
2359   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2360     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2361     // work for all implementations.
2362     int length = kv.getLength();
2363     out.writeInt(length);
2364     out.write(kv.getBuffer(), kv.getOffset(), length);
2365     return length + Bytes.SIZEOF_INT;
2366   }
2367 
2368   /**
2369    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2370    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2371    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2372    * @param kv
2373    * @param out
2374    * @return Length written on stream
2375    * @throws IOException
2376    * @see #create(DataInput) for the inverse function
2377    * @see #write(KeyValue, DataOutput)
2378    */
2379   public static long oswrite(final KeyValue kv, final OutputStream out) throws IOException {
2380     int length = kv.getLength();
2381     // This does same as DataOuput#writeInt (big-endian, etc.)
2382     out.write(Bytes.toBytes(length));
2383     out.write(kv.getBuffer(), kv.getOffset(), length);
2384     return length + Bytes.SIZEOF_INT;
2385   }
2386 
2387   /**
2388    * Compare key portion of a {@link KeyValue} for keys in <code>-ROOT-<code>
2389    * table.
2390    */
2391   public static class RootKeyComparator extends MetaKeyComparator {
2392     public int compareRows(byte [] left, int loffset, int llength,
2393         byte [] right, int roffset, int rlength) {
2394       // Rows look like this: .META.,ROW_FROM_META,RID
2395       //        LOG.info("ROOT " + Bytes.toString(left, loffset, llength) +
2396       //          "---" + Bytes.toString(right, roffset, rlength));
2397       final int metalength = 7; // '.META.' length
2398       int lmetaOffsetPlusDelimiter = loffset + metalength;
2399       int leftFarDelimiter = getDelimiterInReverse(left,
2400           lmetaOffsetPlusDelimiter,
2401           llength - metalength, HConstants.DELIMITER);
2402       int rmetaOffsetPlusDelimiter = roffset + metalength;
2403       int rightFarDelimiter = getDelimiterInReverse(right,
2404           rmetaOffsetPlusDelimiter, rlength - metalength,
2405           HConstants.DELIMITER);
2406       if (leftFarDelimiter < 0 && rightFarDelimiter >= 0) {
2407         // Nothing between .META. and regionid.  Its first key.
2408         return -1;
2409       } else if (rightFarDelimiter < 0 && leftFarDelimiter >= 0) {
2410         return 1;
2411       } else if (leftFarDelimiter < 0 && rightFarDelimiter < 0) {
2412         return 0;
2413       }
2414       int result = super.compareRows(left, lmetaOffsetPlusDelimiter,
2415           leftFarDelimiter - lmetaOffsetPlusDelimiter,
2416           right, rmetaOffsetPlusDelimiter,
2417           rightFarDelimiter - rmetaOffsetPlusDelimiter);
2418       if (result != 0) {
2419         return result;
2420       }
2421       // Compare last part of row, the rowid.
2422       leftFarDelimiter++;
2423       rightFarDelimiter++;
2424       result = compareRowid(left, leftFarDelimiter,
2425           llength - (leftFarDelimiter - loffset),
2426           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
2427       return result;
2428     }
2429   }
2430 
2431   /**
2432    * Comparator that compares row component only of a KeyValue.
2433    */
2434   public static class RowComparator implements Comparator<KeyValue> {
2435     final KVComparator comparator;
2436 
2437     public RowComparator(final KVComparator c) {
2438       this.comparator = c;
2439     }
2440 
2441     public int compare(KeyValue left, KeyValue right) {
2442       return comparator.compareRows(left, right);
2443     }
2444   }
2445 
2446   /**
2447    * Compare key portion of a {@link KeyValue} for keys in <code>.META.</code>
2448    * table.
2449    */
2450   public static class MetaKeyComparator extends KeyComparator {
2451     public int compareRows(byte [] left, int loffset, int llength,
2452         byte [] right, int roffset, int rlength) {
2453       //        LOG.info("META " + Bytes.toString(left, loffset, llength) +
2454       //          "---" + Bytes.toString(right, roffset, rlength));
2455       int leftDelimiter = getDelimiter(left, loffset, llength,
2456           HConstants.DELIMITER);
2457       int rightDelimiter = getDelimiter(right, roffset, rlength,
2458           HConstants.DELIMITER);
2459       if (leftDelimiter < 0 && rightDelimiter >= 0) {
2460         // Nothing between .META. and regionid.  Its first key.
2461         return -1;
2462       } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
2463         return 1;
2464       } else if (leftDelimiter < 0 && rightDelimiter < 0) {
2465         return 0;
2466       }
2467       // Compare up to the delimiter
2468       int result = Bytes.compareTo(left, loffset, leftDelimiter - loffset,
2469           right, roffset, rightDelimiter - roffset);
2470       if (result != 0) {
2471         return result;
2472       }
2473       // Compare middle bit of the row.
2474       // Move past delimiter
2475       leftDelimiter++;
2476       rightDelimiter++;
2477       int leftFarDelimiter = getRequiredDelimiterInReverse(left, leftDelimiter,
2478           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
2479       int rightFarDelimiter = getRequiredDelimiterInReverse(right,
2480           rightDelimiter, rlength - (rightDelimiter - roffset),
2481           HConstants.DELIMITER);
2482       // Now compare middlesection of row.
2483       result = super.compareRows(left, leftDelimiter,
2484           leftFarDelimiter - leftDelimiter, right, rightDelimiter,
2485           rightFarDelimiter - rightDelimiter);
2486       if (result != 0) {
2487         return result;
2488       }
2489       // Compare last part of row, the rowid.
2490       leftFarDelimiter++;
2491       rightFarDelimiter++;
2492       result = compareRowid(left, leftFarDelimiter,
2493           llength - (leftFarDelimiter - loffset),
2494           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
2495       return result;
2496     }
2497 
2498     @Override
2499     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2500       return Arrays.copyOf(rightKey, rightKey.length);
2501     }
2502 
2503     protected int compareRowid(byte[] left, int loffset, int llength,
2504         byte[] right, int roffset, int rlength) {
2505       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2506     }
2507   }
2508 
2509   /**
2510    * Avoids redundant comparisons for better performance.
2511    */
2512   public static interface SamePrefixComparator<T> {
2513     /**
2514      * Compare two keys assuming that the first n bytes are the same.
2515      * @param commonPrefix How many bytes are the same.
2516      */
2517     public int compareIgnoringPrefix(int commonPrefix,
2518         T left, int loffset, int llength,
2519         T right, int roffset, int rlength);
2520   }
2521 
2522   /**
2523    * Compare key portion of a {@link KeyValue}.
2524    */
2525   public static class KeyComparator
2526       implements RawComparator<byte []>, SamePrefixComparator<byte[]> {
2527     volatile boolean ignoreTimestamp = false;
2528     volatile boolean ignoreType = false;
2529 
2530     public int compare(byte[] left, int loffset, int llength, byte[] right,
2531         int roffset, int rlength) {
2532       // Compare row
2533       short lrowlength = Bytes.toShort(left, loffset);
2534       short rrowlength = Bytes.toShort(right, roffset);
2535       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
2536           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
2537       if (compare != 0) {
2538         return compare;
2539       }
2540 
2541       // Compare the rest of the two KVs without making any assumptions about
2542       // the common prefix. This function will not compare rows anyway, so we
2543       // don't need to tell it that the common prefix includes the row.
2544       return compareWithoutRow(0, left, loffset, llength, right, roffset,
2545           rlength, rrowlength);
2546     }
2547 
2548     /**
2549      * Compare the two key-values, ignoring the prefix of the given length
2550      * that is known to be the same between the two.
2551      * @param commonPrefix the prefix length to ignore
2552      */
2553     @Override
2554     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2555         int loffset, int llength, byte[] right, int roffset, int rlength) {
2556       // Compare row
2557       short lrowlength = Bytes.toShort(left, loffset);
2558       short rrowlength;
2559 
2560       int comparisonResult = 0;
2561       if (commonPrefix < ROW_LENGTH_SIZE) {
2562         // almost nothing in common
2563         rrowlength = Bytes.toShort(right, roffset);
2564         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2565             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2566       } else { // the row length is the same
2567         rrowlength = lrowlength;
2568         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2569           // The rows are not the same. Exclude the common prefix and compare
2570           // the rest of the two rows.
2571           int common = commonPrefix - ROW_LENGTH_SIZE;
2572           comparisonResult = compareRows(
2573               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2574               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2575         }
2576       }
2577       if (comparisonResult != 0) {
2578         return comparisonResult;
2579       }
2580 
2581       assert lrowlength == rrowlength;
2582 
2583       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2584           roffset, rlength, lrowlength);
2585     }
2586 
2587     /**
2588      * Compare columnFamily, qualifier, timestamp, and key type (everything
2589      * except the row). This method is used both in the normal comparator and
2590      * the "same-prefix" comparator. Note that we are assuming that row portions
2591      * of both KVs have already been parsed and found identical, and we don't
2592      * validate that assumption here.
2593      * @param commonPrefix
2594      *          the length of the common prefix of the two key-values being
2595      *          compared, including row length and row
2596      */
2597     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2598         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2599       /***
2600        * KeyValue Format and commonLength:
2601        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2602        * ------------------|-------commonLength--------|--------------
2603        */
2604       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2605 
2606       // commonLength + TIMESTAMP_TYPE_SIZE
2607       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2608       // ColumnFamily + Qualifier length.
2609       int lcolumnlength = llength - commonLengthWithTSAndType;
2610       int rcolumnlength = rlength - commonLengthWithTSAndType;
2611 
2612       byte ltype = left[loffset + (llength - 1)];
2613       byte rtype = right[roffset + (rlength - 1)];
2614 
2615       // If the column is not specified, the "minimum" key type appears the
2616       // latest in the sorted order, regardless of the timestamp. This is used
2617       // for specifying the last key/value in a given row, because there is no
2618       // "lexicographically last column" (it would be infinitely long). The
2619       // "maximum" key type does not need this behavior.
2620       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2621         // left is "bigger", i.e. it appears later in the sorted order
2622         return 1;
2623       }
2624       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2625         return -1;
2626       }
2627 
2628       int lfamilyoffset = commonLength + loffset;
2629       int rfamilyoffset = commonLength + roffset;
2630 
2631       // Column family length.
2632       int lfamilylength = left[lfamilyoffset - 1];
2633       int rfamilylength = right[rfamilyoffset - 1];
2634       // If left family size is not equal to right family size, we need not
2635       // compare the qualifiers.
2636       boolean sameFamilySize = (lfamilylength == rfamilylength);
2637       int common = 0;
2638       if (commonPrefix > 0) {
2639         common = Math.max(0, commonPrefix - commonLength);
2640         if (!sameFamilySize) {
2641           // Common should not be larger than Math.min(lfamilylength,
2642           // rfamilylength).
2643           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2644         } else {
2645           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2646         }
2647       }
2648       if (!sameFamilySize) {
2649         // comparing column family is enough.
2650         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2651             - common, right, rfamilyoffset + common, rfamilylength - common);
2652       }
2653       // Compare family & qualifier together.
2654       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2655           lcolumnlength - common, right, rfamilyoffset + common,
2656           rcolumnlength - common);
2657       if (comparison != 0) {
2658         return comparison;
2659       }
2660       return compareTimestampAndType(left, loffset, llength, right, roffset,
2661           rlength, ltype, rtype);
2662     }
2663 
2664     private int compareTimestampAndType(byte[] left, int loffset, int llength,
2665         byte[] right, int roffset, int rlength, byte ltype, byte rtype) {
2666       int compare;
2667       if (!this.ignoreTimestamp) {
2668         // Get timestamps.
2669         long ltimestamp = Bytes.toLong(left,
2670             loffset + (llength - TIMESTAMP_TYPE_SIZE));
2671         long rtimestamp = Bytes.toLong(right,
2672             roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2673         compare = compareTimestamps(ltimestamp, rtimestamp);
2674         if (compare != 0) {
2675           return compare;
2676         }
2677       }
2678 
2679       if (!this.ignoreType) {
2680         // Compare types. Let the delete types sort ahead of puts; i.e. types
2681         // of higher numbers sort before those of lesser numbers. Maximum (255)
2682         // appears ahead of everything, and minimum (0) appears after
2683         // everything.
2684         return (0xff & rtype) - (0xff & ltype);
2685       }
2686       return 0;
2687     }
2688 
2689     public int compare(byte[] left, byte[] right) {
2690       return compare(left, 0, left.length, right, 0, right.length);
2691     }
2692 
2693     public int compareRows(byte [] left, int loffset, int llength,
2694         byte [] right, int roffset, int rlength) {
2695       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2696     }
2697 
2698     /**
2699      * Generate a faked byte array if possible. It aims to:
2700      * 1)reduce key length, which expects to reduce HFile index memory footprint
2701      * 2)replace TS field with LATEST_TIMESTAMP(to avoid seeking previous block)
2702      * see HBASE-7845 for more details
2703      * we need to ensure: leftKey < newKey <= rightKey
2704      * @param leftKey the previous block's real stop key usually
2705      * @param rightKey the current block's real start key usually
2706      * @return newKey: the newly generated faked key
2707      */
2708     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2709       if (rightKey == null) {
2710         throw new IllegalArgumentException("rightKey can not be null");
2711       }
2712       if (leftKey == null) {
2713         return Arrays.copyOf(rightKey, rightKey.length);
2714       }
2715       if (compare(leftKey, rightKey) >= 0) {
2716         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2717           + ", rightKey:" + Bytes.toString(rightKey));
2718       }
2719 
2720       short leftRowLength = Bytes.toShort(leftKey, 0);
2721       short rightRowLength = Bytes.toShort(rightKey, 0);
2722       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2723       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2724       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2725       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2726       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2727       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2728       // rows are equal
2729       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2730         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2731         // Compare family & qualifier together.
2732         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2733           rightCommonLength, rightColumnLength);
2734         // same with "row + family + qualifier", return rightKey directly
2735         if (comparison == 0) {
2736           return Arrays.copyOf(rightKey, rightKey.length);
2737         }
2738         // "family + qualifier" are different, generate a faked key per rightKey
2739         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2740         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2741         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2742         return newKey;
2743       }
2744       // rows are different
2745       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2746       short diffIdx = 0;
2747       while (diffIdx < minLength
2748           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2749         diffIdx++;
2750       }
2751       if (diffIdx >= minLength) {
2752         // leftKey's row is prefix of rightKey's. we can optimize it in future
2753         return Arrays.copyOf(rightKey, rightKey.length);
2754       }
2755       int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2756       if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2757           (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2758         byte[] newRowKey = new byte[diffIdx + 1];
2759         System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2760         newRowKey[diffIdx] = (byte) (diffByte + 1);
2761         int rightFamilyLength = rightKey[rightCommonLength - 1];
2762         byte[] family = null;
2763         if (rightFamilyLength > 0) {
2764           family = new byte[rightFamilyLength];
2765           System.arraycopy(rightKey, rightCommonLength, family, 0, rightFamilyLength);
2766         }
2767         int rightQualifierLength = rightColumnLength - rightFamilyLength;
2768         byte[] qualifier = null;
2769         if (rightQualifierLength > 0) {
2770           qualifier = new byte[rightQualifierLength];
2771           System.arraycopy(rightKey, rightCommonLength + rightFamilyLength, qualifier, 0,
2772             rightQualifierLength);
2773         }
2774         return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2775           Type.Maximum).getKey();
2776       }
2777       // the following is optimizable in future
2778       return Arrays.copyOf(rightKey, rightKey.length);
2779     }
2780 
2781     protected int compareColumns(
2782         byte [] left, int loffset, int llength, final int lfamilylength,
2783         byte [] right, int roffset, int rlength, final int rfamilylength) {
2784       return KeyValue.compareColumns(left, loffset, llength, lfamilylength,
2785         right, roffset, rlength, rfamilylength);
2786     }
2787 
2788     int compareTimestamps(final long ltimestamp, final long rtimestamp) {
2789       // The below older timestamps sorting ahead of newer timestamps looks
2790       // wrong but it is intentional. This way, newer timestamps are first
2791       // found when we iterate over a memstore and newer versions are the
2792       // first we trip over when reading from a store file.
2793       if (ltimestamp < rtimestamp) {
2794         return 1;
2795       } else if (ltimestamp > rtimestamp) {
2796         return -1;
2797       }
2798       return 0;
2799     }
2800   }
2801 
2802   /**
2803    * HeapSize implementation
2804    *
2805    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2806    * MemStore.
2807    */
2808   @Override
2809   public long heapSize() {
2810     int sum = 0;
2811     sum += ClassSize.OBJECT;// the KeyValue object itself
2812     sum += ClassSize.REFERENCE;// pointer to "bytes"
2813     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2814     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2815     sum += 3 * Bytes.SIZEOF_INT;// offset, length, keyLength
2816     sum += Bytes.SIZEOF_LONG;// memstoreTS
2817     return ClassSize.align(sum);
2818   }
2819 
2820   @Override
2821   public int getTagsOffset() {
2822     throw new UnsupportedOperationException("Not implememnted");
2823   }
2824 
2825   @Override
2826   public int getTagsLength() {
2827     throw new UnsupportedOperationException("Not implememnted");
2828   }
2829 
2830   @Override
2831   public byte[] getTagsArray() {
2832     throw new UnsupportedOperationException("Not implememnted");
2833   }
2834 }