View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.io.HeapSize;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.IOUtils;
44  import org.apache.hadoop.io.RawComparator;
45  
46  import com.google.common.annotations.VisibleForTesting;
47  
48  /**
49   * An HBase Key/Value. This is the fundamental HBase Type.  
50   * <p>
51   * HBase applications and users should use the Cell interface and avoid directly using KeyValue
52   * and member functions not defined in Cell.
53   * <p>
54   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
55   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
56   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
57   * <p>
58   * Instances of this class are immutable. They do not implement Comparable but Comparators are
59   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
60   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
61   * Hfiles, and bloom filter keys.
62   * <p>
63   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
64   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
65   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
66   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier>
67   * &lt;timestamp> &lt;keytype></code>
68   * The <code>rowlength</code> maximum is <code>Short.MAX_SIZE</code>, column family length maximum
69   * is <code>Byte.MAX_SIZE</code>, and column qualifier + key length must be <
70   * <code>Integer.MAX_SIZE</code>. The column does not contain the family/qualifier delimiter,
71   * {@link #COLUMN_FAMILY_DELIMITER}<br>
72   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
73   * the value part. The format for this part is: <code>&lt;tagslength>&lt;tagsbytes></code>.
74   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
75   * contain one or more tags where as each tag is of the form
76   * <code>&lt;taglength>&lt;tagtype>&lt;tagbytes></code>.  <code>tagtype</code> is one byte and
77   * <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type length
78   * and actual tag bytes length.
79   */
80  @InterfaceAudience.Private
81  public class KeyValue implements Cell, HeapSize, Cloneable {
82    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
83  
84    static final Log LOG = LogFactory.getLog(KeyValue.class);
85  
86    /**
87     * Colon character in UTF-8
88     */
89    public static final char COLUMN_FAMILY_DELIMITER = ':';
90  
91    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
92      new byte[]{COLUMN_FAMILY_DELIMITER};
93  
94    /**
95     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
96     * of KeyValue only.
97     */
98    public static final KVComparator COMPARATOR = new KVComparator();
99    /**
100    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
101    * {@link KeyValue}s.
102    */
103   public static final KVComparator META_COMPARATOR = new MetaComparator();
104 
105   /**
106    * Needed for Bloom Filters.
107    */
108   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
109 
110   /** Size of the key length field in bytes*/
111   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
112 
113   /** Size of the key type field in bytes */
114   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
115 
116   /** Size of the row length field in bytes */
117   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
118 
119   /** Size of the family length field in bytes */
120   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
121 
122   /** Size of the timestamp field in bytes */
123   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
124 
125   // Size of the timestamp and type byte on end of a key -- a long + a byte.
126   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
127 
128   // Size of the length shorts and bytes in key.
129   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
130       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
131 
132   // How far into the key the row starts at. First thing to read is the short
133   // that says how long the row is.
134   public static final int ROW_OFFSET =
135     Bytes.SIZEOF_INT /*keylength*/ +
136     Bytes.SIZEOF_INT /*valuelength*/;
137 
138   // Size of the length ints in a KeyValue datastructure.
139   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
140 
141   /** Size of the tags length field in bytes */
142   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
143 
144   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
145 
146   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
147 
148   /**
149    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
150    * characteristics would take up for its underlying data structure.
151    *
152    * @param rlength row length
153    * @param flength family length
154    * @param qlength qualifier length
155    * @param vlength value length
156    *
157    * @return the <code>KeyValue</code> data structure length
158    */
159   public static long getKeyValueDataStructureSize(int rlength,
160       int flength, int qlength, int vlength) {
161     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
162         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
163   }
164 
165   /**
166    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
167    * characteristics would take up for its underlying data structure.
168    *
169    * @param rlength row length
170    * @param flength family length
171    * @param qlength qualifier length
172    * @param vlength value length
173    * @param tagsLength total length of the tags
174    *
175    * @return the <code>KeyValue</code> data structure length
176    */
177   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
178       int vlength, int tagsLength) {
179     if (tagsLength == 0) {
180       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
181     }
182     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
183         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
184   }
185 
186   /**
187    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
188    * characteristics would take up for its underlying data structure.
189    *
190    * @param klength key length
191    * @param vlength value length
192    * @param tagsLength total length of the tags
193    *
194    * @return the <code>KeyValue</code> data structure length
195    */
196   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
197     if (tagsLength == 0) {
198       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
199     }
200     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
201   }
202 
203   /**
204    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
205    * characteristics would take up in its underlying data structure for the key.
206    *
207    * @param rlength row length
208    * @param flength family length
209    * @param qlength qualifier length
210    *
211    * @return the key data structure length
212    */
213   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
214     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
215   }
216 
217   /**
218    * Key type.
219    * Has space for other key types to be added later.  Cannot rely on
220    * enum ordinals . They change if item is removed or moved.  Do our own codes.
221    */
222   public static enum Type {
223     Minimum((byte)0),
224     Put((byte)4),
225 
226     Delete((byte)8),
227     DeleteFamilyVersion((byte)10),
228     DeleteColumn((byte)12),
229     DeleteFamily((byte)14),
230 
231     // Maximum is used when searching; you look from maximum on down.
232     Maximum((byte)255);
233 
234     private final byte code;
235 
236     Type(final byte c) {
237       this.code = c;
238     }
239 
240     public byte getCode() {
241       return this.code;
242     }
243 
244     /**
245      * Cannot rely on enum ordinals . They change if item is removed or moved.
246      * Do our own codes.
247      * @param b
248      * @return Type associated with passed code.
249      */
250     public static Type codeToType(final byte b) {
251       for (Type t : Type.values()) {
252         if (t.getCode() == b) {
253           return t;
254         }
255       }
256       throw new RuntimeException("Unknown code " + b);
257     }
258   }
259 
260   /**
261    * Lowest possible key.
262    * Makes a Key with highest possible Timestamp, empty row and column.  No
263    * key can be equal or lower than this one in memstore or in store file.
264    */
265   public static final KeyValue LOWESTKEY =
266     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
267 
268   ////
269   // KeyValue core instance fields.
270   private byte [] bytes = null;  // an immutable byte array that contains the KV
271   private int offset = 0;  // offset into bytes buffer KV starts at
272   private int length = 0;  // length of the KV starting from offset.
273 
274   /**
275    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
276    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
277    * KeyValue type.
278    */
279   public static boolean isDelete(byte t) {
280     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
281   }
282 
283   /** Here be dragons **/
284 
285   // used to achieve atomic operations in the memstore.
286   @Override
287   public long getMvccVersion() {
288     return this.getSequenceId();
289   }
290 
291   /**
292    * used to achieve atomic operations in the memstore.
293    */
294   @Override
295   public long getSequenceId() {
296     return seqId;
297   }
298 
299   public void setSequenceId(long seqId) {
300     this.seqId = seqId;
301   }
302 
303   // multi-version concurrency control version.  default value is 0, aka do not care.
304   private long seqId = 0;
305 
306   /** Dragon time over, return to normal business */
307 
308 
309   /** Writable Constructor -- DO NOT USE */
310   public KeyValue() {}
311 
312   /**
313    * Creates a KeyValue from the start of the specified byte array.
314    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
315    * @param bytes byte array
316    */
317   public KeyValue(final byte [] bytes) {
318     this(bytes, 0);
319   }
320 
321   /**
322    * Creates a KeyValue from the specified byte array and offset.
323    * Presumes <code>bytes</code> content starting at <code>offset</code> is
324    * formatted as a KeyValue blob.
325    * @param bytes byte array
326    * @param offset offset to start of KeyValue
327    */
328   public KeyValue(final byte [] bytes, final int offset) {
329     this(bytes, offset, getLength(bytes, offset));
330   }
331 
332   /**
333    * Creates a KeyValue from the specified byte array, starting at offset, and
334    * for length <code>length</code>.
335    * @param bytes byte array
336    * @param offset offset to start of the KeyValue
337    * @param length length of the KeyValue
338    */
339   public KeyValue(final byte [] bytes, final int offset, final int length) {
340     this.bytes = bytes;
341     this.offset = offset;
342     this.length = length;
343   }
344 
345   /**
346    * Creates a KeyValue from the specified byte array, starting at offset, and
347    * for length <code>length</code>.
348    *
349    * @param bytes  byte array
350    * @param offset offset to start of the KeyValue
351    * @param length length of the KeyValue
352    * @param ts
353    */
354   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
355     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
356   }
357 
358   /** Constructors that build a new backing byte array from fields */
359 
360   /**
361    * Constructs KeyValue structure filled with null value.
362    * Sets type to {@link KeyValue.Type#Maximum}
363    * @param row - row key (arbitrary byte array)
364    * @param timestamp
365    */
366   public KeyValue(final byte [] row, final long timestamp) {
367     this(row, null, null, timestamp, Type.Maximum, null);
368   }
369 
370   /**
371    * Constructs KeyValue structure filled with null value.
372    * @param row - row key (arbitrary byte array)
373    * @param timestamp
374    */
375   public KeyValue(final byte [] row, final long timestamp, Type type) {
376     this(row, null, null, timestamp, type, null);
377   }
378 
379   /**
380    * Constructs KeyValue structure filled with null value.
381    * Sets type to {@link KeyValue.Type#Maximum}
382    * @param row - row key (arbitrary byte array)
383    * @param family family name
384    * @param qualifier column qualifier
385    */
386   public KeyValue(final byte [] row, final byte [] family,
387       final byte [] qualifier) {
388     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
389   }
390 
391   /**
392    * Constructs KeyValue structure filled with null value.
393    * @param row - row key (arbitrary byte array)
394    * @param family family name
395    * @param qualifier column qualifier
396    */
397   public KeyValue(final byte [] row, final byte [] family,
398       final byte [] qualifier, final byte [] value) {
399     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
400   }
401 
402   /**
403    * Constructs KeyValue structure filled with specified values.
404    * @param row row key
405    * @param family family name
406    * @param qualifier column qualifier
407    * @param timestamp version timestamp
408    * @param type key type
409    * @throws IllegalArgumentException
410    */
411   public KeyValue(final byte[] row, final byte[] family,
412       final byte[] qualifier, final long timestamp, Type type) {
413     this(row, family, qualifier, timestamp, type, null);
414   }
415 
416   /**
417    * Constructs KeyValue structure filled with specified values.
418    * @param row row key
419    * @param family family name
420    * @param qualifier column qualifier
421    * @param timestamp version timestamp
422    * @param value column value
423    * @throws IllegalArgumentException
424    */
425   public KeyValue(final byte[] row, final byte[] family,
426       final byte[] qualifier, final long timestamp, final byte[] value) {
427     this(row, family, qualifier, timestamp, Type.Put, value);
428   }
429 
430   /**
431    * Constructs KeyValue structure filled with specified values.
432    * @param row row key
433    * @param family family name
434    * @param qualifier column qualifier
435    * @param timestamp version timestamp
436    * @param value column value
437    * @param tags tags
438    * @throws IllegalArgumentException
439    */
440   public KeyValue(final byte[] row, final byte[] family,
441       final byte[] qualifier, final long timestamp, final byte[] value,
442       final Tag[] tags) {
443     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
444   }
445 
446   /**
447    * Constructs KeyValue structure filled with specified values.
448    * @param row row key
449    * @param family family name
450    * @param qualifier column qualifier
451    * @param timestamp version timestamp
452    * @param value column value
453    * @param tags tags non-empty list of tags or null
454    * @throws IllegalArgumentException
455    */
456   public KeyValue(final byte[] row, final byte[] family,
457       final byte[] qualifier, final long timestamp, final byte[] value,
458       final List<Tag> tags) {
459     this(row, 0, row==null ? 0 : row.length,
460       family, 0, family==null ? 0 : family.length,
461       qualifier, 0, qualifier==null ? 0 : qualifier.length,
462       timestamp, Type.Put,
463       value, 0, value==null ? 0 : value.length, tags);
464   }
465 
466   /**
467    * Constructs KeyValue structure filled with specified values.
468    * @param row row key
469    * @param family family name
470    * @param qualifier column qualifier
471    * @param timestamp version timestamp
472    * @param type key type
473    * @param value column value
474    * @throws IllegalArgumentException
475    */
476   public KeyValue(final byte[] row, final byte[] family,
477       final byte[] qualifier, final long timestamp, Type type,
478       final byte[] value) {
479     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
480         timestamp, type,   value, 0, len(value));
481   }
482 
483   /**
484    * Constructs KeyValue structure filled with specified values.
485    * <p>
486    * Column is split into two fields, family and qualifier.
487    * @param row row key
488    * @param family family name
489    * @param qualifier column qualifier
490    * @param timestamp version timestamp
491    * @param type key type
492    * @param value column value
493    * @throws IllegalArgumentException
494    */
495   public KeyValue(final byte[] row, final byte[] family,
496       final byte[] qualifier, final long timestamp, Type type,
497       final byte[] value, final List<Tag> tags) {
498     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
499         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
500   }
501 
502   /**
503    * Constructs KeyValue structure filled with specified values.
504    * @param row row key
505    * @param family family name
506    * @param qualifier column qualifier
507    * @param timestamp version timestamp
508    * @param type key type
509    * @param value column value
510    * @throws IllegalArgumentException
511    */
512   public KeyValue(final byte[] row, final byte[] family,
513       final byte[] qualifier, final long timestamp, Type type,
514       final byte[] value, final byte[] tags) {
515     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
516         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
517   }
518 
519   /**
520    * Constructs KeyValue structure filled with specified values.
521    * @param row row key
522    * @param family family name
523    * @param qualifier column qualifier
524    * @param qoffset qualifier offset
525    * @param qlength qualifier length
526    * @param timestamp version timestamp
527    * @param type key type
528    * @param value column value
529    * @param voffset value offset
530    * @param vlength value length
531    * @throws IllegalArgumentException
532    */
533   public KeyValue(byte [] row, byte [] family,
534       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
535       byte [] value, int voffset, int vlength, List<Tag> tags) {
536     this(row, 0, row==null ? 0 : row.length,
537         family, 0, family==null ? 0 : family.length,
538         qualifier, qoffset, qlength, timestamp, type,
539         value, voffset, vlength, tags);
540   }
541 
542   /**
543    * @param row
544    * @param family
545    * @param qualifier
546    * @param qoffset
547    * @param qlength
548    * @param timestamp
549    * @param type
550    * @param value
551    * @param voffset
552    * @param vlength
553    * @param tags
554    */
555   public KeyValue(byte [] row, byte [] family,
556       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
557       byte [] value, int voffset, int vlength, byte[] tags) {
558     this(row, 0, row==null ? 0 : row.length,
559         family, 0, family==null ? 0 : family.length,
560         qualifier, qoffset, qlength, timestamp, type,
561         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
562   }
563 
564   /**
565    * Constructs KeyValue structure filled with specified values.
566    * <p>
567    * Column is split into two fields, family and qualifier.
568    * @param row row key
569    * @throws IllegalArgumentException
570    */
571   public KeyValue(final byte [] row, final int roffset, final int rlength,
572       final byte [] family, final int foffset, final int flength,
573       final byte [] qualifier, final int qoffset, final int qlength,
574       final long timestamp, final Type type,
575       final byte [] value, final int voffset, final int vlength) {
576     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
577       qlength, timestamp, type, value, voffset, vlength, null);
578   }
579   
580   /**
581    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
582    * data buffer.
583    * <p>
584    * Column is split into two fields, family and qualifier.
585    *
586    * @param buffer the bytes buffer to use
587    * @param boffset buffer offset
588    * @param row row key
589    * @param roffset row offset
590    * @param rlength row length
591    * @param family family name
592    * @param foffset family offset
593    * @param flength family length
594    * @param qualifier column qualifier
595    * @param qoffset qualifier offset
596    * @param qlength qualifier length
597    * @param timestamp version timestamp
598    * @param type key type
599    * @param value column value
600    * @param voffset value offset
601    * @param vlength value length
602    * @param tags non-empty list of tags or null
603    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
604    * remaining in the buffer
605    */
606   public KeyValue(byte [] buffer, final int boffset,
607       final byte [] row, final int roffset, final int rlength,
608       final byte [] family, final int foffset, final int flength,
609       final byte [] qualifier, final int qoffset, final int qlength,
610       final long timestamp, final Type type,
611       final byte [] value, final int voffset, final int vlength,
612       final Tag[] tags) {
613      this.bytes  = buffer;
614      this.length = writeByteArray(buffer, boffset,
615          row, roffset, rlength,
616          family, foffset, flength, qualifier, qoffset, qlength,
617         timestamp, type, value, voffset, vlength, tags);
618      this.offset = boffset;
619    }
620 
621   /**
622    * Constructs KeyValue structure filled with specified values.
623    * <p>
624    * Column is split into two fields, family and qualifier.
625    * @param row row key
626    * @param roffset row offset
627    * @param rlength row length
628    * @param family family name
629    * @param foffset family offset
630    * @param flength family length
631    * @param qualifier column qualifier
632    * @param qoffset qualifier offset
633    * @param qlength qualifier length
634    * @param timestamp version timestamp
635    * @param type key type
636    * @param value column value
637    * @param voffset value offset
638    * @param vlength value length
639    * @param tags tags
640    * @throws IllegalArgumentException
641    */
642   public KeyValue(final byte [] row, final int roffset, final int rlength,
643       final byte [] family, final int foffset, final int flength,
644       final byte [] qualifier, final int qoffset, final int qlength,
645       final long timestamp, final Type type,
646       final byte [] value, final int voffset, final int vlength,
647       final List<Tag> tags) {
648     this.bytes = createByteArray(row, roffset, rlength,
649         family, foffset, flength, qualifier, qoffset, qlength,
650         timestamp, type, value, voffset, vlength, tags);
651     this.length = bytes.length;
652     this.offset = 0;
653   }
654 
655   /**
656    * @param row
657    * @param roffset
658    * @param rlength
659    * @param family
660    * @param foffset
661    * @param flength
662    * @param qualifier
663    * @param qoffset
664    * @param qlength
665    * @param timestamp
666    * @param type
667    * @param value
668    * @param voffset
669    * @param vlength
670    * @param tags
671    */
672   public KeyValue(final byte [] row, final int roffset, final int rlength,
673       final byte [] family, final int foffset, final int flength,
674       final byte [] qualifier, final int qoffset, final int qlength,
675       final long timestamp, final Type type,
676       final byte [] value, final int voffset, final int vlength,
677       final byte[] tags, final int tagsOffset, final int tagsLength) {
678     this.bytes = createByteArray(row, roffset, rlength,
679         family, foffset, flength, qualifier, qoffset, qlength,
680         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
681     this.length = bytes.length;
682     this.offset = 0;
683   }
684 
685   /**
686    * Constructs an empty KeyValue structure, with specified sizes.
687    * This can be used to partially fill up KeyValues.
688    * <p>
689    * Column is split into two fields, family and qualifier.
690    * @param rlength row length
691    * @param flength family length
692    * @param qlength qualifier length
693    * @param timestamp version timestamp
694    * @param type key type
695    * @param vlength value length
696    * @throws IllegalArgumentException
697    */
698   public KeyValue(final int rlength,
699       final int flength,
700       final int qlength,
701       final long timestamp, final Type type,
702       final int vlength) {
703     this(rlength, flength, qlength, timestamp, type, vlength, 0);
704   }
705 
706   /**
707    * Constructs an empty KeyValue structure, with specified sizes.
708    * This can be used to partially fill up KeyValues.
709    * <p>
710    * Column is split into two fields, family and qualifier.
711    * @param rlength row length
712    * @param flength family length
713    * @param qlength qualifier length
714    * @param timestamp version timestamp
715    * @param type key type
716    * @param vlength value length
717    * @param tagsLength
718    * @throws IllegalArgumentException
719    */
720   public KeyValue(final int rlength,
721       final int flength,
722       final int qlength,
723       final long timestamp, final Type type,
724       final int vlength, final int tagsLength) {
725     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
726         tagsLength);
727     this.length = bytes.length;
728     this.offset = 0;
729   }
730 
731 
732   public KeyValue(byte[] row, int roffset, int rlength,
733                   byte[] family, int foffset, int flength,
734                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
735     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
736         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
737         value, 0, value == null ? 0 : value.remaining(), tags);
738     this.length = bytes.length;
739     this.offset = 0;
740   }
741 
742   public KeyValue(Cell c) {
743     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
744         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
745         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
746         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
747         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
748   }
749 
750   /**
751    * Create an empty byte[] representing a KeyValue
752    * All lengths are preset and can be filled in later.
753    * @param rlength
754    * @param flength
755    * @param qlength
756    * @param timestamp
757    * @param type
758    * @param vlength
759    * @return The newly created byte array.
760    */
761   private static byte[] createEmptyByteArray(final int rlength, int flength,
762       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
763     if (rlength > Short.MAX_VALUE) {
764       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
765     }
766     if (flength > Byte.MAX_VALUE) {
767       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
768     }
769     // Qualifier length
770     if (qlength > Integer.MAX_VALUE - rlength - flength) {
771       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
772     }
773     checkForTagsLength(tagsLength);
774     // Key length
775     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
776     if (longkeylength > Integer.MAX_VALUE) {
777       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
778         Integer.MAX_VALUE);
779     }
780     int keylength = (int)longkeylength;
781     // Value length
782     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
783       throw new IllegalArgumentException("Valuer > " +
784           HConstants.MAXIMUM_VALUE_LENGTH);
785     }
786 
787     // Allocate right-sized byte array.
788     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
789         tagsLength)];
790     // Write the correct size markers
791     int pos = 0;
792     pos = Bytes.putInt(bytes, pos, keylength);
793     pos = Bytes.putInt(bytes, pos, vlength);
794     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
795     pos += rlength;
796     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
797     pos += flength + qlength;
798     pos = Bytes.putLong(bytes, pos, timestamp);
799     pos = Bytes.putByte(bytes, pos, type.getCode());
800     pos += vlength;
801     if (tagsLength > 0) {
802       pos = Bytes.putAsShort(bytes, pos, tagsLength);
803     }
804     return bytes;
805   }
806 
807   /**
808    * Checks the parameters passed to a constructor.
809    *
810    * @param row row key
811    * @param rlength row length
812    * @param family family name
813    * @param flength family length
814    * @param qlength qualifier length
815    * @param vlength value length
816    *
817    * @throws IllegalArgumentException an illegal value was passed
818    */
819   private static void checkParameters(final byte [] row, final int rlength,
820       final byte [] family, int flength, int qlength, int vlength)
821           throws IllegalArgumentException {
822     if (rlength > Short.MAX_VALUE) {
823       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
824     }
825     if (row == null) {
826       throw new IllegalArgumentException("Row is null");
827     }
828     // Family length
829     flength = family == null ? 0 : flength;
830     if (flength > Byte.MAX_VALUE) {
831       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
832     }
833     // Qualifier length
834     if (qlength > Integer.MAX_VALUE - rlength - flength) {
835       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
836     }
837     // Key length
838     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
839     if (longKeyLength > Integer.MAX_VALUE) {
840       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
841           Integer.MAX_VALUE);
842     }
843     // Value length
844     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
845       throw new IllegalArgumentException("Value length " + vlength + " > " +
846           HConstants.MAXIMUM_VALUE_LENGTH);
847     }
848   }
849 
850   /**
851    * Write KeyValue format into the provided byte array.
852    *
853    * @param buffer the bytes buffer to use
854    * @param boffset buffer offset
855    * @param row row key
856    * @param roffset row offset
857    * @param rlength row length
858    * @param family family name
859    * @param foffset family offset
860    * @param flength family length
861    * @param qualifier column qualifier
862    * @param qoffset qualifier offset
863    * @param qlength qualifier length
864    * @param timestamp version timestamp
865    * @param type key type
866    * @param value column value
867    * @param voffset value offset
868    * @param vlength value length
869    *
870    * @return The number of useful bytes in the buffer.
871    *
872    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
873    * remaining in the buffer
874    */
875   public static int writeByteArray(byte [] buffer, final int boffset,
876       final byte [] row, final int roffset, final int rlength,
877       final byte [] family, final int foffset, int flength,
878       final byte [] qualifier, final int qoffset, int qlength,
879       final long timestamp, final Type type,
880       final byte [] value, final int voffset, int vlength, Tag[] tags) {
881 
882     checkParameters(row, rlength, family, flength, qlength, vlength);
883 
884     // Calculate length of tags area
885     int tagsLength = 0;
886     if (tags != null && tags.length > 0) {
887       for (Tag t: tags) {
888         tagsLength += t.getLength();
889       }
890     }
891     checkForTagsLength(tagsLength);
892     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
893     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
894         tagsLength);
895     if (keyValueLength > buffer.length - boffset) {
896       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
897           keyValueLength);
898     }
899 
900     // Write key, value and key row length.
901     int pos = boffset;
902     pos = Bytes.putInt(buffer, pos, keyLength);
903     pos = Bytes.putInt(buffer, pos, vlength);
904     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
905     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
906     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
907     if (flength != 0) {
908       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
909     }
910     if (qlength != 0) {
911       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
912     }
913     pos = Bytes.putLong(buffer, pos, timestamp);
914     pos = Bytes.putByte(buffer, pos, type.getCode());
915     if (value != null && value.length > 0) {
916       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
917     }
918     // Write the number of tags. If it is 0 then it means there are no tags.
919     if (tagsLength > 0) {
920       pos = Bytes.putAsShort(buffer, pos, tagsLength);
921       for (Tag t : tags) {
922         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
923       }
924     }
925     return keyValueLength;
926   }
927 
928   private static void checkForTagsLength(int tagsLength) {
929     if (tagsLength > MAX_TAGS_LENGTH) {
930       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
931     }
932   }
933 
934   /**
935    * Write KeyValue format into a byte array.
936    * @param row row key
937    * @param roffset row offset
938    * @param rlength row length
939    * @param family family name
940    * @param foffset family offset
941    * @param flength family length
942    * @param qualifier column qualifier
943    * @param qoffset qualifier offset
944    * @param qlength qualifier length
945    * @param timestamp version timestamp
946    * @param type key type
947    * @param value column value
948    * @param voffset value offset
949    * @param vlength value length
950    * @return The newly created byte array.
951    */
952   private static byte [] createByteArray(final byte [] row, final int roffset,
953       final int rlength, final byte [] family, final int foffset, int flength,
954       final byte [] qualifier, final int qoffset, int qlength,
955       final long timestamp, final Type type,
956       final byte [] value, final int voffset, 
957       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
958 
959     checkParameters(row, rlength, family, flength, qlength, vlength);
960     checkForTagsLength(tagsLength);
961     // Allocate right-sized byte array.
962     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
963     byte [] bytes =
964         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
965     // Write key, value and key row length.
966     int pos = 0;
967     pos = Bytes.putInt(bytes, pos, keyLength);
968     pos = Bytes.putInt(bytes, pos, vlength);
969     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
970     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
971     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
972     if(flength != 0) {
973       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
974     }
975     if(qlength != 0) {
976       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
977     }
978     pos = Bytes.putLong(bytes, pos, timestamp);
979     pos = Bytes.putByte(bytes, pos, type.getCode());
980     if (value != null && value.length > 0) {
981       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
982     }
983     // Add the tags after the value part
984     if (tagsLength > 0) {
985       pos = Bytes.putAsShort(bytes, pos, tagsLength);
986       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
987     }
988     return bytes;
989   }
990 
991   /**
992    * @param qualifier can be a ByteBuffer or a byte[], or null.
993    * @param value can be a ByteBuffer or a byte[], or null.
994    */
995   private static byte [] createByteArray(final byte [] row, final int roffset,
996       final int rlength, final byte [] family, final int foffset, int flength,
997       final Object qualifier, final int qoffset, int qlength,
998       final long timestamp, final Type type,
999       final Object value, final int voffset, int vlength, List<Tag> tags) {
1000 
1001     checkParameters(row, rlength, family, flength, qlength, vlength);
1002 
1003     // Calculate length of tags area
1004     int tagsLength = 0;
1005     if (tags != null && !tags.isEmpty()) {
1006       for (Tag t : tags) {
1007         tagsLength += t.getLength();
1008       }
1009     }
1010     checkForTagsLength(tagsLength);
1011     // Allocate right-sized byte array.
1012     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1013     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1014         tagsLength)];
1015 
1016     // Write key, value and key row length.
1017     int pos = 0;
1018     pos = Bytes.putInt(bytes, pos, keyLength);
1019 
1020     pos = Bytes.putInt(bytes, pos, vlength);
1021     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1022     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1023     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1024     if(flength != 0) {
1025       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1026     }
1027     if (qlength > 0) {
1028       if (qualifier instanceof ByteBuffer) {
1029         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1030       } else {
1031         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1032       }
1033     }
1034     pos = Bytes.putLong(bytes, pos, timestamp);
1035     pos = Bytes.putByte(bytes, pos, type.getCode());
1036     if (vlength > 0) {
1037       if (value instanceof ByteBuffer) {
1038         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1039       } else {
1040         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1041       }
1042     }
1043     // Add the tags after the value part
1044     if (tagsLength > 0) {
1045       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1046       for (Tag t : tags) {
1047         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1048       }
1049     }
1050     return bytes;
1051   }
1052 
1053   /**
1054    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1055    */
1056   @Override
1057   public boolean equals(Object other) {
1058     if (!(other instanceof Cell)) {
1059       return false;
1060     }
1061     return CellComparator.equals(this, (Cell)other);
1062   }
1063 
1064   @Override
1065   public int hashCode() {
1066     byte[] b = getBuffer();
1067     int start = getOffset(), end = getOffset() + getLength();
1068     int h = b[start++];
1069     for (int i = start; i < end; i++) {
1070       h = (h * 13) ^ b[i];
1071     }
1072     return h;
1073   }
1074 
1075   //---------------------------------------------------------------------------
1076   //
1077   //  KeyValue cloning
1078   //
1079   //---------------------------------------------------------------------------
1080 
1081   /**
1082    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1083    * @return Fully copied clone of this KeyValue
1084    * @throws CloneNotSupportedException
1085    */
1086   @Override
1087   public KeyValue clone() throws CloneNotSupportedException {
1088     super.clone();
1089     byte [] b = new byte[this.length];
1090     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1091     KeyValue ret = new KeyValue(b, 0, b.length);
1092     // Important to clone the memstoreTS as well - otherwise memstore's
1093     // update-in-place methods (eg increment) will end up creating
1094     // new entries
1095     ret.setSequenceId(seqId);
1096     return ret;
1097   }
1098 
1099   /**
1100    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1101    * http://en.wikipedia.org/wiki/Object_copy
1102    * @return Shallow copy of this KeyValue
1103    */
1104   public KeyValue shallowCopy() {
1105     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1106     shallowCopy.setSequenceId(this.seqId);
1107     return shallowCopy;
1108   }
1109 
1110   //---------------------------------------------------------------------------
1111   //
1112   //  String representation
1113   //
1114   //---------------------------------------------------------------------------
1115 
1116   public String toString() {
1117     if (this.bytes == null || this.bytes.length == 0) {
1118       return "empty";
1119     }
1120     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
1121       + getValueLength() + "/seqid=" + seqId;
1122   }
1123 
1124   /**
1125    * @param k Key portion of a KeyValue.
1126    * @return Key as a String, empty string if k is null. 
1127    */
1128   public static String keyToString(final byte [] k) {
1129     if (k == null) { 
1130       return "";
1131     }
1132     return keyToString(k, 0, k.length);
1133   }
1134 
1135   /**
1136    * Produces a string map for this key/value pair. Useful for programmatic use
1137    * and manipulation of the data stored in an HLogKey, for example, printing
1138    * as JSON. Values are left out due to their tendency to be large. If needed,
1139    * they can be added manually.
1140    *
1141    * @return the Map<String,?> containing data from this key
1142    */
1143   public Map<String, Object> toStringMap() {
1144     Map<String, Object> stringMap = new HashMap<String, Object>();
1145     stringMap.put("row", Bytes.toStringBinary(getRow()));
1146     stringMap.put("family", Bytes.toStringBinary(getFamily()));
1147     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
1148     stringMap.put("timestamp", getTimestamp());
1149     stringMap.put("vlen", getValueLength());
1150     List<Tag> tags = getTags();
1151     if (tags != null) {
1152       List<String> tagsString = new ArrayList<String>();
1153       for (Tag t : tags) {
1154         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1155       }
1156       stringMap.put("tag", tagsString);
1157     }
1158     return stringMap;
1159   }
1160 
1161   /**
1162    * Use for logging.
1163    * @param b Key portion of a KeyValue.
1164    * @param o Offset to start of key
1165    * @param l Length of key.
1166    * @return Key as a String.
1167    */
1168   public static String keyToString(final byte [] b, final int o, final int l) {
1169     if (b == null) return "";
1170     int rowlength = Bytes.toShort(b, o);
1171     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1172     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1173     int familylength = b[columnoffset - 1];
1174     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1175     String family = familylength == 0? "":
1176       Bytes.toStringBinary(b, columnoffset, familylength);
1177     String qualifier = columnlength == 0? "":
1178       Bytes.toStringBinary(b, columnoffset + familylength,
1179       columnlength - familylength);
1180     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1181     String timestampStr = humanReadableTimestamp(timestamp);
1182     byte type = b[o + l - 1];
1183     return row + "/" + family +
1184       (family != null && family.length() > 0? ":" :"") +
1185       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1186   }
1187 
1188   public static String humanReadableTimestamp(final long timestamp) {
1189     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1190       return "LATEST_TIMESTAMP";
1191     }
1192     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1193       return "OLDEST_TIMESTAMP";
1194     }
1195     return String.valueOf(timestamp);
1196   }
1197 
1198   //---------------------------------------------------------------------------
1199   //
1200   //  Public Member Accessors
1201   //
1202   //---------------------------------------------------------------------------
1203 
1204   /**
1205    * @return The byte array backing this KeyValue.
1206    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1207    */
1208   @Deprecated
1209   public byte [] getBuffer() {
1210     return this.bytes;
1211   }
1212 
1213   /**
1214    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1215    */
1216   public int getOffset() {
1217     return this.offset;
1218   }
1219 
1220   /**
1221    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1222    */
1223   public int getLength() {
1224     return length;
1225   }
1226 
1227   //---------------------------------------------------------------------------
1228   //
1229   //  Length and Offset Calculators
1230   //
1231   //---------------------------------------------------------------------------
1232 
1233   /**
1234    * Determines the total length of the KeyValue stored in the specified
1235    * byte array and offset.  Includes all headers.
1236    * @param bytes byte array
1237    * @param offset offset to start of the KeyValue
1238    * @return length of entire KeyValue, in bytes
1239    */
1240   private static int getLength(byte [] bytes, int offset) {
1241     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1242     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1243     return klength + vlength;
1244   }
1245 
1246   /**
1247    * @return Key offset in backing buffer..
1248    */
1249   public int getKeyOffset() {
1250     return this.offset + ROW_OFFSET;
1251   }
1252 
1253   public String getKeyString() {
1254     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1255   }
1256 
1257   /**
1258    * @return Length of key portion.
1259    */
1260   public int getKeyLength() {
1261     return Bytes.toInt(this.bytes, this.offset);
1262   }
1263 
1264   /**
1265    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1266    */
1267   @Override
1268   public byte[] getValueArray() {
1269     return bytes;
1270   }
1271 
1272   /**
1273    * @return the value offset
1274    */
1275   @Override
1276   public int getValueOffset() {
1277     int voffset = getKeyOffset() + getKeyLength();
1278     return voffset;
1279   }
1280 
1281   /**
1282    * @return Value length
1283    */
1284   @Override
1285   public int getValueLength() {
1286     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1287     return vlength;
1288   }
1289 
1290   /**
1291    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1292    */
1293   @Override
1294   public byte[] getRowArray() {
1295     return bytes;
1296   }
1297 
1298   /**
1299    * @return Row offset
1300    */
1301   @Override
1302   public int getRowOffset() {
1303     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1304   }
1305 
1306   /**
1307    * @return Row length
1308    */
1309   @Override
1310   public short getRowLength() {
1311     return Bytes.toShort(this.bytes, getKeyOffset());
1312   }
1313 
1314   /**
1315    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1316    */
1317   @Override
1318   public byte[] getFamilyArray() {
1319     return bytes;
1320   }
1321 
1322   /**
1323    * @return Family offset
1324    */
1325   @Override
1326   public int getFamilyOffset() {
1327     return getFamilyOffset(getRowLength());
1328   }
1329 
1330   /**
1331    * @return Family offset
1332    */
1333   private int getFamilyOffset(int rlength) {
1334     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1335   }
1336 
1337   /**
1338    * @return Family length
1339    */
1340   @Override
1341   public byte getFamilyLength() {
1342     return getFamilyLength(getFamilyOffset());
1343   }
1344 
1345   /**
1346    * @return Family length
1347    */
1348   public byte getFamilyLength(int foffset) {
1349     return this.bytes[foffset-1];
1350   }
1351 
1352   /**
1353    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1354    */
1355   @Override
1356   public byte[] getQualifierArray() {
1357     return bytes;
1358   }
1359 
1360   /**
1361    * @return Qualifier offset
1362    */
1363   @Override
1364   public int getQualifierOffset() {
1365     return getQualifierOffset(getFamilyOffset());
1366   }
1367 
1368   /**
1369    * @return Qualifier offset
1370    */
1371   private int getQualifierOffset(int foffset) {
1372     return foffset + getFamilyLength(foffset);
1373   }
1374 
1375   /**
1376    * @return Qualifier length
1377    */
1378   @Override
1379   public int getQualifierLength() {
1380     return getQualifierLength(getRowLength(),getFamilyLength());
1381   }
1382 
1383   /**
1384    * @return Qualifier length
1385    */
1386   private int getQualifierLength(int rlength, int flength) {
1387     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1388   }
1389 
1390   /**
1391    * @return Column (family + qualifier) length
1392    */
1393   private int getTotalColumnLength(int rlength, int foffset) {
1394     int flength = getFamilyLength(foffset);
1395     int qlength = getQualifierLength(rlength,flength);
1396     return flength + qlength;
1397   }
1398 
1399   /**
1400    * @return Timestamp offset
1401    */
1402   public int getTimestampOffset() {
1403     return getTimestampOffset(getKeyLength());
1404   }
1405 
1406   /**
1407    * @param keylength Pass if you have it to save on a int creation.
1408    * @return Timestamp offset
1409    */
1410   private int getTimestampOffset(final int keylength) {
1411     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1412   }
1413 
1414   /**
1415    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1416    */
1417   public boolean isLatestTimestamp() {
1418     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1419       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1420   }
1421 
1422   /**
1423    * @param now Time to set into <code>this</code> IFF timestamp ==
1424    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1425    * @return True is we modified this.
1426    */
1427   public boolean updateLatestStamp(final byte [] now) {
1428     if (this.isLatestTimestamp()) {
1429       int tsOffset = getTimestampOffset();
1430       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1431       // clear cache or else getTimestamp() possibly returns an old value
1432       return true;
1433     }
1434     return false;
1435   }
1436 
1437   //---------------------------------------------------------------------------
1438   //
1439   //  Methods that return copies of fields
1440   //
1441   //---------------------------------------------------------------------------
1442 
1443   /**
1444    * Do not use unless you have to.  Used internally for compacting and testing.
1445    *
1446    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1447    * {@link #getValue()} if accessing a KeyValue client-side.
1448    * @return Copy of the key portion only.
1449    */
1450   public byte [] getKey() {
1451     int keylength = getKeyLength();
1452     byte [] key = new byte[keylength];
1453     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1454     return key;
1455   }
1456 
1457   /**
1458    * Returns value in a new byte array.
1459    * Primarily for use client-side. If server-side, use
1460    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1461    * save on allocations.
1462    * @return Value in a new byte array.
1463    */
1464   @Deprecated // use CellUtil.getValueArray()
1465   public byte [] getValue() {
1466     return CellUtil.cloneValue(this);
1467   }
1468 
1469   /**
1470    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1471    * byte array.<p>
1472    *
1473    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1474    * lengths instead.
1475    * @return Row in a new byte array.
1476    */
1477   @Deprecated // use CellUtil.getRowArray()
1478   public byte [] getRow() {
1479     return CellUtil.cloneRow(this);
1480   }
1481 
1482   /**
1483    *
1484    * @return Timestamp
1485    */
1486   @Override
1487   public long getTimestamp() {
1488     return getTimestamp(getKeyLength());
1489   }
1490 
1491   /**
1492    * @param keylength Pass if you have it to save on a int creation.
1493    * @return Timestamp
1494    */
1495   long getTimestamp(final int keylength) {
1496     int tsOffset = getTimestampOffset(keylength);
1497     return Bytes.toLong(this.bytes, tsOffset);
1498   }
1499 
1500   /**
1501    * @return Type of this KeyValue.
1502    */
1503   @Deprecated
1504   public byte getType() {
1505     return getTypeByte();
1506   }
1507 
1508   /**
1509    * @return KeyValue.TYPE byte representation
1510    */
1511   @Override
1512   public byte getTypeByte() {
1513     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1514   }
1515 
1516   /**
1517    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1518    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1519    * KeyValue type.
1520    */
1521   @Deprecated // use CellUtil#isDelete
1522   public boolean isDelete() {
1523     return KeyValue.isDelete(getType());
1524   }
1525 
1526   /**
1527    * Primarily for use client-side.  Returns the family of this KeyValue in a
1528    * new byte array.<p>
1529    *
1530    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1531    * lengths instead.
1532    * @return Returns family. Makes a copy.
1533    */
1534   @Deprecated // use CellUtil.getFamilyArray
1535   public byte [] getFamily() {
1536     return CellUtil.cloneFamily(this);
1537   }
1538 
1539   /**
1540    * Primarily for use client-side.  Returns the column qualifier of this
1541    * KeyValue in a new byte array.<p>
1542    *
1543    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1544    * lengths instead.
1545    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1546    * @return Returns qualifier. Makes a copy.
1547    */
1548   @Deprecated // use CellUtil.getQualifierArray
1549   public byte [] getQualifier() {
1550     return CellUtil.cloneQualifier(this);
1551   }
1552 
1553   /**
1554    * This returns the offset where the tag actually starts.
1555    */
1556   @Override
1557   public int getTagsOffset() {
1558     int tagsLen = getTagsLength();
1559     if (tagsLen == 0) {
1560       return this.offset + this.length;
1561     }
1562     return this.offset + this.length - tagsLen;
1563   }
1564 
1565   /**
1566    * This returns the total length of the tag bytes
1567    */
1568   @Override
1569   public int getTagsLength() {
1570     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1571     if (tagsLen > 0) {
1572       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1573       // length
1574       tagsLen -= TAGS_LENGTH_SIZE;
1575     }
1576     return tagsLen;
1577   }
1578 
1579   /**
1580    * Returns any tags embedded in the KeyValue.  Used in testcases.
1581    * @return The tags
1582    */
1583   public List<Tag> getTags() {
1584     int tagsLength = getTagsLength();
1585     if (tagsLength == 0) {
1586       return EMPTY_ARRAY_LIST;
1587     }
1588     return Tag.asList(getTagsArray(), getTagsOffset(), tagsLength);
1589   }
1590 
1591   /**
1592    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1593    */
1594   @Override
1595   public byte[] getTagsArray() {
1596     return bytes;
1597   }
1598 
1599   /**
1600    * Creates a new KeyValue that only contains the key portion (the value is
1601    * set to be null).
1602    *
1603    * TODO only used by KeyOnlyFilter -- move there.
1604    * @param lenAsVal replace value with the actual value length (false=empty)
1605    */
1606   public KeyValue createKeyOnly(boolean lenAsVal) {
1607     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1608     // Rebuild as: <keylen:4><0:4><key:keylen>
1609     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1610     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1611     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1612         Math.min(newBuffer.length,this.length));
1613     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1614     if (lenAsVal) {
1615       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1616     }
1617     return new KeyValue(newBuffer);
1618   }
1619 
1620   /**
1621    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1622    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1623    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1624    * <p>
1625    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1626    * </p>
1627    * <p>
1628    * Not recommend to be used as this is old-style API.
1629    * </p>
1630    * @param c The column.
1631    * @return The parsed column.
1632    */
1633   public static byte [][] parseColumn(byte [] c) {
1634     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1635     if (index == -1) {
1636       // If no delimiter, return array of size 1
1637       return new byte [][] { c };
1638     } else if(index == c.length - 1) {
1639       // family with empty qualifier, return array size 2
1640       byte [] family = new byte[c.length-1];
1641       System.arraycopy(c, 0, family, 0, family.length);
1642       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1643     }
1644     // Family and column, return array size 2
1645     final byte [][] result = new byte [2][];
1646     result[0] = new byte [index];
1647     System.arraycopy(c, 0, result[0], 0, index);
1648     final int len = c.length - (index + 1);
1649     result[1] = new byte[len];
1650     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1651     return result;
1652   }
1653 
1654   /**
1655    * Makes a column in family:qualifier form from separate byte arrays.
1656    * <p>
1657    * Not recommended for usage as this is old-style API.
1658    * @param family
1659    * @param qualifier
1660    * @return family:qualifier
1661    */
1662   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1663     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1664   }
1665 
1666   /**
1667    * @param b
1668    * @param delimiter
1669    * @return Index of delimiter having started from start of <code>b</code>
1670    * moving rightward.
1671    */
1672   public static int getDelimiter(final byte [] b, int offset, final int length,
1673       final int delimiter) {
1674     if (b == null) {
1675       throw new IllegalArgumentException("Passed buffer is null");
1676     }
1677     int result = -1;
1678     for (int i = offset; i < length + offset; i++) {
1679       if (b[i] == delimiter) {
1680         result = i;
1681         break;
1682       }
1683     }
1684     return result;
1685   }
1686 
1687   /**
1688    * Find index of passed delimiter walking from end of buffer backwards.
1689    * @param b
1690    * @param delimiter
1691    * @return Index of delimiter
1692    */
1693   public static int getDelimiterInReverse(final byte [] b, final int offset,
1694       final int length, final int delimiter) {
1695     if (b == null) {
1696       throw new IllegalArgumentException("Passed buffer is null");
1697     }
1698     int result = -1;
1699     for (int i = (offset + length) - 1; i >= offset; i--) {
1700       if (b[i] == delimiter) {
1701         result = i;
1702         break;
1703       }
1704     }
1705     return result;
1706   }
1707 
1708   /**
1709    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1710    * {@link KeyValue}s.
1711    */
1712   public static class MetaComparator extends KVComparator {
1713     /**
1714      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1715      * table.
1716      */
1717     @Override
1718     public int compare(final Cell left, final Cell right) {
1719       int c = compareRowKey(left, right);
1720       if (c != 0) {
1721         return c;
1722       }
1723       return CellComparator.compareWithoutRow(left, right);
1724     }
1725 
1726     @Override
1727     public int compareOnlyKeyPortion(Cell left, Cell right) {
1728       return compare(left, right);
1729     }
1730 
1731     @Override
1732     public int compareRows(byte [] left, int loffset, int llength,
1733         byte [] right, int roffset, int rlength) {
1734       int leftDelimiter = getDelimiter(left, loffset, llength,
1735           HConstants.DELIMITER);
1736       int rightDelimiter = getDelimiter(right, roffset, rlength,
1737           HConstants.DELIMITER);
1738       // Compare up to the delimiter
1739       int lpart = (leftDelimiter < 0 ? llength :leftDelimiter - loffset);
1740       int rpart = (rightDelimiter < 0 ? rlength :rightDelimiter - roffset);
1741       int result = Bytes.compareTo(left, loffset, lpart, right, roffset, rpart);
1742       if (result != 0) {
1743         return result;
1744       } else {
1745         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1746           return -1;
1747         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1748           return 1;
1749         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1750           return 0;
1751         }
1752       }
1753       // Compare middle bit of the row.
1754       // Move past delimiter
1755       leftDelimiter++;
1756       rightDelimiter++;
1757       int leftFarDelimiter = getDelimiterInReverse(left, leftDelimiter,
1758           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1759       int rightFarDelimiter = getDelimiterInReverse(right,
1760           rightDelimiter, rlength - (rightDelimiter - roffset),
1761           HConstants.DELIMITER);
1762       // Now compare middlesection of row.
1763       lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter;
1764       rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter;
1765       result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart);
1766       if (result != 0) {
1767         return result;
1768       }  else {
1769         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1770           return -1;
1771         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1772           return 1;
1773         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1774           return 0;
1775         }
1776       }
1777       // Compare last part of row, the rowid.
1778       leftFarDelimiter++;
1779       rightFarDelimiter++;
1780       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1781           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1782       return result;
1783     }
1784 
1785     /**
1786      * Don't do any fancy Block Index splitting tricks.
1787      */
1788     @Override
1789     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1790       return Arrays.copyOf(rightKey, rightKey.length);
1791     }
1792 
1793     /**
1794      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1795      * instantiate the appropriate comparator.
1796      * TODO: With V3 consider removing this.
1797      * @return legacy class name for FileFileTrailer#comparatorClassName
1798      */
1799     @Override
1800     public String getLegacyKeyComparatorName() {
1801       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1802     }
1803 
1804     @Override
1805     protected Object clone() throws CloneNotSupportedException {
1806       return new MetaComparator();
1807     }
1808 
1809     /**
1810      * Override the row key comparison to parse and compare the meta row key parts.
1811      */
1812     @Override
1813     protected int compareRowKey(final Cell l, final Cell r) {
1814       byte[] left = l.getRowArray();
1815       int loffset = l.getRowOffset();
1816       int llength = l.getRowLength();
1817       byte[] right = r.getRowArray();
1818       int roffset = r.getRowOffset();
1819       int rlength = r.getRowLength();
1820       return compareRows(left, loffset, llength, right, roffset, rlength);
1821     }
1822   }
1823 
1824   /**
1825    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1826    * portion.  This means two KeyValues with same Key but different Values are
1827    * considered the same as far as this Comparator is concerned.
1828    */
1829   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1830 
1831     /**
1832      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1833      * instantiate the appropriate comparator.
1834      * TODO: With V3 consider removing this.
1835      * @return legacy class name for FileFileTrailer#comparatorClassName
1836      */
1837     public String getLegacyKeyComparatorName() {
1838       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1839     }
1840 
1841     @Override // RawComparator
1842     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1843       return compareFlatKey(l,loff,llen, r,roff,rlen);
1844     }
1845 
1846     
1847     /**
1848      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1849      * @param left
1850      * @param right
1851      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1852      */
1853     protected int compareRowKey(final Cell left, final Cell right) {
1854       return CellComparator.compareRows(left, right);
1855     }
1856 
1857     /**
1858      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1859      * full KVs laid out in a flat byte[]s.
1860      * @param left
1861      * @param loffset
1862      * @param llength
1863      * @param right
1864      * @param roffset
1865      * @param rlength
1866      * @return  0 if equal, <0 if left smaller, >0 if right smaller
1867      */
1868     public int compareFlatKey(byte[] left, int loffset, int llength,
1869         byte[] right, int roffset, int rlength) {
1870       // Compare row
1871       short lrowlength = Bytes.toShort(left, loffset);
1872       short rrowlength = Bytes.toShort(right, roffset);
1873       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1874           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1875       if (compare != 0) {
1876         return compare;
1877       }
1878 
1879       // Compare the rest of the two KVs without making any assumptions about
1880       // the common prefix. This function will not compare rows anyway, so we
1881       // don't need to tell it that the common prefix includes the row.
1882       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1883           rlength, rrowlength);
1884     }
1885 
1886     public int compareFlatKey(byte[] left, byte[] right) {
1887       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1888     }
1889 
1890     public int compareOnlyKeyPortion(Cell left, Cell right) {
1891       return CellComparator.compareStatic(left, right, true);
1892     }
1893 
1894     /**
1895      * Compares the Key of a cell -- with fields being more significant in this order:
1896      * rowkey, colfam/qual, timestamp, type, mvcc
1897      */
1898     @Override
1899     public int compare(final Cell left, final Cell right) {
1900       int compare = CellComparator.compareStatic(left, right, false);
1901       return compare;
1902     }
1903 
1904     public int compareTimestamps(final Cell left, final Cell right) {
1905       return CellComparator.compareTimestamps(left, right);
1906     }
1907 
1908     /**
1909      * @param left
1910      * @param right
1911      * @return Result comparing rows.
1912      */
1913     public int compareRows(final Cell left, final Cell right) {
1914       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
1915       right.getRowArray(), right.getRowOffset(), right.getRowLength());
1916     }
1917 
1918     /**
1919      * Get the b[],o,l for left and right rowkey portions and compare.
1920      * @param left
1921      * @param loffset
1922      * @param llength
1923      * @param right
1924      * @param roffset
1925      * @param rlength
1926      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1927      */
1928     public int compareRows(byte [] left, int loffset, int llength,
1929         byte [] right, int roffset, int rlength) {
1930       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
1931     }
1932 
1933     int compareColumns(final Cell left, final short lrowlength, final Cell right,
1934         final short rrowlength) {
1935       return CellComparator.compareColumns(left, right);
1936     }
1937 
1938     protected int compareColumns(
1939         byte [] left, int loffset, int llength, final int lfamilylength,
1940         byte [] right, int roffset, int rlength, final int rfamilylength) {
1941       // Compare family portion first.
1942       int diff = Bytes.compareTo(left, loffset, lfamilylength,
1943         right, roffset, rfamilylength);
1944       if (diff != 0) {
1945         return diff;
1946       }
1947       // Compare qualifier portion
1948       return Bytes.compareTo(left, loffset + lfamilylength,
1949         llength - lfamilylength,
1950         right, roffset + rfamilylength, rlength - rfamilylength);
1951       }
1952 
1953     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
1954       // The below older timestamps sorting ahead of newer timestamps looks
1955       // wrong but it is intentional. This way, newer timestamps are first
1956       // found when we iterate over a memstore and newer versions are the
1957       // first we trip over when reading from a store file.
1958       if (ltimestamp < rtimestamp) {
1959         return 1;
1960       } else if (ltimestamp > rtimestamp) {
1961         return -1;
1962       }
1963       return 0;
1964     }
1965 
1966     /**
1967      * Overridden
1968      * @param commonPrefix
1969      * @param left
1970      * @param loffset
1971      * @param llength
1972      * @param right
1973      * @param roffset
1974      * @param rlength
1975      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1976      */
1977     @Override // SamePrefixComparator
1978     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
1979         int loffset, int llength, byte[] right, int roffset, int rlength) {
1980       // Compare row
1981       short lrowlength = Bytes.toShort(left, loffset);
1982       short rrowlength;
1983 
1984       int comparisonResult = 0;
1985       if (commonPrefix < ROW_LENGTH_SIZE) {
1986         // almost nothing in common
1987         rrowlength = Bytes.toShort(right, roffset);
1988         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
1989             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
1990       } else { // the row length is the same
1991         rrowlength = lrowlength;
1992         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
1993           // The rows are not the same. Exclude the common prefix and compare
1994           // the rest of the two rows.
1995           int common = commonPrefix - ROW_LENGTH_SIZE;
1996           comparisonResult = compareRows(
1997               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
1998               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
1999         }
2000       }
2001       if (comparisonResult != 0) {
2002         return comparisonResult;
2003       }
2004 
2005       assert lrowlength == rrowlength;
2006       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2007           roffset, rlength, lrowlength);
2008     }
2009 
2010     /**
2011      * Compare columnFamily, qualifier, timestamp, and key type (everything
2012      * except the row). This method is used both in the normal comparator and
2013      * the "same-prefix" comparator. Note that we are assuming that row portions
2014      * of both KVs have already been parsed and found identical, and we don't
2015      * validate that assumption here.
2016      * @param commonPrefix
2017      *          the length of the common prefix of the two key-values being
2018      *          compared, including row length and row
2019      */
2020     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2021         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2022       /***
2023        * KeyValue Format and commonLength:
2024        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2025        * ------------------|-------commonLength--------|--------------
2026        */
2027       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2028 
2029       // commonLength + TIMESTAMP_TYPE_SIZE
2030       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2031       // ColumnFamily + Qualifier length.
2032       int lcolumnlength = llength - commonLengthWithTSAndType;
2033       int rcolumnlength = rlength - commonLengthWithTSAndType;
2034 
2035       byte ltype = left[loffset + (llength - 1)];
2036       byte rtype = right[roffset + (rlength - 1)];
2037 
2038       // If the column is not specified, the "minimum" key type appears the
2039       // latest in the sorted order, regardless of the timestamp. This is used
2040       // for specifying the last key/value in a given row, because there is no
2041       // "lexicographically last column" (it would be infinitely long). The
2042       // "maximum" key type does not need this behavior.
2043       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2044         // left is "bigger", i.e. it appears later in the sorted order
2045         return 1;
2046       }
2047       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2048         return -1;
2049       }
2050 
2051       int lfamilyoffset = commonLength + loffset;
2052       int rfamilyoffset = commonLength + roffset;
2053 
2054       // Column family length.
2055       int lfamilylength = left[lfamilyoffset - 1];
2056       int rfamilylength = right[rfamilyoffset - 1];
2057       // If left family size is not equal to right family size, we need not
2058       // compare the qualifiers.
2059       boolean sameFamilySize = (lfamilylength == rfamilylength);
2060       int common = 0;
2061       if (commonPrefix > 0) {
2062         common = Math.max(0, commonPrefix - commonLength);
2063         if (!sameFamilySize) {
2064           // Common should not be larger than Math.min(lfamilylength,
2065           // rfamilylength).
2066           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2067         } else {
2068           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2069         }
2070       }
2071       if (!sameFamilySize) {
2072         // comparing column family is enough.
2073         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2074             - common, right, rfamilyoffset + common, rfamilylength - common);
2075       }
2076       // Compare family & qualifier together.
2077       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2078           lcolumnlength - common, right, rfamilyoffset + common,
2079           rcolumnlength - common);
2080       if (comparison != 0) {
2081         return comparison;
2082       }
2083 
2084       ////
2085       // Next compare timestamps.
2086       long ltimestamp = Bytes.toLong(left,
2087           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2088       long rtimestamp = Bytes.toLong(right,
2089           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2090       int compare = compareTimestamps(ltimestamp, rtimestamp);
2091       if (compare != 0) {
2092         return compare;
2093       }
2094 
2095       // Compare types. Let the delete types sort ahead of puts; i.e. types
2096       // of higher numbers sort before those of lesser numbers. Maximum (255)
2097       // appears ahead of everything, and minimum (0) appears after
2098       // everything.
2099       return (0xff & rtype) - (0xff & ltype);
2100     }
2101 
2102     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2103         final byte[] right, final int roffset, final int rfamilylength) {
2104       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2105       return diff;
2106     }
2107 
2108     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2109         final byte[] right, final int roffset, final int rquallength) {
2110       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2111       return diff;
2112     }
2113     /**
2114      * Compares the row and column of two keyvalues for equality
2115      * @param left
2116      * @param right
2117      * @return True if same row and column.
2118      */
2119     public boolean matchingRowColumn(final Cell left,
2120         final Cell right) {
2121       short lrowlength = left.getRowLength();
2122       short rrowlength = right.getRowLength();
2123 
2124       // TsOffset = end of column data. just comparing Row+CF length of each
2125       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2126           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2127         return false;
2128       }
2129 
2130       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2131         return false;
2132       }
2133 
2134       int lfoffset = left.getFamilyOffset();
2135       int rfoffset = right.getFamilyOffset();
2136       int lclength = left.getQualifierLength();
2137       int rclength = right.getQualifierLength();
2138       int lfamilylength = left.getFamilyLength();
2139       int rfamilylength = right.getFamilyLength();
2140       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2141           right.getFamilyArray(), rfoffset, rfamilylength);
2142       if (diff != 0) {
2143         return false;
2144       } else {
2145         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2146             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2147         return diff == 0;
2148       }
2149     }
2150 
2151     /**
2152      * Compares the row of two keyvalues for equality
2153      * @param left
2154      * @param right
2155      * @return True if rows match.
2156      */
2157     public boolean matchingRows(final Cell left, final Cell right) {
2158       short lrowlength = left.getRowLength();
2159       short rrowlength = right.getRowLength();
2160       return matchingRows(left, lrowlength, right, rrowlength);
2161     }
2162 
2163     /**
2164      * @param left
2165      * @param lrowlength
2166      * @param right
2167      * @param rrowlength
2168      * @return True if rows match.
2169      */
2170     private boolean matchingRows(final Cell left, final short lrowlength,
2171         final Cell right, final short rrowlength) {
2172       return lrowlength == rrowlength &&
2173           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2174               right.getRowArray(), right.getRowOffset(), rrowlength);
2175     }
2176 
2177     /**
2178      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2179      * @param left Left row array.
2180      * @param loffset Left row offset.
2181      * @param llength Left row length.
2182      * @param right Right row array.
2183      * @param roffset Right row offset.
2184      * @param rlength Right row length.
2185      * @return Whether rows are the same row.
2186      */
2187     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2188         final byte [] right, final int roffset, final int rlength) {
2189       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2190     }
2191 
2192     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2193       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2194       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2195         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2196             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2197             + Bytes.toStringBinary(firstKeyInBlock));
2198         return firstKeyInBlock;
2199       }
2200       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2201         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2202             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2203             Bytes.toStringBinary(fakeKey));
2204         return firstKeyInBlock;
2205       }
2206       return fakeKey;
2207     }
2208 
2209     /**
2210      * This is a HFile block index key optimization.
2211      * @param leftKey
2212      * @param rightKey
2213      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2214      */
2215     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2216       if (rightKey == null) {
2217         throw new IllegalArgumentException("rightKey can not be null");
2218       }
2219       if (leftKey == null) {
2220         return Arrays.copyOf(rightKey, rightKey.length);
2221       }
2222       if (compareFlatKey(leftKey, rightKey) >= 0) {
2223         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2224           + ", rightKey:" + Bytes.toString(rightKey));
2225       }
2226 
2227       short leftRowLength = Bytes.toShort(leftKey, 0);
2228       short rightRowLength = Bytes.toShort(rightKey, 0);
2229       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2230       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2231       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2232       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2233       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2234       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2235       // rows are equal
2236       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2237         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2238         // Compare family & qualifier together.
2239         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2240           rightCommonLength, rightColumnLength);
2241         // same with "row + family + qualifier", return rightKey directly
2242         if (comparison == 0) {
2243           return Arrays.copyOf(rightKey, rightKey.length);
2244         }
2245         // "family + qualifier" are different, generate a faked key per rightKey
2246         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2247         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2248         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2249         return newKey;
2250       }
2251       // rows are different
2252       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2253       short diffIdx = 0;
2254       while (diffIdx < minLength
2255           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2256         diffIdx++;
2257       }
2258       byte[] newRowKey = null;
2259       if (diffIdx >= minLength) {
2260         // leftKey's row is prefix of rightKey's.
2261         newRowKey = new byte[diffIdx + 1];
2262         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2263       } else {
2264         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2265         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2266             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2267           newRowKey = new byte[diffIdx + 1];
2268           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2269           newRowKey[diffIdx] = (byte) (diffByte + 1);
2270         } else {
2271           newRowKey = new byte[diffIdx + 1];
2272           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2273         }
2274       }
2275       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2276         Type.Maximum).getKey();
2277     }
2278 
2279     @Override
2280     protected Object clone() throws CloneNotSupportedException {
2281       return new KVComparator();
2282     }
2283 
2284   }
2285 
2286   /**
2287    * @param b
2288    * @return A KeyValue made of a byte array that holds the key-only part.
2289    * Needed to convert hfile index members to KeyValues.
2290    */
2291   public static KeyValue createKeyValueFromKey(final byte [] b) {
2292     return createKeyValueFromKey(b, 0, b.length);
2293   }
2294 
2295   /**
2296    * @param bb
2297    * @return A KeyValue made of a byte buffer that holds the key-only part.
2298    * Needed to convert hfile index members to KeyValues.
2299    */
2300   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2301     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2302   }
2303 
2304   /**
2305    * @param b
2306    * @param o
2307    * @param l
2308    * @return A KeyValue made of a byte array that holds the key-only part.
2309    * Needed to convert hfile index members to KeyValues.
2310    */
2311   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2312       final int l) {
2313     byte [] newb = new byte[l + ROW_OFFSET];
2314     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2315     Bytes.putInt(newb, 0, l);
2316     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2317     return new KeyValue(newb);
2318   }
2319 
2320   /**
2321    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2322    * backing bytes copied from the steam.
2323    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2324    * of zero, we will return null which can be useful marking a stream as done.
2325    * @throws IOException
2326    */
2327   public static KeyValue create(final DataInput in) throws IOException {
2328     return create(in.readInt(), in);
2329   }
2330 
2331   /**
2332    * Create a KeyValue reading <code>length</code> from <code>in</code>
2333    * @param length
2334    * @param in
2335    * @return Created KeyValue OR if we find a length of zero, we will return null which
2336    * can be useful marking a stream as done.
2337    * @throws IOException
2338    */
2339   public static KeyValue create(int length, final DataInput in) throws IOException {
2340 
2341     if (length <= 0) {
2342       if (length == 0) return null;
2343       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2344     }
2345 
2346     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2347     byte [] bytes = new byte[length];
2348     in.readFully(bytes);
2349     return new KeyValue(bytes, 0, length);
2350   }
2351   
2352   /**
2353    * Create a new KeyValue by copying existing cell and adding new tags
2354    * @param c
2355    * @param newTags
2356    * @return a new KeyValue instance with new tags
2357    */
2358   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2359     List<Tag> existingTags = null;
2360     if(c.getTagsLength() > 0) {
2361       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
2362       existingTags.addAll(newTags);
2363     } else {
2364       existingTags = newTags;
2365     }
2366     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2367       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
2368       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
2369       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
2370       c.getValueLength(), existingTags);
2371   }
2372 
2373   /**
2374    * Create a KeyValue reading from the raw InputStream.
2375    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2376    * @param in
2377    * @return Created KeyValue OR if we find a length of zero, we will return null which
2378    * can be useful marking a stream as done.
2379    * @throws IOException
2380    */
2381   public static KeyValue iscreate(final InputStream in) throws IOException {
2382     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2383     int bytesRead = 0;
2384     while (bytesRead < intBytes.length) {
2385       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2386       if (n < 0) {
2387         if (bytesRead == 0) return null; // EOF at start is ok
2388         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2389       }
2390       bytesRead += n;
2391     }
2392     // TODO: perhaps some sanity check is needed here.
2393     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2394     IOUtils.readFully(in, bytes, 0, bytes.length);
2395     return new KeyValue(bytes, 0, bytes.length);
2396   }
2397 
2398   /**
2399    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2400    * @param kv
2401    * @param out
2402    * @return Length written on stream
2403    * @throws IOException
2404    * @see #create(DataInput) for the inverse function
2405    */
2406   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2407     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2408     // work for all implementations.
2409     int length = kv.getLength();
2410     out.writeInt(length);
2411     out.write(kv.getBuffer(), kv.getOffset(), length);
2412     return length + Bytes.SIZEOF_INT;
2413   }
2414 
2415   /**
2416    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2417    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2418    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2419    * @param kv
2420    * @param out
2421    * @return Length written on stream
2422    * @throws IOException
2423    * @see #create(DataInput) for the inverse function
2424    * @see #write(KeyValue, DataOutput)
2425    */
2426   @Deprecated
2427   public static long oswrite(final KeyValue kv, final OutputStream out)
2428       throws IOException {
2429     int length = kv.getLength();
2430     // This does same as DataOuput#writeInt (big-endian, etc.)
2431     out.write(Bytes.toBytes(length));
2432     out.write(kv.getBuffer(), kv.getOffset(), length);
2433     return length + Bytes.SIZEOF_INT;
2434   }
2435 
2436   /**
2437    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2438    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2439    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2440    * @param kv
2441    * @param out
2442    * @param withTags
2443    * @return Length written on stream
2444    * @throws IOException
2445    * @see #create(DataInput) for the inverse function
2446    * @see #write(KeyValue, DataOutput)
2447    */
2448   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2449       throws IOException {
2450     int length = kv.getLength();
2451     if (!withTags) {
2452       length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2453     }
2454     // This does same as DataOuput#writeInt (big-endian, etc.)
2455     out.write(Bytes.toBytes(length));
2456     out.write(kv.getBuffer(), kv.getOffset(), length);
2457     return length + Bytes.SIZEOF_INT;
2458   }
2459 
2460   /**
2461    * Comparator that compares row component only of a KeyValue.
2462    */
2463   public static class RowOnlyComparator implements Comparator<KeyValue> {
2464     final KVComparator comparator;
2465 
2466     public RowOnlyComparator(final KVComparator c) {
2467       this.comparator = c;
2468     }
2469 
2470     public int compare(KeyValue left, KeyValue right) {
2471       return comparator.compareRows(left, right);
2472     }
2473   }
2474 
2475 
2476   /**
2477    * Avoids redundant comparisons for better performance.
2478    * 
2479    * TODO get rid of this wart
2480    */
2481   public interface SamePrefixComparator<T> {
2482     /**
2483      * Compare two keys assuming that the first n bytes are the same.
2484      * @param commonPrefix How many bytes are the same.
2485      */
2486     int compareIgnoringPrefix(
2487       int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
2488     );
2489   }
2490 
2491   /**
2492    * This is a TEST only Comparator used in TestSeekTo and TestReseekTo.
2493    */
2494   public static class RawBytesComparator extends KVComparator {
2495     /**
2496      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2497      * instantiate the appropriate comparator.
2498      * TODO: With V3 consider removing this.
2499      * @return legacy class name for FileFileTrailer#comparatorClassName
2500      */
2501     public String getLegacyKeyComparatorName() {
2502       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2503     }
2504 
2505     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2506         int roffset, int rlength) {
2507       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2508     }
2509 
2510     @Override
2511     public int compare(Cell left, Cell right) {
2512       return compareOnlyKeyPortion(left, right);
2513     }
2514 
2515     @VisibleForTesting
2516     public int compareOnlyKeyPortion(Cell left, Cell right) {
2517       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2518           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2519       if (c != 0) {
2520         return c;
2521       }
2522       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2523           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2524           right.getFamilyLength());
2525       if (c != 0) {
2526         return c;
2527       }
2528       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2529           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2530           right.getQualifierLength());
2531       if (c != 0) {
2532         return c;
2533       }
2534       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2535       if (c != 0) {
2536         return c;
2537       }
2538       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2539     }
2540 
2541     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2542       return firstKeyInBlock;
2543     }
2544 
2545   }
2546 
2547   /**
2548    * HeapSize implementation
2549    *
2550    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2551    * MemStore.
2552    */
2553   @Override
2554   public long heapSize() {
2555     int sum = 0;
2556     sum += ClassSize.OBJECT;// the KeyValue object itself
2557     sum += ClassSize.REFERENCE;// pointer to "bytes"
2558     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2559     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2560     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2561     sum += Bytes.SIZEOF_LONG;// memstoreTS
2562     return ClassSize.align(sum);
2563   }
2564 
2565   /**
2566    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2567    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2568    * In places like block index keys, we need to compare the key byte[] with a cell.
2569    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2570    */
2571   public static class KeyOnlyKeyValue extends KeyValue {
2572     private int length = 0;
2573     private int offset = 0;
2574     private byte[] b;
2575 
2576     public KeyOnlyKeyValue() {
2577 
2578     }
2579 
2580     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2581       this.b = b;
2582       this.length = length;
2583       this.offset = offset;
2584     }
2585 
2586     @Override
2587     public int getKeyOffset() {
2588       return this.offset;
2589     }
2590 
2591     /**
2592      * A setter that helps to avoid object creation every time and whenever
2593      * there is a need to create new KeyOnlyKeyValue.
2594      * @param key
2595      * @param offset
2596      * @param length
2597      */
2598     public void setKey(byte[] key, int offset, int length) {
2599       this.b = key;
2600       this.offset = offset;
2601       this.length = length;
2602     }
2603 
2604     @Override
2605     public byte[] getKey() {
2606       int keylength = getKeyLength();
2607       byte[] key = new byte[keylength];
2608       System.arraycopy(this.b, getKeyOffset(), key, 0, keylength);
2609       return key;
2610     }
2611 
2612     @Override
2613     public byte[] getRowArray() {
2614       return b;
2615     }
2616 
2617     @Override
2618     public int getRowOffset() {
2619       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2620     }
2621 
2622     @Override
2623     public byte[] getFamilyArray() {
2624       return b;
2625     }
2626 
2627     @Override
2628     public byte getFamilyLength() {
2629       return this.b[getFamilyOffset() - 1];
2630     }
2631 
2632     @Override
2633     public int getFamilyOffset() {
2634       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2635     }
2636 
2637     @Override
2638     public byte[] getQualifierArray() {
2639       return b;
2640     }
2641 
2642     @Override
2643     public int getQualifierLength() {
2644       return getQualifierLength(getRowLength(), getFamilyLength());
2645     }
2646 
2647     @Override
2648     public int getQualifierOffset() {
2649       return getFamilyOffset() + getFamilyLength();
2650     }
2651 
2652     @Override
2653     public int getKeyLength() {
2654       return length;
2655     }
2656 
2657     @Override
2658     public short getRowLength() {
2659       return Bytes.toShort(this.b, getKeyOffset());
2660     }
2661 
2662     @Override
2663     public byte getTypeByte() {
2664       return this.b[this.offset + getKeyLength() - 1];
2665     }
2666 
2667     private int getQualifierLength(int rlength, int flength) {
2668       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2669     }
2670 
2671     @Override
2672     public long getTimestamp() {
2673       int tsOffset = getTimestampOffset();
2674       return Bytes.toLong(this.b, tsOffset);
2675     }
2676 
2677     @Override
2678     public int getTimestampOffset() {
2679       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2680     }
2681 
2682     @Override
2683     public byte[] getTagsArray() {
2684       return HConstants.EMPTY_BYTE_ARRAY;
2685     }
2686 
2687     @Override
2688     public int getTagsOffset() {
2689       return 0;
2690     }
2691 
2692     @Override
2693     public byte[] getValueArray() {
2694       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2695     }
2696 
2697     @Override
2698     public int getValueOffset() {
2699       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2700     }
2701 
2702     @Override
2703     public int getValueLength() {
2704       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2705     }
2706 
2707     @Override
2708     public int getTagsLength() {
2709       return 0;
2710     }
2711 
2712     @Override
2713     public String toString() {
2714       if (this.b == null || this.b.length == 0) {
2715         return "empty";
2716       }
2717       return keyToString(this.b, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2718     }
2719 
2720     @Override
2721     public int hashCode() {
2722       return super.hashCode();
2723     }
2724 
2725     @Override
2726     public boolean equals(Object other) {
2727       return super.equals(other);
2728     }
2729   }
2730 }