View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.io.HeapSize;
41  import org.apache.hadoop.hbase.io.util.StreamUtils;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.ClassSize;
44  import org.apache.hadoop.io.IOUtils;
45  import org.apache.hadoop.io.RawComparator;
46  
47  import com.google.common.annotations.VisibleForTesting;
48  
49  /**
50   * An HBase Key/Value. This is the fundamental HBase Type.  
51   * <p>
52   * HBase applications and users should use the Cell interface and avoid directly using KeyValue
53   * and member functions not defined in Cell.
54   * <p>
55   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
56   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
57   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
58   * <p>
59   * Instances of this class are immutable. They do not implement Comparable but Comparators are
60   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
61   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
62   * Hfiles, and bloom filter keys.
63   * <p>
64   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
65   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
66   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
67   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier>
68   * &lt;timestamp> &lt;keytype></code>
69   * The <code>rowlength</code> maximum is <code>Short.MAX_SIZE</code>, column family length maximum
70   * is <code>Byte.MAX_SIZE</code>, and column qualifier + key length must be <
71   * <code>Integer.MAX_SIZE</code>. The column does not contain the family/qualifier delimiter,
72   * {@link #COLUMN_FAMILY_DELIMITER}<br>
73   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
74   * the value part. The format for this part is: <code>&lt;tagslength>&lt;tagsbytes></code>.
75   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
76   * contain one or more tags where as each tag is of the form
77   * <code>&lt;taglength>&lt;tagtype>&lt;tagbytes></code>.  <code>tagtype</code> is one byte and
78   * <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type length
79   * and actual tag bytes length.
80   */
81  @InterfaceAudience.Private
82  public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp {
83    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
84  
85    static final Log LOG = LogFactory.getLog(KeyValue.class);
86  
87    /**
88     * Colon character in UTF-8
89     */
90    public static final char COLUMN_FAMILY_DELIMITER = ':';
91  
92    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
93      new byte[]{COLUMN_FAMILY_DELIMITER};
94  
95    /**
96     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
97     * of KeyValue only.
98     */
99    public static final KVComparator COMPARATOR = new KVComparator();
100   /**
101    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
102    * {@link KeyValue}s.
103    */
104   public static final KVComparator META_COMPARATOR = new MetaComparator();
105 
106   /**
107    * Needed for Bloom Filters.
108    */
109   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
110 
111   /** Size of the key length field in bytes*/
112   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
113 
114   /** Size of the key type field in bytes */
115   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
116 
117   /** Size of the row length field in bytes */
118   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
119 
120   /** Size of the family length field in bytes */
121   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
122 
123   /** Size of the timestamp field in bytes */
124   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
125 
126   // Size of the timestamp and type byte on end of a key -- a long + a byte.
127   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
128 
129   // Size of the length shorts and bytes in key.
130   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
131       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
132 
133   // How far into the key the row starts at. First thing to read is the short
134   // that says how long the row is.
135   public static final int ROW_OFFSET =
136     Bytes.SIZEOF_INT /*keylength*/ +
137     Bytes.SIZEOF_INT /*valuelength*/;
138 
139   // Size of the length ints in a KeyValue datastructure.
140   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
141 
142   /** Size of the tags length field in bytes */
143   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
144 
145   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
146 
147   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
148 
149   /**
150    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
151    * characteristics would take up for its underlying data structure.
152    *
153    * @param rlength row length
154    * @param flength family length
155    * @param qlength qualifier length
156    * @param vlength value length
157    *
158    * @return the <code>KeyValue</code> data structure length
159    */
160   public static long getKeyValueDataStructureSize(int rlength,
161       int flength, int qlength, int vlength) {
162     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
163         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
164   }
165 
166   /**
167    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
168    * characteristics would take up for its underlying data structure.
169    *
170    * @param rlength row length
171    * @param flength family length
172    * @param qlength qualifier length
173    * @param vlength value length
174    * @param tagsLength total length of the tags
175    *
176    * @return the <code>KeyValue</code> data structure length
177    */
178   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
179       int vlength, int tagsLength) {
180     if (tagsLength == 0) {
181       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
182     }
183     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
184         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
185   }
186 
187   /**
188    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
189    * characteristics would take up for its underlying data structure.
190    *
191    * @param klength key length
192    * @param vlength value length
193    * @param tagsLength total length of the tags
194    *
195    * @return the <code>KeyValue</code> data structure length
196    */
197   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
198     if (tagsLength == 0) {
199       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
200     }
201     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
202   }
203 
204   /**
205    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
206    * characteristics would take up in its underlying data structure for the key.
207    *
208    * @param rlength row length
209    * @param flength family length
210    * @param qlength qualifier length
211    *
212    * @return the key data structure length
213    */
214   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
215     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
216   }
217 
218   /**
219    * Key type.
220    * Has space for other key types to be added later.  Cannot rely on
221    * enum ordinals . They change if item is removed or moved.  Do our own codes.
222    */
223   public static enum Type {
224     Minimum((byte)0),
225     Put((byte)4),
226 
227     Delete((byte)8),
228     DeleteFamilyVersion((byte)10),
229     DeleteColumn((byte)12),
230     DeleteFamily((byte)14),
231 
232     // Maximum is used when searching; you look from maximum on down.
233     Maximum((byte)255);
234 
235     private final byte code;
236 
237     Type(final byte c) {
238       this.code = c;
239     }
240 
241     public byte getCode() {
242       return this.code;
243     }
244 
245     /**
246      * Cannot rely on enum ordinals . They change if item is removed or moved.
247      * Do our own codes.
248      * @param b
249      * @return Type associated with passed code.
250      */
251     public static Type codeToType(final byte b) {
252       for (Type t : Type.values()) {
253         if (t.getCode() == b) {
254           return t;
255         }
256       }
257       throw new RuntimeException("Unknown code " + b);
258     }
259   }
260 
261   /**
262    * Lowest possible key.
263    * Makes a Key with highest possible Timestamp, empty row and column.  No
264    * key can be equal or lower than this one in memstore or in store file.
265    */
266   public static final KeyValue LOWESTKEY =
267     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
268 
269   ////
270   // KeyValue core instance fields.
271   private byte [] bytes = null;  // an immutable byte array that contains the KV
272   private int offset = 0;  // offset into bytes buffer KV starts at
273   private int length = 0;  // length of the KV starting from offset.
274 
275   /**
276    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
277    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
278    * KeyValue type.
279    */
280   public static boolean isDelete(byte t) {
281     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
282   }
283 
284   /** Here be dragons **/
285 
286   // used to achieve atomic operations in the memstore.
287   @Override
288   public long getMvccVersion() {
289     return this.getSequenceId();
290   }
291 
292   /**
293    * used to achieve atomic operations in the memstore.
294    */
295   @Override
296   public long getSequenceId() {
297     return seqId;
298   }
299 
300   public void setSequenceId(long seqId) {
301     this.seqId = seqId;
302   }
303 
304   // multi-version concurrency control version.  default value is 0, aka do not care.
305   private long seqId = 0;
306 
307   /** Dragon time over, return to normal business */
308 
309 
310   /** Writable Constructor -- DO NOT USE */
311   public KeyValue() {}
312 
313   /**
314    * Creates a KeyValue from the start of the specified byte array.
315    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
316    * @param bytes byte array
317    */
318   public KeyValue(final byte [] bytes) {
319     this(bytes, 0);
320   }
321 
322   /**
323    * Creates a KeyValue from the specified byte array and offset.
324    * Presumes <code>bytes</code> content starting at <code>offset</code> is
325    * formatted as a KeyValue blob.
326    * @param bytes byte array
327    * @param offset offset to start of KeyValue
328    */
329   public KeyValue(final byte [] bytes, final int offset) {
330     this(bytes, offset, getLength(bytes, offset));
331   }
332 
333   /**
334    * Creates a KeyValue from the specified byte array, starting at offset, and
335    * for length <code>length</code>.
336    * @param bytes byte array
337    * @param offset offset to start of the KeyValue
338    * @param length length of the KeyValue
339    */
340   public KeyValue(final byte [] bytes, final int offset, final int length) {
341     this.bytes = bytes;
342     this.offset = offset;
343     this.length = length;
344   }
345 
346   /**
347    * Creates a KeyValue from the specified byte array, starting at offset, and
348    * for length <code>length</code>.
349    *
350    * @param bytes  byte array
351    * @param offset offset to start of the KeyValue
352    * @param length length of the KeyValue
353    * @param ts
354    */
355   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
356     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
357   }
358 
359   /** Constructors that build a new backing byte array from fields */
360 
361   /**
362    * Constructs KeyValue structure filled with null value.
363    * Sets type to {@link KeyValue.Type#Maximum}
364    * @param row - row key (arbitrary byte array)
365    * @param timestamp
366    */
367   public KeyValue(final byte [] row, final long timestamp) {
368     this(row, null, null, timestamp, Type.Maximum, null);
369   }
370 
371   /**
372    * Constructs KeyValue structure filled with null value.
373    * @param row - row key (arbitrary byte array)
374    * @param timestamp
375    */
376   public KeyValue(final byte [] row, final long timestamp, Type type) {
377     this(row, null, null, timestamp, type, null);
378   }
379 
380   /**
381    * Constructs KeyValue structure filled with null value.
382    * Sets type to {@link KeyValue.Type#Maximum}
383    * @param row - row key (arbitrary byte array)
384    * @param family family name
385    * @param qualifier column qualifier
386    */
387   public KeyValue(final byte [] row, final byte [] family,
388       final byte [] qualifier) {
389     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
390   }
391 
392   /**
393    * Constructs KeyValue structure filled with null value.
394    * @param row - row key (arbitrary byte array)
395    * @param family family name
396    * @param qualifier column qualifier
397    */
398   public KeyValue(final byte [] row, final byte [] family,
399       final byte [] qualifier, final byte [] value) {
400     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
401   }
402 
403   /**
404    * Constructs KeyValue structure filled with specified values.
405    * @param row row key
406    * @param family family name
407    * @param qualifier column qualifier
408    * @param timestamp version timestamp
409    * @param type key type
410    * @throws IllegalArgumentException
411    */
412   public KeyValue(final byte[] row, final byte[] family,
413       final byte[] qualifier, final long timestamp, Type type) {
414     this(row, family, qualifier, timestamp, type, null);
415   }
416 
417   /**
418    * Constructs KeyValue structure filled with specified values.
419    * @param row row key
420    * @param family family name
421    * @param qualifier column qualifier
422    * @param timestamp version timestamp
423    * @param value column value
424    * @throws IllegalArgumentException
425    */
426   public KeyValue(final byte[] row, final byte[] family,
427       final byte[] qualifier, final long timestamp, final byte[] value) {
428     this(row, family, qualifier, timestamp, Type.Put, value);
429   }
430 
431   /**
432    * Constructs KeyValue structure filled with specified values.
433    * @param row row key
434    * @param family family name
435    * @param qualifier column qualifier
436    * @param timestamp version timestamp
437    * @param value column value
438    * @param tags tags
439    * @throws IllegalArgumentException
440    */
441   public KeyValue(final byte[] row, final byte[] family,
442       final byte[] qualifier, final long timestamp, final byte[] value,
443       final Tag[] tags) {
444     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
445   }
446 
447   /**
448    * Constructs KeyValue structure filled with specified values.
449    * @param row row key
450    * @param family family name
451    * @param qualifier column qualifier
452    * @param timestamp version timestamp
453    * @param value column value
454    * @param tags tags non-empty list of tags or null
455    * @throws IllegalArgumentException
456    */
457   public KeyValue(final byte[] row, final byte[] family,
458       final byte[] qualifier, final long timestamp, final byte[] value,
459       final List<Tag> tags) {
460     this(row, 0, row==null ? 0 : row.length,
461       family, 0, family==null ? 0 : family.length,
462       qualifier, 0, qualifier==null ? 0 : qualifier.length,
463       timestamp, Type.Put,
464       value, 0, value==null ? 0 : value.length, tags);
465   }
466 
467   /**
468    * Constructs KeyValue structure filled with specified values.
469    * @param row row key
470    * @param family family name
471    * @param qualifier column qualifier
472    * @param timestamp version timestamp
473    * @param type key type
474    * @param value column value
475    * @throws IllegalArgumentException
476    */
477   public KeyValue(final byte[] row, final byte[] family,
478       final byte[] qualifier, final long timestamp, Type type,
479       final byte[] value) {
480     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
481         timestamp, type,   value, 0, len(value));
482   }
483 
484   /**
485    * Constructs KeyValue structure filled with specified values.
486    * <p>
487    * Column is split into two fields, family and qualifier.
488    * @param row row key
489    * @param family family name
490    * @param qualifier column qualifier
491    * @param timestamp version timestamp
492    * @param type key type
493    * @param value column value
494    * @throws IllegalArgumentException
495    */
496   public KeyValue(final byte[] row, final byte[] family,
497       final byte[] qualifier, final long timestamp, Type type,
498       final byte[] value, final List<Tag> tags) {
499     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
500         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
501   }
502 
503   /**
504    * Constructs KeyValue structure filled with specified values.
505    * @param row row key
506    * @param family family name
507    * @param qualifier column qualifier
508    * @param timestamp version timestamp
509    * @param type key type
510    * @param value column value
511    * @throws IllegalArgumentException
512    */
513   public KeyValue(final byte[] row, final byte[] family,
514       final byte[] qualifier, final long timestamp, Type type,
515       final byte[] value, final byte[] tags) {
516     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
517         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
518   }
519 
520   /**
521    * Constructs KeyValue structure filled with specified values.
522    * @param row row key
523    * @param family family name
524    * @param qualifier column qualifier
525    * @param qoffset qualifier offset
526    * @param qlength qualifier length
527    * @param timestamp version timestamp
528    * @param type key type
529    * @param value column value
530    * @param voffset value offset
531    * @param vlength value length
532    * @throws IllegalArgumentException
533    */
534   public KeyValue(byte [] row, byte [] family,
535       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
536       byte [] value, int voffset, int vlength, List<Tag> tags) {
537     this(row, 0, row==null ? 0 : row.length,
538         family, 0, family==null ? 0 : family.length,
539         qualifier, qoffset, qlength, timestamp, type,
540         value, voffset, vlength, tags);
541   }
542 
543   /**
544    * @param row
545    * @param family
546    * @param qualifier
547    * @param qoffset
548    * @param qlength
549    * @param timestamp
550    * @param type
551    * @param value
552    * @param voffset
553    * @param vlength
554    * @param tags
555    */
556   public KeyValue(byte [] row, byte [] family,
557       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
558       byte [] value, int voffset, int vlength, byte[] tags) {
559     this(row, 0, row==null ? 0 : row.length,
560         family, 0, family==null ? 0 : family.length,
561         qualifier, qoffset, qlength, timestamp, type,
562         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
563   }
564 
565   /**
566    * Constructs KeyValue structure filled with specified values.
567    * <p>
568    * Column is split into two fields, family and qualifier.
569    * @param row row key
570    * @throws IllegalArgumentException
571    */
572   public KeyValue(final byte [] row, final int roffset, final int rlength,
573       final byte [] family, final int foffset, final int flength,
574       final byte [] qualifier, final int qoffset, final int qlength,
575       final long timestamp, final Type type,
576       final byte [] value, final int voffset, final int vlength) {
577     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
578       qlength, timestamp, type, value, voffset, vlength, null);
579   }
580   
581   /**
582    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
583    * data buffer.
584    * <p>
585    * Column is split into two fields, family and qualifier.
586    *
587    * @param buffer the bytes buffer to use
588    * @param boffset buffer offset
589    * @param row row key
590    * @param roffset row offset
591    * @param rlength row length
592    * @param family family name
593    * @param foffset family offset
594    * @param flength family length
595    * @param qualifier column qualifier
596    * @param qoffset qualifier offset
597    * @param qlength qualifier length
598    * @param timestamp version timestamp
599    * @param type key type
600    * @param value column value
601    * @param voffset value offset
602    * @param vlength value length
603    * @param tags non-empty list of tags or null
604    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
605    * remaining in the buffer
606    */
607   public KeyValue(byte [] buffer, final int boffset,
608       final byte [] row, final int roffset, final int rlength,
609       final byte [] family, final int foffset, final int flength,
610       final byte [] qualifier, final int qoffset, final int qlength,
611       final long timestamp, final Type type,
612       final byte [] value, final int voffset, final int vlength,
613       final Tag[] tags) {
614      this.bytes  = buffer;
615      this.length = writeByteArray(buffer, boffset,
616          row, roffset, rlength,
617          family, foffset, flength, qualifier, qoffset, qlength,
618         timestamp, type, value, voffset, vlength, tags);
619      this.offset = boffset;
620    }
621 
622   /**
623    * Constructs KeyValue structure filled with specified values.
624    * <p>
625    * Column is split into two fields, family and qualifier.
626    * @param row row key
627    * @param roffset row offset
628    * @param rlength row length
629    * @param family family name
630    * @param foffset family offset
631    * @param flength family length
632    * @param qualifier column qualifier
633    * @param qoffset qualifier offset
634    * @param qlength qualifier length
635    * @param timestamp version timestamp
636    * @param type key type
637    * @param value column value
638    * @param voffset value offset
639    * @param vlength value length
640    * @param tags tags
641    * @throws IllegalArgumentException
642    */
643   public KeyValue(final byte [] row, final int roffset, final int rlength,
644       final byte [] family, final int foffset, final int flength,
645       final byte [] qualifier, final int qoffset, final int qlength,
646       final long timestamp, final Type type,
647       final byte [] value, final int voffset, final int vlength,
648       final List<Tag> tags) {
649     this.bytes = createByteArray(row, roffset, rlength,
650         family, foffset, flength, qualifier, qoffset, qlength,
651         timestamp, type, value, voffset, vlength, tags);
652     this.length = bytes.length;
653     this.offset = 0;
654   }
655 
656   /**
657    * @param row
658    * @param roffset
659    * @param rlength
660    * @param family
661    * @param foffset
662    * @param flength
663    * @param qualifier
664    * @param qoffset
665    * @param qlength
666    * @param timestamp
667    * @param type
668    * @param value
669    * @param voffset
670    * @param vlength
671    * @param tags
672    */
673   public KeyValue(final byte [] row, final int roffset, final int rlength,
674       final byte [] family, final int foffset, final int flength,
675       final byte [] qualifier, final int qoffset, final int qlength,
676       final long timestamp, final Type type,
677       final byte [] value, final int voffset, final int vlength,
678       final byte[] tags, final int tagsOffset, final int tagsLength) {
679     this.bytes = createByteArray(row, roffset, rlength,
680         family, foffset, flength, qualifier, qoffset, qlength,
681         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
682     this.length = bytes.length;
683     this.offset = 0;
684   }
685 
686   /**
687    * Constructs an empty KeyValue structure, with specified sizes.
688    * This can be used to partially fill up KeyValues.
689    * <p>
690    * Column is split into two fields, family and qualifier.
691    * @param rlength row length
692    * @param flength family length
693    * @param qlength qualifier length
694    * @param timestamp version timestamp
695    * @param type key type
696    * @param vlength value length
697    * @throws IllegalArgumentException
698    */
699   public KeyValue(final int rlength,
700       final int flength,
701       final int qlength,
702       final long timestamp, final Type type,
703       final int vlength) {
704     this(rlength, flength, qlength, timestamp, type, vlength, 0);
705   }
706 
707   /**
708    * Constructs an empty KeyValue structure, with specified sizes.
709    * This can be used to partially fill up KeyValues.
710    * <p>
711    * Column is split into two fields, family and qualifier.
712    * @param rlength row length
713    * @param flength family length
714    * @param qlength qualifier length
715    * @param timestamp version timestamp
716    * @param type key type
717    * @param vlength value length
718    * @param tagsLength
719    * @throws IllegalArgumentException
720    */
721   public KeyValue(final int rlength,
722       final int flength,
723       final int qlength,
724       final long timestamp, final Type type,
725       final int vlength, final int tagsLength) {
726     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
727         tagsLength);
728     this.length = bytes.length;
729     this.offset = 0;
730   }
731 
732 
733   public KeyValue(byte[] row, int roffset, int rlength,
734                   byte[] family, int foffset, int flength,
735                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
736     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
737         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
738         value, 0, value == null ? 0 : value.remaining(), tags);
739     this.length = bytes.length;
740     this.offset = 0;
741   }
742 
743   public KeyValue(Cell c) {
744     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
745         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
746         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
747         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
748         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
749   }
750 
751   /**
752    * Create an empty byte[] representing a KeyValue
753    * All lengths are preset and can be filled in later.
754    * @param rlength
755    * @param flength
756    * @param qlength
757    * @param timestamp
758    * @param type
759    * @param vlength
760    * @return The newly created byte array.
761    */
762   private static byte[] createEmptyByteArray(final int rlength, int flength,
763       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
764     if (rlength > Short.MAX_VALUE) {
765       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
766     }
767     if (flength > Byte.MAX_VALUE) {
768       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
769     }
770     // Qualifier length
771     if (qlength > Integer.MAX_VALUE - rlength - flength) {
772       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
773     }
774     checkForTagsLength(tagsLength);
775     // Key length
776     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
777     if (longkeylength > Integer.MAX_VALUE) {
778       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
779         Integer.MAX_VALUE);
780     }
781     int keylength = (int)longkeylength;
782     // Value length
783     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
784       throw new IllegalArgumentException("Valuer > " +
785           HConstants.MAXIMUM_VALUE_LENGTH);
786     }
787 
788     // Allocate right-sized byte array.
789     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
790         tagsLength)];
791     // Write the correct size markers
792     int pos = 0;
793     pos = Bytes.putInt(bytes, pos, keylength);
794     pos = Bytes.putInt(bytes, pos, vlength);
795     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
796     pos += rlength;
797     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
798     pos += flength + qlength;
799     pos = Bytes.putLong(bytes, pos, timestamp);
800     pos = Bytes.putByte(bytes, pos, type.getCode());
801     pos += vlength;
802     if (tagsLength > 0) {
803       pos = Bytes.putAsShort(bytes, pos, tagsLength);
804     }
805     return bytes;
806   }
807 
808   /**
809    * Checks the parameters passed to a constructor.
810    *
811    * @param row row key
812    * @param rlength row length
813    * @param family family name
814    * @param flength family length
815    * @param qlength qualifier length
816    * @param vlength value length
817    *
818    * @throws IllegalArgumentException an illegal value was passed
819    */
820   private static void checkParameters(final byte [] row, final int rlength,
821       final byte [] family, int flength, int qlength, int vlength)
822           throws IllegalArgumentException {
823     if (rlength > Short.MAX_VALUE) {
824       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
825     }
826     if (row == null) {
827       throw new IllegalArgumentException("Row is null");
828     }
829     // Family length
830     flength = family == null ? 0 : flength;
831     if (flength > Byte.MAX_VALUE) {
832       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
833     }
834     // Qualifier length
835     if (qlength > Integer.MAX_VALUE - rlength - flength) {
836       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
837     }
838     // Key length
839     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
840     if (longKeyLength > Integer.MAX_VALUE) {
841       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
842           Integer.MAX_VALUE);
843     }
844     // Value length
845     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
846       throw new IllegalArgumentException("Value length " + vlength + " > " +
847           HConstants.MAXIMUM_VALUE_LENGTH);
848     }
849   }
850 
851   /**
852    * Write KeyValue format into the provided byte array.
853    *
854    * @param buffer the bytes buffer to use
855    * @param boffset buffer offset
856    * @param row row key
857    * @param roffset row offset
858    * @param rlength row length
859    * @param family family name
860    * @param foffset family offset
861    * @param flength family length
862    * @param qualifier column qualifier
863    * @param qoffset qualifier offset
864    * @param qlength qualifier length
865    * @param timestamp version timestamp
866    * @param type key type
867    * @param value column value
868    * @param voffset value offset
869    * @param vlength value length
870    *
871    * @return The number of useful bytes in the buffer.
872    *
873    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
874    * remaining in the buffer
875    */
876   public static int writeByteArray(byte [] buffer, final int boffset,
877       final byte [] row, final int roffset, final int rlength,
878       final byte [] family, final int foffset, int flength,
879       final byte [] qualifier, final int qoffset, int qlength,
880       final long timestamp, final Type type,
881       final byte [] value, final int voffset, int vlength, Tag[] tags) {
882 
883     checkParameters(row, rlength, family, flength, qlength, vlength);
884 
885     // Calculate length of tags area
886     int tagsLength = 0;
887     if (tags != null && tags.length > 0) {
888       for (Tag t: tags) {
889         tagsLength += t.getLength();
890       }
891     }
892     checkForTagsLength(tagsLength);
893     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
894     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
895         tagsLength);
896     if (keyValueLength > buffer.length - boffset) {
897       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
898           keyValueLength);
899     }
900 
901     // Write key, value and key row length.
902     int pos = boffset;
903     pos = Bytes.putInt(buffer, pos, keyLength);
904     pos = Bytes.putInt(buffer, pos, vlength);
905     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
906     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
907     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
908     if (flength != 0) {
909       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
910     }
911     if (qlength != 0) {
912       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
913     }
914     pos = Bytes.putLong(buffer, pos, timestamp);
915     pos = Bytes.putByte(buffer, pos, type.getCode());
916     if (value != null && value.length > 0) {
917       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
918     }
919     // Write the number of tags. If it is 0 then it means there are no tags.
920     if (tagsLength > 0) {
921       pos = Bytes.putAsShort(buffer, pos, tagsLength);
922       for (Tag t : tags) {
923         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
924       }
925     }
926     return keyValueLength;
927   }
928 
929   private static void checkForTagsLength(int tagsLength) {
930     if (tagsLength > MAX_TAGS_LENGTH) {
931       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
932     }
933   }
934 
935   /**
936    * Write KeyValue format into a byte array.
937    * @param row row key
938    * @param roffset row offset
939    * @param rlength row length
940    * @param family family name
941    * @param foffset family offset
942    * @param flength family length
943    * @param qualifier column qualifier
944    * @param qoffset qualifier offset
945    * @param qlength qualifier length
946    * @param timestamp version timestamp
947    * @param type key type
948    * @param value column value
949    * @param voffset value offset
950    * @param vlength value length
951    * @return The newly created byte array.
952    */
953   private static byte [] createByteArray(final byte [] row, final int roffset,
954       final int rlength, final byte [] family, final int foffset, int flength,
955       final byte [] qualifier, final int qoffset, int qlength,
956       final long timestamp, final Type type,
957       final byte [] value, final int voffset, 
958       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
959 
960     checkParameters(row, rlength, family, flength, qlength, vlength);
961     checkForTagsLength(tagsLength);
962     // Allocate right-sized byte array.
963     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
964     byte [] bytes =
965         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
966     // Write key, value and key row length.
967     int pos = 0;
968     pos = Bytes.putInt(bytes, pos, keyLength);
969     pos = Bytes.putInt(bytes, pos, vlength);
970     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
971     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
972     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
973     if(flength != 0) {
974       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
975     }
976     if(qlength != 0) {
977       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
978     }
979     pos = Bytes.putLong(bytes, pos, timestamp);
980     pos = Bytes.putByte(bytes, pos, type.getCode());
981     if (value != null && value.length > 0) {
982       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
983     }
984     // Add the tags after the value part
985     if (tagsLength > 0) {
986       pos = Bytes.putAsShort(bytes, pos, tagsLength);
987       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
988     }
989     return bytes;
990   }
991 
992   /**
993    * @param qualifier can be a ByteBuffer or a byte[], or null.
994    * @param value can be a ByteBuffer or a byte[], or null.
995    */
996   private static byte [] createByteArray(final byte [] row, final int roffset,
997       final int rlength, final byte [] family, final int foffset, int flength,
998       final Object qualifier, final int qoffset, int qlength,
999       final long timestamp, final Type type,
1000       final Object value, final int voffset, int vlength, List<Tag> tags) {
1001 
1002     checkParameters(row, rlength, family, flength, qlength, vlength);
1003 
1004     // Calculate length of tags area
1005     int tagsLength = 0;
1006     if (tags != null && !tags.isEmpty()) {
1007       for (Tag t : tags) {
1008         tagsLength += t.getLength();
1009       }
1010     }
1011     checkForTagsLength(tagsLength);
1012     // Allocate right-sized byte array.
1013     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1014     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1015         tagsLength)];
1016 
1017     // Write key, value and key row length.
1018     int pos = 0;
1019     pos = Bytes.putInt(bytes, pos, keyLength);
1020 
1021     pos = Bytes.putInt(bytes, pos, vlength);
1022     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1023     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1024     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1025     if(flength != 0) {
1026       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1027     }
1028     if (qlength > 0) {
1029       if (qualifier instanceof ByteBuffer) {
1030         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1031       } else {
1032         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1033       }
1034     }
1035     pos = Bytes.putLong(bytes, pos, timestamp);
1036     pos = Bytes.putByte(bytes, pos, type.getCode());
1037     if (vlength > 0) {
1038       if (value instanceof ByteBuffer) {
1039         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1040       } else {
1041         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1042       }
1043     }
1044     // Add the tags after the value part
1045     if (tagsLength > 0) {
1046       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1047       for (Tag t : tags) {
1048         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1049       }
1050     }
1051     return bytes;
1052   }
1053 
1054   /**
1055    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1056    */
1057   @Override
1058   public boolean equals(Object other) {
1059     if (!(other instanceof Cell)) {
1060       return false;
1061     }
1062     return CellComparator.equals(this, (Cell)other);
1063   }
1064 
1065   @Override
1066   public int hashCode() {
1067     byte[] b = getBuffer();
1068     int start = getOffset(), end = getOffset() + getLength();
1069     int h = b[start++];
1070     for (int i = start; i < end; i++) {
1071       h = (h * 13) ^ b[i];
1072     }
1073     return h;
1074   }
1075 
1076   //---------------------------------------------------------------------------
1077   //
1078   //  KeyValue cloning
1079   //
1080   //---------------------------------------------------------------------------
1081 
1082   /**
1083    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1084    * @return Fully copied clone of this KeyValue
1085    * @throws CloneNotSupportedException
1086    */
1087   @Override
1088   public KeyValue clone() throws CloneNotSupportedException {
1089     super.clone();
1090     byte [] b = new byte[this.length];
1091     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1092     KeyValue ret = new KeyValue(b, 0, b.length);
1093     // Important to clone the memstoreTS as well - otherwise memstore's
1094     // update-in-place methods (eg increment) will end up creating
1095     // new entries
1096     ret.setSequenceId(seqId);
1097     return ret;
1098   }
1099 
1100   /**
1101    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1102    * http://en.wikipedia.org/wiki/Object_copy
1103    * @return Shallow copy of this KeyValue
1104    */
1105   public KeyValue shallowCopy() {
1106     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1107     shallowCopy.setSequenceId(this.seqId);
1108     return shallowCopy;
1109   }
1110 
1111   //---------------------------------------------------------------------------
1112   //
1113   //  String representation
1114   //
1115   //---------------------------------------------------------------------------
1116 
1117   public String toString() {
1118     if (this.bytes == null || this.bytes.length == 0) {
1119       return "empty";
1120     }
1121     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
1122       + getValueLength() + "/seqid=" + seqId;
1123   }
1124 
1125   /**
1126    * @param k Key portion of a KeyValue.
1127    * @return Key as a String, empty string if k is null. 
1128    */
1129   public static String keyToString(final byte [] k) {
1130     if (k == null) { 
1131       return "";
1132     }
1133     return keyToString(k, 0, k.length);
1134   }
1135 
1136   /**
1137    * Produces a string map for this key/value pair. Useful for programmatic use
1138    * and manipulation of the data stored in an HLogKey, for example, printing
1139    * as JSON. Values are left out due to their tendency to be large. If needed,
1140    * they can be added manually.
1141    *
1142    * @return the Map<String,?> containing data from this key
1143    */
1144   public Map<String, Object> toStringMap() {
1145     Map<String, Object> stringMap = new HashMap<String, Object>();
1146     stringMap.put("row", Bytes.toStringBinary(getRow()));
1147     stringMap.put("family", Bytes.toStringBinary(getFamily()));
1148     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
1149     stringMap.put("timestamp", getTimestamp());
1150     stringMap.put("vlen", getValueLength());
1151     List<Tag> tags = getTags();
1152     if (tags != null) {
1153       List<String> tagsString = new ArrayList<String>();
1154       for (Tag t : tags) {
1155         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1156       }
1157       stringMap.put("tag", tagsString);
1158     }
1159     return stringMap;
1160   }
1161 
1162   /**
1163    * Use for logging.
1164    * @param b Key portion of a KeyValue.
1165    * @param o Offset to start of key
1166    * @param l Length of key.
1167    * @return Key as a String.
1168    */
1169   public static String keyToString(final byte [] b, final int o, final int l) {
1170     if (b == null) return "";
1171     int rowlength = Bytes.toShort(b, o);
1172     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1173     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1174     int familylength = b[columnoffset - 1];
1175     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1176     String family = familylength == 0? "":
1177       Bytes.toStringBinary(b, columnoffset, familylength);
1178     String qualifier = columnlength == 0? "":
1179       Bytes.toStringBinary(b, columnoffset + familylength,
1180       columnlength - familylength);
1181     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1182     String timestampStr = humanReadableTimestamp(timestamp);
1183     byte type = b[o + l - 1];
1184     return row + "/" + family +
1185       (family != null && family.length() > 0? ":" :"") +
1186       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1187   }
1188 
1189   public static String humanReadableTimestamp(final long timestamp) {
1190     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1191       return "LATEST_TIMESTAMP";
1192     }
1193     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1194       return "OLDEST_TIMESTAMP";
1195     }
1196     return String.valueOf(timestamp);
1197   }
1198 
1199   //---------------------------------------------------------------------------
1200   //
1201   //  Public Member Accessors
1202   //
1203   //---------------------------------------------------------------------------
1204 
1205   /**
1206    * @return The byte array backing this KeyValue.
1207    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1208    */
1209   @Deprecated
1210   public byte [] getBuffer() {
1211     return this.bytes;
1212   }
1213 
1214   /**
1215    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1216    */
1217   public int getOffset() {
1218     return this.offset;
1219   }
1220 
1221   /**
1222    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1223    */
1224   public int getLength() {
1225     return length;
1226   }
1227 
1228   //---------------------------------------------------------------------------
1229   //
1230   //  Length and Offset Calculators
1231   //
1232   //---------------------------------------------------------------------------
1233 
1234   /**
1235    * Determines the total length of the KeyValue stored in the specified
1236    * byte array and offset.  Includes all headers.
1237    * @param bytes byte array
1238    * @param offset offset to start of the KeyValue
1239    * @return length of entire KeyValue, in bytes
1240    */
1241   private static int getLength(byte [] bytes, int offset) {
1242     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1243     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1244     return klength + vlength;
1245   }
1246 
1247   /**
1248    * @return Key offset in backing buffer..
1249    */
1250   public int getKeyOffset() {
1251     return this.offset + ROW_OFFSET;
1252   }
1253 
1254   public String getKeyString() {
1255     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1256   }
1257 
1258   /**
1259    * @return Length of key portion.
1260    */
1261   public int getKeyLength() {
1262     return Bytes.toInt(this.bytes, this.offset);
1263   }
1264 
1265   /**
1266    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1267    */
1268   @Override
1269   public byte[] getValueArray() {
1270     return bytes;
1271   }
1272 
1273   /**
1274    * @return the value offset
1275    */
1276   @Override
1277   public int getValueOffset() {
1278     int voffset = getKeyOffset() + getKeyLength();
1279     return voffset;
1280   }
1281 
1282   /**
1283    * @return Value length
1284    */
1285   @Override
1286   public int getValueLength() {
1287     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1288     return vlength;
1289   }
1290 
1291   /**
1292    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1293    */
1294   @Override
1295   public byte[] getRowArray() {
1296     return bytes;
1297   }
1298 
1299   /**
1300    * @return Row offset
1301    */
1302   @Override
1303   public int getRowOffset() {
1304     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1305   }
1306 
1307   /**
1308    * @return Row length
1309    */
1310   @Override
1311   public short getRowLength() {
1312     return Bytes.toShort(this.bytes, getKeyOffset());
1313   }
1314 
1315   /**
1316    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1317    */
1318   @Override
1319   public byte[] getFamilyArray() {
1320     return bytes;
1321   }
1322 
1323   /**
1324    * @return Family offset
1325    */
1326   @Override
1327   public int getFamilyOffset() {
1328     return getFamilyOffset(getRowLength());
1329   }
1330 
1331   /**
1332    * @return Family offset
1333    */
1334   private int getFamilyOffset(int rlength) {
1335     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1336   }
1337 
1338   /**
1339    * @return Family length
1340    */
1341   @Override
1342   public byte getFamilyLength() {
1343     return getFamilyLength(getFamilyOffset());
1344   }
1345 
1346   /**
1347    * @return Family length
1348    */
1349   public byte getFamilyLength(int foffset) {
1350     return this.bytes[foffset-1];
1351   }
1352 
1353   /**
1354    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1355    */
1356   @Override
1357   public byte[] getQualifierArray() {
1358     return bytes;
1359   }
1360 
1361   /**
1362    * @return Qualifier offset
1363    */
1364   @Override
1365   public int getQualifierOffset() {
1366     return getQualifierOffset(getFamilyOffset());
1367   }
1368 
1369   /**
1370    * @return Qualifier offset
1371    */
1372   private int getQualifierOffset(int foffset) {
1373     return foffset + getFamilyLength(foffset);
1374   }
1375 
1376   /**
1377    * @return Qualifier length
1378    */
1379   @Override
1380   public int getQualifierLength() {
1381     return getQualifierLength(getRowLength(),getFamilyLength());
1382   }
1383 
1384   /**
1385    * @return Qualifier length
1386    */
1387   private int getQualifierLength(int rlength, int flength) {
1388     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1389   }
1390 
1391   /**
1392    * @return Timestamp offset
1393    */
1394   public int getTimestampOffset() {
1395     return getTimestampOffset(getKeyLength());
1396   }
1397 
1398   /**
1399    * @param keylength Pass if you have it to save on a int creation.
1400    * @return Timestamp offset
1401    */
1402   private int getTimestampOffset(final int keylength) {
1403     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1404   }
1405 
1406   /**
1407    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1408    */
1409   public boolean isLatestTimestamp() {
1410     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1411       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1412   }
1413 
1414   /**
1415    * @param now Time to set into <code>this</code> IFF timestamp ==
1416    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1417    * @return True is we modified this.
1418    */
1419   public boolean updateLatestStamp(final byte [] now) {
1420     if (this.isLatestTimestamp()) {
1421       int tsOffset = getTimestampOffset();
1422       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1423       // clear cache or else getTimestamp() possibly returns an old value
1424       return true;
1425     }
1426     return false;
1427   }
1428 
1429   @Override
1430   public void setTimestamp(long ts) {
1431     Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG);
1432   }
1433 
1434   @Override
1435   public void setTimestamp(byte[] ts, int tsOffset) {
1436     Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG);
1437   }
1438 
1439   //---------------------------------------------------------------------------
1440   //
1441   //  Methods that return copies of fields
1442   //
1443   //---------------------------------------------------------------------------
1444 
1445   /**
1446    * Do not use unless you have to.  Used internally for compacting and testing.
1447    *
1448    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1449    * {@link #getValue()} if accessing a KeyValue client-side.
1450    * @return Copy of the key portion only.
1451    */
1452   public byte [] getKey() {
1453     int keylength = getKeyLength();
1454     byte [] key = new byte[keylength];
1455     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1456     return key;
1457   }
1458 
1459   /**
1460    * Returns value in a new byte array.
1461    * Primarily for use client-side. If server-side, use
1462    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1463    * save on allocations.
1464    * @return Value in a new byte array.
1465    */
1466   @Deprecated // use CellUtil.getValueArray()
1467   public byte [] getValue() {
1468     return CellUtil.cloneValue(this);
1469   }
1470 
1471   /**
1472    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1473    * byte array.<p>
1474    *
1475    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1476    * lengths instead.
1477    * @return Row in a new byte array.
1478    */
1479   @Deprecated // use CellUtil.getRowArray()
1480   public byte [] getRow() {
1481     return CellUtil.cloneRow(this);
1482   }
1483 
1484   /**
1485    *
1486    * @return Timestamp
1487    */
1488   @Override
1489   public long getTimestamp() {
1490     return getTimestamp(getKeyLength());
1491   }
1492 
1493   /**
1494    * @param keylength Pass if you have it to save on a int creation.
1495    * @return Timestamp
1496    */
1497   long getTimestamp(final int keylength) {
1498     int tsOffset = getTimestampOffset(keylength);
1499     return Bytes.toLong(this.bytes, tsOffset);
1500   }
1501 
1502   /**
1503    * @return Type of this KeyValue.
1504    */
1505   @Deprecated
1506   public byte getType() {
1507     return getTypeByte();
1508   }
1509 
1510   /**
1511    * @return KeyValue.TYPE byte representation
1512    */
1513   @Override
1514   public byte getTypeByte() {
1515     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1516   }
1517 
1518   /**
1519    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1520    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1521    * KeyValue type.
1522    */
1523   @Deprecated // use CellUtil#isDelete
1524   public boolean isDelete() {
1525     return KeyValue.isDelete(getType());
1526   }
1527 
1528   /**
1529    * Primarily for use client-side.  Returns the family of this KeyValue in a
1530    * new byte array.<p>
1531    *
1532    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1533    * lengths instead.
1534    * @return Returns family. Makes a copy.
1535    */
1536   @Deprecated // use CellUtil.getFamilyArray
1537   public byte [] getFamily() {
1538     return CellUtil.cloneFamily(this);
1539   }
1540 
1541   /**
1542    * Primarily for use client-side.  Returns the column qualifier of this
1543    * KeyValue in a new byte array.<p>
1544    *
1545    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1546    * lengths instead.
1547    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1548    * @return Returns qualifier. Makes a copy.
1549    */
1550   @Deprecated // use CellUtil.getQualifierArray
1551   public byte [] getQualifier() {
1552     return CellUtil.cloneQualifier(this);
1553   }
1554 
1555   /**
1556    * This returns the offset where the tag actually starts.
1557    */
1558   @Override
1559   public int getTagsOffset() {
1560     int tagsLen = getTagsLength();
1561     if (tagsLen == 0) {
1562       return this.offset + this.length;
1563     }
1564     return this.offset + this.length - tagsLen;
1565   }
1566 
1567   /**
1568    * This returns the total length of the tag bytes
1569    */
1570   @Override
1571   public int getTagsLength() {
1572     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1573     if (tagsLen > 0) {
1574       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1575       // length
1576       tagsLen -= TAGS_LENGTH_SIZE;
1577     }
1578     return tagsLen;
1579   }
1580 
1581   /**
1582    * Returns any tags embedded in the KeyValue.  Used in testcases.
1583    * @return The tags
1584    */
1585   public List<Tag> getTags() {
1586     int tagsLength = getTagsLength();
1587     if (tagsLength == 0) {
1588       return EMPTY_ARRAY_LIST;
1589     }
1590     return Tag.asList(getTagsArray(), getTagsOffset(), tagsLength);
1591   }
1592 
1593   /**
1594    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1595    */
1596   @Override
1597   public byte[] getTagsArray() {
1598     return bytes;
1599   }
1600 
1601   /**
1602    * Creates a new KeyValue that only contains the key portion (the value is
1603    * set to be null).
1604    *
1605    * TODO only used by KeyOnlyFilter -- move there.
1606    * @param lenAsVal replace value with the actual value length (false=empty)
1607    */
1608   public KeyValue createKeyOnly(boolean lenAsVal) {
1609     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1610     // Rebuild as: <keylen:4><0:4><key:keylen>
1611     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1612     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1613     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1614         Math.min(newBuffer.length,this.length));
1615     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1616     if (lenAsVal) {
1617       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1618     }
1619     return new KeyValue(newBuffer);
1620   }
1621 
1622   /**
1623    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1624    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1625    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1626    * <p>
1627    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1628    * </p>
1629    * <p>
1630    * Not recommend to be used as this is old-style API.
1631    * </p>
1632    * @param c The column.
1633    * @return The parsed column.
1634    */
1635   public static byte [][] parseColumn(byte [] c) {
1636     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1637     if (index == -1) {
1638       // If no delimiter, return array of size 1
1639       return new byte [][] { c };
1640     } else if(index == c.length - 1) {
1641       // family with empty qualifier, return array size 2
1642       byte [] family = new byte[c.length-1];
1643       System.arraycopy(c, 0, family, 0, family.length);
1644       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1645     }
1646     // Family and column, return array size 2
1647     final byte [][] result = new byte [2][];
1648     result[0] = new byte [index];
1649     System.arraycopy(c, 0, result[0], 0, index);
1650     final int len = c.length - (index + 1);
1651     result[1] = new byte[len];
1652     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1653     return result;
1654   }
1655 
1656   /**
1657    * Makes a column in family:qualifier form from separate byte arrays.
1658    * <p>
1659    * Not recommended for usage as this is old-style API.
1660    * @param family
1661    * @param qualifier
1662    * @return family:qualifier
1663    */
1664   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1665     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1666   }
1667 
1668   /**
1669    * @param b
1670    * @param delimiter
1671    * @return Index of delimiter having started from start of <code>b</code>
1672    * moving rightward.
1673    */
1674   public static int getDelimiter(final byte [] b, int offset, final int length,
1675       final int delimiter) {
1676     if (b == null) {
1677       throw new IllegalArgumentException("Passed buffer is null");
1678     }
1679     int result = -1;
1680     for (int i = offset; i < length + offset; i++) {
1681       if (b[i] == delimiter) {
1682         result = i;
1683         break;
1684       }
1685     }
1686     return result;
1687   }
1688 
1689   /**
1690    * Find index of passed delimiter walking from end of buffer backwards.
1691    * @param b
1692    * @param delimiter
1693    * @return Index of delimiter
1694    */
1695   public static int getDelimiterInReverse(final byte [] b, final int offset,
1696       final int length, final int delimiter) {
1697     if (b == null) {
1698       throw new IllegalArgumentException("Passed buffer is null");
1699     }
1700     int result = -1;
1701     for (int i = (offset + length) - 1; i >= offset; i--) {
1702       if (b[i] == delimiter) {
1703         result = i;
1704         break;
1705       }
1706     }
1707     return result;
1708   }
1709 
1710   /**
1711    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1712    * {@link KeyValue}s.
1713    */
1714   public static class MetaComparator extends KVComparator {
1715     /**
1716      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1717      * table.
1718      */
1719     @Override
1720     public int compare(final Cell left, final Cell right) {
1721       int c = compareRowKey(left, right);
1722       if (c != 0) {
1723         return c;
1724       }
1725       return CellComparator.compareWithoutRow(left, right);
1726     }
1727 
1728     @Override
1729     public int compareOnlyKeyPortion(Cell left, Cell right) {
1730       return compare(left, right);
1731     }
1732 
1733     @Override
1734     public int compareRows(byte [] left, int loffset, int llength,
1735         byte [] right, int roffset, int rlength) {
1736       int leftDelimiter = getDelimiter(left, loffset, llength,
1737           HConstants.DELIMITER);
1738       int rightDelimiter = getDelimiter(right, roffset, rlength,
1739           HConstants.DELIMITER);
1740       // Compare up to the delimiter
1741       int lpart = (leftDelimiter < 0 ? llength :leftDelimiter - loffset);
1742       int rpart = (rightDelimiter < 0 ? rlength :rightDelimiter - roffset);
1743       int result = Bytes.compareTo(left, loffset, lpart, right, roffset, rpart);
1744       if (result != 0) {
1745         return result;
1746       } else {
1747         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1748           return -1;
1749         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1750           return 1;
1751         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1752           return 0;
1753         }
1754       }
1755       // Compare middle bit of the row.
1756       // Move past delimiter
1757       leftDelimiter++;
1758       rightDelimiter++;
1759       int leftFarDelimiter = getDelimiterInReverse(left, leftDelimiter,
1760           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1761       int rightFarDelimiter = getDelimiterInReverse(right,
1762           rightDelimiter, rlength - (rightDelimiter - roffset),
1763           HConstants.DELIMITER);
1764       // Now compare middlesection of row.
1765       lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter;
1766       rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter;
1767       result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart);
1768       if (result != 0) {
1769         return result;
1770       }  else {
1771         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1772           return -1;
1773         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1774           return 1;
1775         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1776           return 0;
1777         }
1778       }
1779       // Compare last part of row, the rowid.
1780       leftFarDelimiter++;
1781       rightFarDelimiter++;
1782       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1783           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1784       return result;
1785     }
1786 
1787     /**
1788      * Don't do any fancy Block Index splitting tricks.
1789      */
1790     @Override
1791     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1792       return Arrays.copyOf(rightKey, rightKey.length);
1793     }
1794 
1795     /**
1796      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1797      * instantiate the appropriate comparator.
1798      * TODO: With V3 consider removing this.
1799      * @return legacy class name for FileFileTrailer#comparatorClassName
1800      */
1801     @Override
1802     public String getLegacyKeyComparatorName() {
1803       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1804     }
1805 
1806     @Override
1807     protected Object clone() throws CloneNotSupportedException {
1808       return new MetaComparator();
1809     }
1810 
1811     /**
1812      * Override the row key comparison to parse and compare the meta row key parts.
1813      */
1814     @Override
1815     protected int compareRowKey(final Cell l, final Cell r) {
1816       byte[] left = l.getRowArray();
1817       int loffset = l.getRowOffset();
1818       int llength = l.getRowLength();
1819       byte[] right = r.getRowArray();
1820       int roffset = r.getRowOffset();
1821       int rlength = r.getRowLength();
1822       return compareRows(left, loffset, llength, right, roffset, rlength);
1823     }
1824   }
1825 
1826   /**
1827    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1828    * portion.  This means two KeyValues with same Key but different Values are
1829    * considered the same as far as this Comparator is concerned.
1830    */
1831   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1832 
1833     /**
1834      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1835      * instantiate the appropriate comparator.
1836      * TODO: With V3 consider removing this.
1837      * @return legacy class name for FileFileTrailer#comparatorClassName
1838      */
1839     public String getLegacyKeyComparatorName() {
1840       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1841     }
1842 
1843     @Override // RawComparator
1844     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1845       return compareFlatKey(l,loff,llen, r,roff,rlen);
1846     }
1847 
1848     
1849     /**
1850      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1851      * @param left
1852      * @param right
1853      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1854      */
1855     protected int compareRowKey(final Cell left, final Cell right) {
1856       return CellComparator.compareRows(left, right);
1857     }
1858 
1859     /**
1860      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1861      * full KVs laid out in a flat byte[]s.
1862      * @param left
1863      * @param loffset
1864      * @param llength
1865      * @param right
1866      * @param roffset
1867      * @param rlength
1868      * @return  0 if equal, <0 if left smaller, >0 if right smaller
1869      */
1870     public int compareFlatKey(byte[] left, int loffset, int llength,
1871         byte[] right, int roffset, int rlength) {
1872       // Compare row
1873       short lrowlength = Bytes.toShort(left, loffset);
1874       short rrowlength = Bytes.toShort(right, roffset);
1875       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1876           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1877       if (compare != 0) {
1878         return compare;
1879       }
1880 
1881       // Compare the rest of the two KVs without making any assumptions about
1882       // the common prefix. This function will not compare rows anyway, so we
1883       // don't need to tell it that the common prefix includes the row.
1884       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1885           rlength, rrowlength);
1886     }
1887 
1888     public int compareFlatKey(byte[] left, byte[] right) {
1889       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1890     }
1891 
1892     public int compareOnlyKeyPortion(Cell left, Cell right) {
1893       return CellComparator.compareStatic(left, right, true);
1894     }
1895 
1896     /**
1897      * Compares the Key of a cell -- with fields being more significant in this order:
1898      * rowkey, colfam/qual, timestamp, type, mvcc
1899      */
1900     @Override
1901     public int compare(final Cell left, final Cell right) {
1902       int compare = CellComparator.compareStatic(left, right, false);
1903       return compare;
1904     }
1905 
1906     public int compareTimestamps(final Cell left, final Cell right) {
1907       return CellComparator.compareTimestamps(left, right);
1908     }
1909 
1910     /**
1911      * @param left
1912      * @param right
1913      * @return Result comparing rows.
1914      */
1915     public int compareRows(final Cell left, final Cell right) {
1916       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
1917       right.getRowArray(), right.getRowOffset(), right.getRowLength());
1918     }
1919 
1920     /**
1921      * Get the b[],o,l for left and right rowkey portions and compare.
1922      * @param left
1923      * @param loffset
1924      * @param llength
1925      * @param right
1926      * @param roffset
1927      * @param rlength
1928      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1929      */
1930     public int compareRows(byte [] left, int loffset, int llength,
1931         byte [] right, int roffset, int rlength) {
1932       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
1933     }
1934 
1935     int compareColumns(final Cell left, final short lrowlength, final Cell right,
1936         final short rrowlength) {
1937       return CellComparator.compareColumns(left, right);
1938     }
1939 
1940     protected int compareColumns(
1941         byte [] left, int loffset, int llength, final int lfamilylength,
1942         byte [] right, int roffset, int rlength, final int rfamilylength) {
1943       // Compare family portion first.
1944       int diff = Bytes.compareTo(left, loffset, lfamilylength,
1945         right, roffset, rfamilylength);
1946       if (diff != 0) {
1947         return diff;
1948       }
1949       // Compare qualifier portion
1950       return Bytes.compareTo(left, loffset + lfamilylength,
1951         llength - lfamilylength,
1952         right, roffset + rfamilylength, rlength - rfamilylength);
1953       }
1954 
1955     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
1956       // The below older timestamps sorting ahead of newer timestamps looks
1957       // wrong but it is intentional. This way, newer timestamps are first
1958       // found when we iterate over a memstore and newer versions are the
1959       // first we trip over when reading from a store file.
1960       if (ltimestamp < rtimestamp) {
1961         return 1;
1962       } else if (ltimestamp > rtimestamp) {
1963         return -1;
1964       }
1965       return 0;
1966     }
1967 
1968     /**
1969      * Overridden
1970      * @param commonPrefix
1971      * @param left
1972      * @param loffset
1973      * @param llength
1974      * @param right
1975      * @param roffset
1976      * @param rlength
1977      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1978      */
1979     @Override // SamePrefixComparator
1980     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
1981         int loffset, int llength, byte[] right, int roffset, int rlength) {
1982       // Compare row
1983       short lrowlength = Bytes.toShort(left, loffset);
1984       short rrowlength;
1985 
1986       int comparisonResult = 0;
1987       if (commonPrefix < ROW_LENGTH_SIZE) {
1988         // almost nothing in common
1989         rrowlength = Bytes.toShort(right, roffset);
1990         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
1991             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
1992       } else { // the row length is the same
1993         rrowlength = lrowlength;
1994         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
1995           // The rows are not the same. Exclude the common prefix and compare
1996           // the rest of the two rows.
1997           int common = commonPrefix - ROW_LENGTH_SIZE;
1998           comparisonResult = compareRows(
1999               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2000               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2001         }
2002       }
2003       if (comparisonResult != 0) {
2004         return comparisonResult;
2005       }
2006 
2007       assert lrowlength == rrowlength;
2008       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2009           roffset, rlength, lrowlength);
2010     }
2011 
2012     /**
2013      * Compare columnFamily, qualifier, timestamp, and key type (everything
2014      * except the row). This method is used both in the normal comparator and
2015      * the "same-prefix" comparator. Note that we are assuming that row portions
2016      * of both KVs have already been parsed and found identical, and we don't
2017      * validate that assumption here.
2018      * @param commonPrefix
2019      *          the length of the common prefix of the two key-values being
2020      *          compared, including row length and row
2021      */
2022     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2023         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2024       /***
2025        * KeyValue Format and commonLength:
2026        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2027        * ------------------|-------commonLength--------|--------------
2028        */
2029       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2030 
2031       // commonLength + TIMESTAMP_TYPE_SIZE
2032       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2033       // ColumnFamily + Qualifier length.
2034       int lcolumnlength = llength - commonLengthWithTSAndType;
2035       int rcolumnlength = rlength - commonLengthWithTSAndType;
2036 
2037       byte ltype = left[loffset + (llength - 1)];
2038       byte rtype = right[roffset + (rlength - 1)];
2039 
2040       // If the column is not specified, the "minimum" key type appears the
2041       // latest in the sorted order, regardless of the timestamp. This is used
2042       // for specifying the last key/value in a given row, because there is no
2043       // "lexicographically last column" (it would be infinitely long). The
2044       // "maximum" key type does not need this behavior.
2045       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2046         // left is "bigger", i.e. it appears later in the sorted order
2047         return 1;
2048       }
2049       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2050         return -1;
2051       }
2052 
2053       int lfamilyoffset = commonLength + loffset;
2054       int rfamilyoffset = commonLength + roffset;
2055 
2056       // Column family length.
2057       int lfamilylength = left[lfamilyoffset - 1];
2058       int rfamilylength = right[rfamilyoffset - 1];
2059       // If left family size is not equal to right family size, we need not
2060       // compare the qualifiers.
2061       boolean sameFamilySize = (lfamilylength == rfamilylength);
2062       int common = 0;
2063       if (commonPrefix > 0) {
2064         common = Math.max(0, commonPrefix - commonLength);
2065         if (!sameFamilySize) {
2066           // Common should not be larger than Math.min(lfamilylength,
2067           // rfamilylength).
2068           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2069         } else {
2070           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2071         }
2072       }
2073       if (!sameFamilySize) {
2074         // comparing column family is enough.
2075         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2076             - common, right, rfamilyoffset + common, rfamilylength - common);
2077       }
2078       // Compare family & qualifier together.
2079       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2080           lcolumnlength - common, right, rfamilyoffset + common,
2081           rcolumnlength - common);
2082       if (comparison != 0) {
2083         return comparison;
2084       }
2085 
2086       ////
2087       // Next compare timestamps.
2088       long ltimestamp = Bytes.toLong(left,
2089           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2090       long rtimestamp = Bytes.toLong(right,
2091           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2092       int compare = compareTimestamps(ltimestamp, rtimestamp);
2093       if (compare != 0) {
2094         return compare;
2095       }
2096 
2097       // Compare types. Let the delete types sort ahead of puts; i.e. types
2098       // of higher numbers sort before those of lesser numbers. Maximum (255)
2099       // appears ahead of everything, and minimum (0) appears after
2100       // everything.
2101       return (0xff & rtype) - (0xff & ltype);
2102     }
2103 
2104     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2105         final byte[] right, final int roffset, final int rfamilylength) {
2106       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2107       return diff;
2108     }
2109 
2110     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2111         final byte[] right, final int roffset, final int rquallength) {
2112       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2113       return diff;
2114     }
2115     /**
2116      * Compares the row and column of two keyvalues for equality
2117      * @param left
2118      * @param right
2119      * @return True if same row and column.
2120      */
2121     public boolean matchingRowColumn(final Cell left,
2122         final Cell right) {
2123       short lrowlength = left.getRowLength();
2124       short rrowlength = right.getRowLength();
2125 
2126       // TsOffset = end of column data. just comparing Row+CF length of each
2127       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2128           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2129         return false;
2130       }
2131 
2132       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2133         return false;
2134       }
2135 
2136       int lfoffset = left.getFamilyOffset();
2137       int rfoffset = right.getFamilyOffset();
2138       int lclength = left.getQualifierLength();
2139       int rclength = right.getQualifierLength();
2140       int lfamilylength = left.getFamilyLength();
2141       int rfamilylength = right.getFamilyLength();
2142       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2143           right.getFamilyArray(), rfoffset, rfamilylength);
2144       if (diff != 0) {
2145         return false;
2146       } else {
2147         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2148             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2149         return diff == 0;
2150       }
2151     }
2152 
2153     /**
2154      * Compares the row of two keyvalues for equality
2155      * @param left
2156      * @param right
2157      * @return True if rows match.
2158      */
2159     public boolean matchingRows(final Cell left, final Cell right) {
2160       short lrowlength = left.getRowLength();
2161       short rrowlength = right.getRowLength();
2162       return matchingRows(left, lrowlength, right, rrowlength);
2163     }
2164 
2165     /**
2166      * @param left
2167      * @param lrowlength
2168      * @param right
2169      * @param rrowlength
2170      * @return True if rows match.
2171      */
2172     private boolean matchingRows(final Cell left, final short lrowlength,
2173         final Cell right, final short rrowlength) {
2174       return lrowlength == rrowlength &&
2175           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2176               right.getRowArray(), right.getRowOffset(), rrowlength);
2177     }
2178 
2179     /**
2180      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2181      * @param left Left row array.
2182      * @param loffset Left row offset.
2183      * @param llength Left row length.
2184      * @param right Right row array.
2185      * @param roffset Right row offset.
2186      * @param rlength Right row length.
2187      * @return Whether rows are the same row.
2188      */
2189     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2190         final byte [] right, final int roffset, final int rlength) {
2191       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2192     }
2193 
2194     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2195       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2196       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2197         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2198             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2199             + Bytes.toStringBinary(firstKeyInBlock));
2200         return firstKeyInBlock;
2201       }
2202       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2203         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2204             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2205             Bytes.toStringBinary(fakeKey));
2206         return firstKeyInBlock;
2207       }
2208       return fakeKey;
2209     }
2210 
2211     /**
2212      * This is a HFile block index key optimization.
2213      * @param leftKey
2214      * @param rightKey
2215      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2216      */
2217     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2218       if (rightKey == null) {
2219         throw new IllegalArgumentException("rightKey can not be null");
2220       }
2221       if (leftKey == null) {
2222         return Arrays.copyOf(rightKey, rightKey.length);
2223       }
2224       if (compareFlatKey(leftKey, rightKey) >= 0) {
2225         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2226           + ", rightKey:" + Bytes.toString(rightKey));
2227       }
2228 
2229       short leftRowLength = Bytes.toShort(leftKey, 0);
2230       short rightRowLength = Bytes.toShort(rightKey, 0);
2231       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2232       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2233       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2234       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2235       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2236       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2237       // rows are equal
2238       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2239         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2240         // Compare family & qualifier together.
2241         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2242           rightCommonLength, rightColumnLength);
2243         // same with "row + family + qualifier", return rightKey directly
2244         if (comparison == 0) {
2245           return Arrays.copyOf(rightKey, rightKey.length);
2246         }
2247         // "family + qualifier" are different, generate a faked key per rightKey
2248         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2249         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2250         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2251         return newKey;
2252       }
2253       // rows are different
2254       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2255       short diffIdx = 0;
2256       while (diffIdx < minLength
2257           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2258         diffIdx++;
2259       }
2260       byte[] newRowKey = null;
2261       if (diffIdx >= minLength) {
2262         // leftKey's row is prefix of rightKey's.
2263         newRowKey = new byte[diffIdx + 1];
2264         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2265       } else {
2266         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2267         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2268             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2269           newRowKey = new byte[diffIdx + 1];
2270           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2271           newRowKey[diffIdx] = (byte) (diffByte + 1);
2272         } else {
2273           newRowKey = new byte[diffIdx + 1];
2274           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2275         }
2276       }
2277       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2278         Type.Maximum).getKey();
2279     }
2280 
2281     @Override
2282     protected Object clone() throws CloneNotSupportedException {
2283       super.clone();
2284       return new KVComparator();
2285     }
2286 
2287   }
2288 
2289   /**
2290    * @param b
2291    * @return A KeyValue made of a byte array that holds the key-only part.
2292    * Needed to convert hfile index members to KeyValues.
2293    */
2294   public static KeyValue createKeyValueFromKey(final byte [] b) {
2295     return createKeyValueFromKey(b, 0, b.length);
2296   }
2297 
2298   /**
2299    * @param bb
2300    * @return A KeyValue made of a byte buffer that holds the key-only part.
2301    * Needed to convert hfile index members to KeyValues.
2302    */
2303   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2304     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2305   }
2306 
2307   /**
2308    * @param b
2309    * @param o
2310    * @param l
2311    * @return A KeyValue made of a byte array that holds the key-only part.
2312    * Needed to convert hfile index members to KeyValues.
2313    */
2314   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2315       final int l) {
2316     byte [] newb = new byte[l + ROW_OFFSET];
2317     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2318     Bytes.putInt(newb, 0, l);
2319     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2320     return new KeyValue(newb);
2321   }
2322 
2323   /**
2324    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2325    * backing bytes copied from the steam.
2326    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2327    * of zero, we will return null which can be useful marking a stream as done.
2328    * @throws IOException
2329    */
2330   public static KeyValue create(final DataInput in) throws IOException {
2331     return create(in.readInt(), in);
2332   }
2333 
2334   /**
2335    * Create a KeyValue reading <code>length</code> from <code>in</code>
2336    * @param length
2337    * @param in
2338    * @return Created KeyValue OR if we find a length of zero, we will return null which
2339    * can be useful marking a stream as done.
2340    * @throws IOException
2341    */
2342   public static KeyValue create(int length, final DataInput in) throws IOException {
2343 
2344     if (length <= 0) {
2345       if (length == 0) return null;
2346       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2347     }
2348 
2349     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2350     byte [] bytes = new byte[length];
2351     in.readFully(bytes);
2352     return new KeyValue(bytes, 0, length);
2353   }
2354   
2355   /**
2356    * Create a new KeyValue by copying existing cell and adding new tags
2357    * @param c
2358    * @param newTags
2359    * @return a new KeyValue instance with new tags
2360    */
2361   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2362     List<Tag> existingTags = null;
2363     if(c.getTagsLength() > 0) {
2364       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
2365       existingTags.addAll(newTags);
2366     } else {
2367       existingTags = newTags;
2368     }
2369     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2370       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
2371       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
2372       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
2373       c.getValueLength(), existingTags);
2374   }
2375 
2376   /**
2377    * Create a KeyValue reading from the raw InputStream.
2378    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2379    * @param in
2380    * @return Created KeyValue OR if we find a length of zero, we will return null which
2381    * can be useful marking a stream as done.
2382    * @throws IOException
2383    */
2384   public static KeyValue iscreate(final InputStream in) throws IOException {
2385     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2386     int bytesRead = 0;
2387     while (bytesRead < intBytes.length) {
2388       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2389       if (n < 0) {
2390         if (bytesRead == 0) return null; // EOF at start is ok
2391         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2392       }
2393       bytesRead += n;
2394     }
2395     // TODO: perhaps some sanity check is needed here.
2396     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2397     IOUtils.readFully(in, bytes, 0, bytes.length);
2398     return new KeyValue(bytes, 0, bytes.length);
2399   }
2400 
2401   /**
2402    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2403    * @param kv
2404    * @param out
2405    * @return Length written on stream
2406    * @throws IOException
2407    * @see #create(DataInput) for the inverse function
2408    */
2409   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2410     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2411     // work for all implementations.
2412     int length = kv.getLength();
2413     out.writeInt(length);
2414     out.write(kv.getBuffer(), kv.getOffset(), length);
2415     return length + Bytes.SIZEOF_INT;
2416   }
2417 
2418   /**
2419    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2420    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2421    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2422    * @param kv
2423    * @param out
2424    * @return Length written on stream
2425    * @throws IOException
2426    * @see #create(DataInput) for the inverse function
2427    * @see #write(KeyValue, DataOutput)
2428    * @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead
2429    */
2430   @Deprecated
2431   public static long oswrite(final KeyValue kv, final OutputStream out)
2432       throws IOException {
2433     int length = kv.getLength();
2434     // This does same as DataOuput#writeInt (big-endian, etc.)
2435     out.write(Bytes.toBytes(length));
2436     out.write(kv.getBuffer(), kv.getOffset(), length);
2437     return length + Bytes.SIZEOF_INT;
2438   }
2439 
2440   /**
2441    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2442    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2443    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2444    * @param kv
2445    * @param out
2446    * @param withTags
2447    * @return Length written on stream
2448    * @throws IOException
2449    * @see #create(DataInput) for the inverse function
2450    * @see #write(KeyValue, DataOutput)
2451    * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
2452    */
2453   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2454       throws IOException {
2455     // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
2456     // check KeyValueUtil#oswrite also and do necessary changes.
2457     int length = kv.getLength();
2458     if (!withTags) {
2459       length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2460     }
2461     // This does same as DataOuput#writeInt (big-endian, etc.)
2462     StreamUtils.writeInt(out, length);
2463     out.write(kv.getBuffer(), kv.getOffset(), length);
2464     return length + Bytes.SIZEOF_INT;
2465   }
2466 
2467   /**
2468    * Comparator that compares row component only of a KeyValue.
2469    */
2470   public static class RowOnlyComparator implements Comparator<KeyValue> {
2471     final KVComparator comparator;
2472 
2473     public RowOnlyComparator(final KVComparator c) {
2474       this.comparator = c;
2475     }
2476 
2477     public int compare(KeyValue left, KeyValue right) {
2478       return comparator.compareRows(left, right);
2479     }
2480   }
2481 
2482 
2483   /**
2484    * Avoids redundant comparisons for better performance.
2485    * 
2486    * TODO get rid of this wart
2487    */
2488   public interface SamePrefixComparator<T> {
2489     /**
2490      * Compare two keys assuming that the first n bytes are the same.
2491      * @param commonPrefix How many bytes are the same.
2492      */
2493     int compareIgnoringPrefix(
2494       int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
2495     );
2496   }
2497 
2498   /**
2499    * This is a TEST only Comparator used in TestSeekTo and TestReseekTo.
2500    */
2501   public static class RawBytesComparator extends KVComparator {
2502     /**
2503      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2504      * instantiate the appropriate comparator.
2505      * TODO: With V3 consider removing this.
2506      * @return legacy class name for FileFileTrailer#comparatorClassName
2507      */
2508     public String getLegacyKeyComparatorName() {
2509       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2510     }
2511 
2512     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2513         int roffset, int rlength) {
2514       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2515     }
2516 
2517     @Override
2518     public int compare(Cell left, Cell right) {
2519       return compareOnlyKeyPortion(left, right);
2520     }
2521 
2522     @VisibleForTesting
2523     public int compareOnlyKeyPortion(Cell left, Cell right) {
2524       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2525           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2526       if (c != 0) {
2527         return c;
2528       }
2529       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2530           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2531           right.getFamilyLength());
2532       if (c != 0) {
2533         return c;
2534       }
2535       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2536           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2537           right.getQualifierLength());
2538       if (c != 0) {
2539         return c;
2540       }
2541       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2542       if (c != 0) {
2543         return c;
2544       }
2545       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2546     }
2547 
2548     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2549       return firstKeyInBlock;
2550     }
2551 
2552   }
2553 
2554   /**
2555    * HeapSize implementation
2556    *
2557    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2558    * MemStore.
2559    */
2560   @Override
2561   public long heapSize() {
2562     int sum = 0;
2563     sum += ClassSize.OBJECT;// the KeyValue object itself
2564     sum += ClassSize.REFERENCE;// pointer to "bytes"
2565     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2566     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2567     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2568     sum += Bytes.SIZEOF_LONG;// memstoreTS
2569     return ClassSize.align(sum);
2570   }
2571 
2572   /**
2573    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2574    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2575    * In places like block index keys, we need to compare the key byte[] with a cell.
2576    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2577    */
2578   public static class KeyOnlyKeyValue extends KeyValue {
2579     private int length = 0;
2580     private int offset = 0;
2581     private byte[] b;
2582 
2583     public KeyOnlyKeyValue() {
2584 
2585     }
2586 
2587     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2588       this.b = b;
2589       this.length = length;
2590       this.offset = offset;
2591     }
2592 
2593     @Override
2594     public int getKeyOffset() {
2595       return this.offset;
2596     }
2597 
2598     /**
2599      * A setter that helps to avoid object creation every time and whenever
2600      * there is a need to create new KeyOnlyKeyValue.
2601      * @param key
2602      * @param offset
2603      * @param length
2604      */
2605     public void setKey(byte[] key, int offset, int length) {
2606       this.b = key;
2607       this.offset = offset;
2608       this.length = length;
2609     }
2610 
2611     @Override
2612     public byte[] getKey() {
2613       int keylength = getKeyLength();
2614       byte[] key = new byte[keylength];
2615       System.arraycopy(this.b, getKeyOffset(), key, 0, keylength);
2616       return key;
2617     }
2618 
2619     @Override
2620     public byte[] getRowArray() {
2621       return b;
2622     }
2623 
2624     @Override
2625     public int getRowOffset() {
2626       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2627     }
2628 
2629     @Override
2630     public byte[] getFamilyArray() {
2631       return b;
2632     }
2633 
2634     @Override
2635     public byte getFamilyLength() {
2636       return this.b[getFamilyOffset() - 1];
2637     }
2638 
2639     @Override
2640     public int getFamilyOffset() {
2641       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2642     }
2643 
2644     @Override
2645     public byte[] getQualifierArray() {
2646       return b;
2647     }
2648 
2649     @Override
2650     public int getQualifierLength() {
2651       return getQualifierLength(getRowLength(), getFamilyLength());
2652     }
2653 
2654     @Override
2655     public int getQualifierOffset() {
2656       return getFamilyOffset() + getFamilyLength();
2657     }
2658 
2659     @Override
2660     public int getKeyLength() {
2661       return length;
2662     }
2663 
2664     @Override
2665     public short getRowLength() {
2666       return Bytes.toShort(this.b, getKeyOffset());
2667     }
2668 
2669     @Override
2670     public byte getTypeByte() {
2671       return this.b[this.offset + getKeyLength() - 1];
2672     }
2673 
2674     private int getQualifierLength(int rlength, int flength) {
2675       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2676     }
2677 
2678     @Override
2679     public long getTimestamp() {
2680       int tsOffset = getTimestampOffset();
2681       return Bytes.toLong(this.b, tsOffset);
2682     }
2683 
2684     @Override
2685     public int getTimestampOffset() {
2686       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2687     }
2688 
2689     @Override
2690     public byte[] getTagsArray() {
2691       return HConstants.EMPTY_BYTE_ARRAY;
2692     }
2693 
2694     @Override
2695     public int getTagsOffset() {
2696       return 0;
2697     }
2698 
2699     @Override
2700     public byte[] getValueArray() {
2701       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2702     }
2703 
2704     @Override
2705     public int getValueOffset() {
2706       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2707     }
2708 
2709     @Override
2710     public int getValueLength() {
2711       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2712     }
2713 
2714     @Override
2715     public int getTagsLength() {
2716       return 0;
2717     }
2718 
2719     @Override
2720     public String toString() {
2721       if (this.b == null || this.b.length == 0) {
2722         return "empty";
2723       }
2724       return keyToString(this.b, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2725     }
2726 
2727     @Override
2728     public int hashCode() {
2729       return super.hashCode();
2730     }
2731 
2732     @Override
2733     public boolean equals(Object other) {
2734       return super.equals(other);
2735     }
2736   }
2737 }