View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.io.HeapSize;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.IOUtils;
44  import org.apache.hadoop.io.RawComparator;
45  
46  import com.google.common.annotations.VisibleForTesting;
47  
48  /**
49   * An HBase Key/Value. This is the fundamental HBase Type.  
50   * <p>
51   * HBase applications and users should use the Cell interface and avoid directly using KeyValue
52   * and member functions not defined in Cell.
53   * <p>
54   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
55   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
56   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
57   * <p>
58   * Instances of this class are immutable. They do not implement Comparable but Comparators are
59   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
60   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
61   * Hfiles, and bloom filter keys.
62   * <p>
63   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
64   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
65   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
66   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier>
67   * &lt;timestamp> &lt;keytype></code>
68   * The <code>rowlength</code> maximum is <code>Short.MAX_SIZE</code>, column family length maximum
69   * is <code>Byte.MAX_SIZE</code>, and column qualifier + key length must be <
70   * <code>Integer.MAX_SIZE</code>. The column does not contain the family/qualifier delimiter,
71   * {@link #COLUMN_FAMILY_DELIMITER}<br>
72   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
73   * the value part. The format for this part is: <code>&lt;tagslength>&lt;tagsbytes></code>.
74   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
75   * contain one or more tags where as each tag is of the form
76   * <code>&lt;taglength>&lt;tagtype>&lt;tagbytes></code>.  <code>tagtype</code> is one byte and
77   * <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type length
78   * and actual tag bytes length.
79   */
80  @InterfaceAudience.Private
81  public class KeyValue implements Cell, HeapSize, Cloneable {
82    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
83  
84    static final Log LOG = LogFactory.getLog(KeyValue.class);
85  
86    /**
87     * Colon character in UTF-8
88     */
89    public static final char COLUMN_FAMILY_DELIMITER = ':';
90  
91    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
92      new byte[]{COLUMN_FAMILY_DELIMITER};
93  
94    /**
95     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
96     * of KeyValue only.
97     */
98    public static final KVComparator COMPARATOR = new KVComparator();
99    /**
100    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
101    * {@link KeyValue}s.
102    */
103   public static final KVComparator META_COMPARATOR = new MetaComparator();
104 
105   /**
106    * Needed for Bloom Filters.
107    */
108   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
109 
110   /** Size of the key length field in bytes*/
111   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
112 
113   /** Size of the key type field in bytes */
114   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
115 
116   /** Size of the row length field in bytes */
117   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
118 
119   /** Size of the family length field in bytes */
120   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
121 
122   /** Size of the timestamp field in bytes */
123   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
124 
125   // Size of the timestamp and type byte on end of a key -- a long + a byte.
126   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
127 
128   // Size of the length shorts and bytes in key.
129   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
130       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
131 
132   // How far into the key the row starts at. First thing to read is the short
133   // that says how long the row is.
134   public static final int ROW_OFFSET =
135     Bytes.SIZEOF_INT /*keylength*/ +
136     Bytes.SIZEOF_INT /*valuelength*/;
137 
138   // Size of the length ints in a KeyValue datastructure.
139   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
140 
141   /** Size of the tags length field in bytes */
142   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
143 
144   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
145 
146 
147   /**
148    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
149    * characteristics would take up for its underlying data structure.
150    *
151    * @param rlength row length
152    * @param flength family length
153    * @param qlength qualifier length
154    * @param vlength value length
155    *
156    * @return the <code>KeyValue</code> data structure length
157    */
158   public static long getKeyValueDataStructureSize(int rlength,
159       int flength, int qlength, int vlength) {
160     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
161         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
162   }
163 
164   /**
165    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
166    * characteristics would take up for its underlying data structure.
167    *
168    * @param rlength row length
169    * @param flength family length
170    * @param qlength qualifier length
171    * @param vlength value length
172    * @param tagsLength total length of the tags
173    *
174    * @return the <code>KeyValue</code> data structure length
175    */
176   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
177       int vlength, int tagsLength) {
178     if (tagsLength == 0) {
179       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
180     }
181     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
182         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
183   }
184 
185   /**
186    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
187    * characteristics would take up for its underlying data structure.
188    *
189    * @param klength key length
190    * @param vlength value length
191    * @param tagsLength total length of the tags
192    *
193    * @return the <code>KeyValue</code> data structure length
194    */
195   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
196     if (tagsLength == 0) {
197       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
198     }
199     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
200   }
201 
202   /**
203    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
204    * characteristics would take up in its underlying data structure for the key.
205    *
206    * @param rlength row length
207    * @param flength family length
208    * @param qlength qualifier length
209    *
210    * @return the key data structure length
211    */
212   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
213     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
214   }
215 
216   /**
217    * Key type.
218    * Has space for other key types to be added later.  Cannot rely on
219    * enum ordinals . They change if item is removed or moved.  Do our own codes.
220    */
221   public static enum Type {
222     Minimum((byte)0),
223     Put((byte)4),
224 
225     Delete((byte)8),
226     DeleteFamilyVersion((byte)10),
227     DeleteColumn((byte)12),
228     DeleteFamily((byte)14),
229 
230     // Maximum is used when searching; you look from maximum on down.
231     Maximum((byte)255);
232 
233     private final byte code;
234 
235     Type(final byte c) {
236       this.code = c;
237     }
238 
239     public byte getCode() {
240       return this.code;
241     }
242 
243     /**
244      * Cannot rely on enum ordinals . They change if item is removed or moved.
245      * Do our own codes.
246      * @param b
247      * @return Type associated with passed code.
248      */
249     public static Type codeToType(final byte b) {
250       for (Type t : Type.values()) {
251         if (t.getCode() == b) {
252           return t;
253         }
254       }
255       throw new RuntimeException("Unknown code " + b);
256     }
257   }
258 
259   /**
260    * Lowest possible key.
261    * Makes a Key with highest possible Timestamp, empty row and column.  No
262    * key can be equal or lower than this one in memstore or in store file.
263    */
264   public static final KeyValue LOWESTKEY =
265     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
266 
267   ////
268   // KeyValue core instance fields.
269   private byte [] bytes = null;  // an immutable byte array that contains the KV
270   private int offset = 0;  // offset into bytes buffer KV starts at
271   private int length = 0;  // length of the KV starting from offset.
272 
273   /**
274    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
275    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
276    * KeyValue type.
277    */
278   public static boolean isDelete(byte t) {
279     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
280   }
281 
282   /** Here be dragons **/
283 
284   // used to achieve atomic operations in the memstore.
285   @Override
286   public long getMvccVersion() {
287     return mvcc;
288   }
289 
290   public void setMvccVersion(long mvccVersion){
291     this.mvcc = mvccVersion;
292   }
293 
294   // multi-version concurrency control version.  default value is 0, aka do not care.
295   private long mvcc = 0;  // this value is not part of a serialized KeyValue (not in HFiles)
296 
297   /** Dragon time over, return to normal business */
298 
299 
300   /** Writable Constructor -- DO NOT USE */
301   public KeyValue() {}
302 
303   /**
304    * Creates a KeyValue from the start of the specified byte array.
305    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
306    * @param bytes byte array
307    */
308   public KeyValue(final byte [] bytes) {
309     this(bytes, 0);
310   }
311 
312   /**
313    * Creates a KeyValue from the specified byte array and offset.
314    * Presumes <code>bytes</code> content starting at <code>offset</code> is
315    * formatted as a KeyValue blob.
316    * @param bytes byte array
317    * @param offset offset to start of KeyValue
318    */
319   public KeyValue(final byte [] bytes, final int offset) {
320     this(bytes, offset, getLength(bytes, offset));
321   }
322 
323   /**
324    * Creates a KeyValue from the specified byte array, starting at offset, and
325    * for length <code>length</code>.
326    * @param bytes byte array
327    * @param offset offset to start of the KeyValue
328    * @param length length of the KeyValue
329    */
330   public KeyValue(final byte [] bytes, final int offset, final int length) {
331     this.bytes = bytes;
332     this.offset = offset;
333     this.length = length;
334   }
335 
336   /**
337    * Creates a KeyValue from the specified byte array, starting at offset, and
338    * for length <code>length</code>.
339    *
340    * @param bytes  byte array
341    * @param offset offset to start of the KeyValue
342    * @param length length of the KeyValue
343    * @param ts
344    */
345   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
346     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
347   }
348 
349   /** Constructors that build a new backing byte array from fields */
350 
351   /**
352    * Constructs KeyValue structure filled with null value.
353    * Sets type to {@link KeyValue.Type#Maximum}
354    * @param row - row key (arbitrary byte array)
355    * @param timestamp
356    */
357   public KeyValue(final byte [] row, final long timestamp) {
358     this(row, null, null, timestamp, Type.Maximum, null);
359   }
360 
361   /**
362    * Constructs KeyValue structure filled with null value.
363    * @param row - row key (arbitrary byte array)
364    * @param timestamp
365    */
366   public KeyValue(final byte [] row, final long timestamp, Type type) {
367     this(row, null, null, timestamp, type, null);
368   }
369 
370   /**
371    * Constructs KeyValue structure filled with null value.
372    * Sets type to {@link KeyValue.Type#Maximum}
373    * @param row - row key (arbitrary byte array)
374    * @param family family name
375    * @param qualifier column qualifier
376    */
377   public KeyValue(final byte [] row, final byte [] family,
378       final byte [] qualifier) {
379     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
380   }
381 
382   /**
383    * Constructs KeyValue structure filled with null value.
384    * @param row - row key (arbitrary byte array)
385    * @param family family name
386    * @param qualifier column qualifier
387    */
388   public KeyValue(final byte [] row, final byte [] family,
389       final byte [] qualifier, final byte [] value) {
390     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
391   }
392 
393   /**
394    * Constructs KeyValue structure filled with specified values.
395    * @param row row key
396    * @param family family name
397    * @param qualifier column qualifier
398    * @param timestamp version timestamp
399    * @param type key type
400    * @throws IllegalArgumentException
401    */
402   public KeyValue(final byte[] row, final byte[] family,
403       final byte[] qualifier, final long timestamp, Type type) {
404     this(row, family, qualifier, timestamp, type, null);
405   }
406 
407   /**
408    * Constructs KeyValue structure filled with specified values.
409    * @param row row key
410    * @param family family name
411    * @param qualifier column qualifier
412    * @param timestamp version timestamp
413    * @param value column value
414    * @throws IllegalArgumentException
415    */
416   public KeyValue(final byte[] row, final byte[] family,
417       final byte[] qualifier, final long timestamp, final byte[] value) {
418     this(row, family, qualifier, timestamp, Type.Put, value);
419   }
420 
421   /**
422    * Constructs KeyValue structure filled with specified values.
423    * @param row row key
424    * @param family family name
425    * @param qualifier column qualifier
426    * @param timestamp version timestamp
427    * @param value column value
428    * @param tags tags
429    * @throws IllegalArgumentException
430    */
431   public KeyValue(final byte[] row, final byte[] family,
432       final byte[] qualifier, final long timestamp, final byte[] value,
433       final Tag[] tags) {
434     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
435   }
436 
437   /**
438    * Constructs KeyValue structure filled with specified values.
439    * @param row row key
440    * @param family family name
441    * @param qualifier column qualifier
442    * @param timestamp version timestamp
443    * @param value column value
444    * @param tags tags non-empty list of tags or null
445    * @throws IllegalArgumentException
446    */
447   public KeyValue(final byte[] row, final byte[] family,
448       final byte[] qualifier, final long timestamp, final byte[] value,
449       final List<Tag> tags) {
450     this(row, 0, row==null ? 0 : row.length,
451       family, 0, family==null ? 0 : family.length,
452       qualifier, 0, qualifier==null ? 0 : qualifier.length,
453       timestamp, Type.Put,
454       value, 0, value==null ? 0 : value.length, tags);
455   }
456 
457   /**
458    * Constructs KeyValue structure filled with specified values.
459    * @param row row key
460    * @param family family name
461    * @param qualifier column qualifier
462    * @param timestamp version timestamp
463    * @param type key type
464    * @param value column value
465    * @throws IllegalArgumentException
466    */
467   public KeyValue(final byte[] row, final byte[] family,
468       final byte[] qualifier, final long timestamp, Type type,
469       final byte[] value) {
470     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
471         timestamp, type,   value, 0, len(value));
472   }
473 
474   /**
475    * Constructs KeyValue structure filled with specified values.
476    * <p>
477    * Column is split into two fields, family and qualifier.
478    * @param row row key
479    * @param family family name
480    * @param qualifier column qualifier
481    * @param timestamp version timestamp
482    * @param type key type
483    * @param value column value
484    * @throws IllegalArgumentException
485    */
486   public KeyValue(final byte[] row, final byte[] family,
487       final byte[] qualifier, final long timestamp, Type type,
488       final byte[] value, final List<Tag> tags) {
489     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
490         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
491   }
492 
493   /**
494    * Constructs KeyValue structure filled with specified values.
495    * @param row row key
496    * @param family family name
497    * @param qualifier column qualifier
498    * @param timestamp version timestamp
499    * @param type key type
500    * @param value column value
501    * @throws IllegalArgumentException
502    */
503   public KeyValue(final byte[] row, final byte[] family,
504       final byte[] qualifier, final long timestamp, Type type,
505       final byte[] value, final byte[] tags) {
506     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
507         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
508   }
509 
510   /**
511    * Constructs KeyValue structure filled with specified values.
512    * @param row row key
513    * @param family family name
514    * @param qualifier column qualifier
515    * @param qoffset qualifier offset
516    * @param qlength qualifier length
517    * @param timestamp version timestamp
518    * @param type key type
519    * @param value column value
520    * @param voffset value offset
521    * @param vlength value length
522    * @throws IllegalArgumentException
523    */
524   public KeyValue(byte [] row, byte [] family,
525       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
526       byte [] value, int voffset, int vlength, List<Tag> tags) {
527     this(row, 0, row==null ? 0 : row.length,
528         family, 0, family==null ? 0 : family.length,
529         qualifier, qoffset, qlength, timestamp, type,
530         value, voffset, vlength, tags);
531   }
532 
533   /**
534    * @param row
535    * @param family
536    * @param qualifier
537    * @param qoffset
538    * @param qlength
539    * @param timestamp
540    * @param type
541    * @param value
542    * @param voffset
543    * @param vlength
544    * @param tags
545    */
546   public KeyValue(byte [] row, byte [] family,
547       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
548       byte [] value, int voffset, int vlength, byte[] tags) {
549     this(row, 0, row==null ? 0 : row.length,
550         family, 0, family==null ? 0 : family.length,
551         qualifier, qoffset, qlength, timestamp, type,
552         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
553   }
554 
555   /**
556    * Constructs KeyValue structure filled with specified values.
557    * <p>
558    * Column is split into two fields, family and qualifier.
559    * @param row row key
560    * @throws IllegalArgumentException
561    */
562   public KeyValue(final byte [] row, final int roffset, final int rlength,
563       final byte [] family, final int foffset, final int flength,
564       final byte [] qualifier, final int qoffset, final int qlength,
565       final long timestamp, final Type type,
566       final byte [] value, final int voffset, final int vlength) {
567     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
568       qlength, timestamp, type, value, voffset, vlength, null);
569   }
570   
571   /**
572    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
573    * data buffer.
574    * <p>
575    * Column is split into two fields, family and qualifier.
576    *
577    * @param buffer the bytes buffer to use
578    * @param boffset buffer offset
579    * @param row row key
580    * @param roffset row offset
581    * @param rlength row length
582    * @param family family name
583    * @param foffset family offset
584    * @param flength family length
585    * @param qualifier column qualifier
586    * @param qoffset qualifier offset
587    * @param qlength qualifier length
588    * @param timestamp version timestamp
589    * @param type key type
590    * @param value column value
591    * @param voffset value offset
592    * @param vlength value length
593    * @param tags non-empty list of tags or null
594    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
595    * remaining in the buffer
596    */
597   public KeyValue(byte [] buffer, final int boffset,
598       final byte [] row, final int roffset, final int rlength,
599       final byte [] family, final int foffset, final int flength,
600       final byte [] qualifier, final int qoffset, final int qlength,
601       final long timestamp, final Type type,
602       final byte [] value, final int voffset, final int vlength,
603       final Tag[] tags) {
604      this.bytes  = buffer;
605      this.length = writeByteArray(buffer, boffset,
606          row, roffset, rlength,
607          family, foffset, flength, qualifier, qoffset, qlength,
608         timestamp, type, value, voffset, vlength, tags);
609      this.offset = boffset;
610    }
611 
612   /**
613    * Constructs KeyValue structure filled with specified values.
614    * <p>
615    * Column is split into two fields, family and qualifier.
616    * @param row row key
617    * @param roffset row offset
618    * @param rlength row length
619    * @param family family name
620    * @param foffset family offset
621    * @param flength family length
622    * @param qualifier column qualifier
623    * @param qoffset qualifier offset
624    * @param qlength qualifier length
625    * @param timestamp version timestamp
626    * @param type key type
627    * @param value column value
628    * @param voffset value offset
629    * @param vlength value length
630    * @param tags tags
631    * @throws IllegalArgumentException
632    */
633   public KeyValue(final byte [] row, final int roffset, final int rlength,
634       final byte [] family, final int foffset, final int flength,
635       final byte [] qualifier, final int qoffset, final int qlength,
636       final long timestamp, final Type type,
637       final byte [] value, final int voffset, final int vlength,
638       final List<Tag> tags) {
639     this.bytes = createByteArray(row, roffset, rlength,
640         family, foffset, flength, qualifier, qoffset, qlength,
641         timestamp, type, value, voffset, vlength, tags);
642     this.length = bytes.length;
643     this.offset = 0;
644   }
645 
646   /**
647    * @param row
648    * @param roffset
649    * @param rlength
650    * @param family
651    * @param foffset
652    * @param flength
653    * @param qualifier
654    * @param qoffset
655    * @param qlength
656    * @param timestamp
657    * @param type
658    * @param value
659    * @param voffset
660    * @param vlength
661    * @param tags
662    */
663   public KeyValue(final byte [] row, final int roffset, final int rlength,
664       final byte [] family, final int foffset, final int flength,
665       final byte [] qualifier, final int qoffset, final int qlength,
666       final long timestamp, final Type type,
667       final byte [] value, final int voffset, final int vlength,
668       final byte[] tags, final int tagsOffset, final int tagsLength) {
669     this.bytes = createByteArray(row, roffset, rlength,
670         family, foffset, flength, qualifier, qoffset, qlength,
671         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
672     this.length = bytes.length;
673     this.offset = 0;
674   }
675 
676   /**
677    * Constructs an empty KeyValue structure, with specified sizes.
678    * This can be used to partially fill up KeyValues.
679    * <p>
680    * Column is split into two fields, family and qualifier.
681    * @param rlength row length
682    * @param flength family length
683    * @param qlength qualifier length
684    * @param timestamp version timestamp
685    * @param type key type
686    * @param vlength value length
687    * @throws IllegalArgumentException
688    */
689   public KeyValue(final int rlength,
690       final int flength,
691       final int qlength,
692       final long timestamp, final Type type,
693       final int vlength) {
694     this(rlength, flength, qlength, timestamp, type, vlength, 0);
695   }
696 
697   /**
698    * Constructs an empty KeyValue structure, with specified sizes.
699    * This can be used to partially fill up KeyValues.
700    * <p>
701    * Column is split into two fields, family and qualifier.
702    * @param rlength row length
703    * @param flength family length
704    * @param qlength qualifier length
705    * @param timestamp version timestamp
706    * @param type key type
707    * @param vlength value length
708    * @param tagsLength
709    * @throws IllegalArgumentException
710    */
711   public KeyValue(final int rlength,
712       final int flength,
713       final int qlength,
714       final long timestamp, final Type type,
715       final int vlength, final int tagsLength) {
716     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
717         tagsLength);
718     this.length = bytes.length;
719     this.offset = 0;
720   }
721 
722 
723   public KeyValue(byte[] row, int roffset, int rlength,
724                   byte[] family, int foffset, int flength,
725                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
726     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
727         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
728         value, 0, value == null ? 0 : value.remaining(), tags);
729     this.length = bytes.length;
730     this.offset = 0;
731   }
732 
733   public KeyValue(Cell c) {
734     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
735         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
736         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
737         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
738         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
739   }
740 
741   /**
742    * Create an empty byte[] representing a KeyValue
743    * All lengths are preset and can be filled in later.
744    * @param rlength
745    * @param flength
746    * @param qlength
747    * @param timestamp
748    * @param type
749    * @param vlength
750    * @return The newly created byte array.
751    */
752   private static byte[] createEmptyByteArray(final int rlength, int flength,
753       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
754     if (rlength > Short.MAX_VALUE) {
755       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
756     }
757     if (flength > Byte.MAX_VALUE) {
758       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
759     }
760     // Qualifier length
761     if (qlength > Integer.MAX_VALUE - rlength - flength) {
762       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
763     }
764     checkForTagsLength(tagsLength);
765     // Key length
766     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
767     if (longkeylength > Integer.MAX_VALUE) {
768       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
769         Integer.MAX_VALUE);
770     }
771     int keylength = (int)longkeylength;
772     // Value length
773     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
774       throw new IllegalArgumentException("Valuer > " +
775           HConstants.MAXIMUM_VALUE_LENGTH);
776     }
777 
778     // Allocate right-sized byte array.
779     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
780         tagsLength)];
781     // Write the correct size markers
782     int pos = 0;
783     pos = Bytes.putInt(bytes, pos, keylength);
784     pos = Bytes.putInt(bytes, pos, vlength);
785     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
786     pos += rlength;
787     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
788     pos += flength + qlength;
789     pos = Bytes.putLong(bytes, pos, timestamp);
790     pos = Bytes.putByte(bytes, pos, type.getCode());
791     pos += vlength;
792     if (tagsLength > 0) {
793       pos = Bytes.putShort(bytes, pos, (short)(tagsLength & 0x0000ffff));
794     }
795     return bytes;
796   }
797 
798   /**
799    * Checks the parameters passed to a constructor.
800    *
801    * @param row row key
802    * @param rlength row length
803    * @param family family name
804    * @param flength family length
805    * @param qlength qualifier length
806    * @param vlength value length
807    *
808    * @throws IllegalArgumentException an illegal value was passed
809    */
810   private static void checkParameters(final byte [] row, final int rlength,
811       final byte [] family, int flength, int qlength, int vlength)
812           throws IllegalArgumentException {
813     if (rlength > Short.MAX_VALUE) {
814       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
815     }
816     if (row == null) {
817       throw new IllegalArgumentException("Row is null");
818     }
819     // Family length
820     flength = family == null ? 0 : flength;
821     if (flength > Byte.MAX_VALUE) {
822       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
823     }
824     // Qualifier length
825     if (qlength > Integer.MAX_VALUE - rlength - flength) {
826       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
827     }
828     // Key length
829     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
830     if (longKeyLength > Integer.MAX_VALUE) {
831       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
832           Integer.MAX_VALUE);
833     }
834     // Value length
835     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
836       throw new IllegalArgumentException("Value length " + vlength + " > " +
837           HConstants.MAXIMUM_VALUE_LENGTH);
838     }
839   }
840 
841   /**
842    * Write KeyValue format into the provided byte array.
843    *
844    * @param buffer the bytes buffer to use
845    * @param boffset buffer offset
846    * @param row row key
847    * @param roffset row offset
848    * @param rlength row length
849    * @param family family name
850    * @param foffset family offset
851    * @param flength family length
852    * @param qualifier column qualifier
853    * @param qoffset qualifier offset
854    * @param qlength qualifier length
855    * @param timestamp version timestamp
856    * @param type key type
857    * @param value column value
858    * @param voffset value offset
859    * @param vlength value length
860    *
861    * @return The number of useful bytes in the buffer.
862    *
863    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
864    * remaining in the buffer
865    */
866   public static int writeByteArray(byte [] buffer, final int boffset,
867       final byte [] row, final int roffset, final int rlength,
868       final byte [] family, final int foffset, int flength,
869       final byte [] qualifier, final int qoffset, int qlength,
870       final long timestamp, final Type type,
871       final byte [] value, final int voffset, int vlength, Tag[] tags) {
872 
873     checkParameters(row, rlength, family, flength, qlength, vlength);
874 
875     // Calculate length of tags area
876     int tagsLength = 0;
877     if (tags != null && tags.length > 0) {
878       for (Tag t: tags) {
879         tagsLength += t.getLength();
880       }
881     }
882     checkForTagsLength(tagsLength);
883     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
884     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
885         tagsLength);
886     if (keyValueLength > buffer.length - boffset) {
887       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
888           keyValueLength);
889     }
890 
891     // Write key, value and key row length.
892     int pos = boffset;
893     pos = Bytes.putInt(buffer, pos, keyLength);
894     pos = Bytes.putInt(buffer, pos, vlength);
895     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
896     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
897     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
898     if (flength != 0) {
899       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
900     }
901     if (qlength != 0) {
902       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
903     }
904     pos = Bytes.putLong(buffer, pos, timestamp);
905     pos = Bytes.putByte(buffer, pos, type.getCode());
906     if (value != null && value.length > 0) {
907       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
908     }
909     // Write the number of tags. If it is 0 then it means there are no tags.
910     if (tagsLength > 0) {
911       pos = Bytes.putShort(buffer, pos, (short) tagsLength);
912       for (Tag t : tags) {
913         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
914       }
915     }
916     return keyValueLength;
917   }
918 
919   private static void checkForTagsLength(int tagsLength) {
920     if (tagsLength > Short.MAX_VALUE) {
921       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + Short.MAX_VALUE);
922     }
923   }
924 
925   /**
926    * Write KeyValue format into a byte array.
927    * @param row row key
928    * @param roffset row offset
929    * @param rlength row length
930    * @param family family name
931    * @param foffset family offset
932    * @param flength family length
933    * @param qualifier column qualifier
934    * @param qoffset qualifier offset
935    * @param qlength qualifier length
936    * @param timestamp version timestamp
937    * @param type key type
938    * @param value column value
939    * @param voffset value offset
940    * @param vlength value length
941    * @return The newly created byte array.
942    */
943   private static byte [] createByteArray(final byte [] row, final int roffset,
944       final int rlength, final byte [] family, final int foffset, int flength,
945       final byte [] qualifier, final int qoffset, int qlength,
946       final long timestamp, final Type type,
947       final byte [] value, final int voffset, 
948       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
949 
950     checkParameters(row, rlength, family, flength, qlength, vlength);
951     checkForTagsLength(tagsLength);
952     // Allocate right-sized byte array.
953     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
954     byte [] bytes =
955         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
956     // Write key, value and key row length.
957     int pos = 0;
958     pos = Bytes.putInt(bytes, pos, keyLength);
959     pos = Bytes.putInt(bytes, pos, vlength);
960     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
961     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
962     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
963     if(flength != 0) {
964       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
965     }
966     if(qlength != 0) {
967       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
968     }
969     pos = Bytes.putLong(bytes, pos, timestamp);
970     pos = Bytes.putByte(bytes, pos, type.getCode());
971     if (value != null && value.length > 0) {
972       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
973     }
974     // Add the tags after the value part
975     if (tagsLength > 0) {
976       pos = Bytes.putShort(bytes, pos, (short) (tagsLength));
977       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
978     }
979     return bytes;
980   }
981 
982   /**
983    * @param qualifier can be a ByteBuffer or a byte[], or null.
984    * @param value can be a ByteBuffer or a byte[], or null.
985    */
986   private static byte [] createByteArray(final byte [] row, final int roffset,
987       final int rlength, final byte [] family, final int foffset, int flength,
988       final Object qualifier, final int qoffset, int qlength,
989       final long timestamp, final Type type,
990       final Object value, final int voffset, int vlength, List<Tag> tags) {
991 
992     checkParameters(row, rlength, family, flength, qlength, vlength);
993 
994     // Calculate length of tags area
995     int tagsLength = 0;
996     if (tags != null && !tags.isEmpty()) {
997       for (Tag t : tags) {
998         tagsLength += t.getLength();
999       }
1000     }
1001     checkForTagsLength(tagsLength);
1002     // Allocate right-sized byte array.
1003     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1004     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1005         tagsLength)];
1006 
1007     // Write key, value and key row length.
1008     int pos = 0;
1009     pos = Bytes.putInt(bytes, pos, keyLength);
1010 
1011     pos = Bytes.putInt(bytes, pos, vlength);
1012     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1013     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1014     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1015     if(flength != 0) {
1016       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1017     }
1018     if (qlength > 0) {
1019       if (qualifier instanceof ByteBuffer) {
1020         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1021       } else {
1022         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1023       }
1024     }
1025     pos = Bytes.putLong(bytes, pos, timestamp);
1026     pos = Bytes.putByte(bytes, pos, type.getCode());
1027     if (vlength > 0) {
1028       if (value instanceof ByteBuffer) {
1029         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1030       } else {
1031         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1032       }
1033     }
1034     // Add the tags after the value part
1035     if (tagsLength > 0) {
1036       pos = Bytes.putShort(bytes, pos, (short) (tagsLength));
1037       for (Tag t : tags) {
1038         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1039       }
1040     }
1041     return bytes;
1042   }
1043 
1044   /**
1045    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1046    */
1047   @Override
1048   public boolean equals(Object other) {
1049     if (!(other instanceof Cell)) {
1050       return false;
1051     }
1052     return CellComparator.equals(this, (Cell)other);
1053   }
1054 
1055   @Override
1056   public int hashCode() {
1057     byte[] b = getBuffer();
1058     int start = getOffset(), end = getOffset() + getLength();
1059     int h = b[start++];
1060     for (int i = start; i < end; i++) {
1061       h = (h * 13) ^ b[i];
1062     }
1063     return h;
1064   }
1065 
1066   //---------------------------------------------------------------------------
1067   //
1068   //  KeyValue cloning
1069   //
1070   //---------------------------------------------------------------------------
1071 
1072   /**
1073    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1074    * @return Fully copied clone of this KeyValue
1075    * @throws CloneNotSupportedException
1076    */
1077   @Override
1078   public KeyValue clone() throws CloneNotSupportedException {
1079     super.clone();
1080     byte [] b = new byte[this.length];
1081     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1082     KeyValue ret = new KeyValue(b, 0, b.length);
1083     // Important to clone the memstoreTS as well - otherwise memstore's
1084     // update-in-place methods (eg increment) will end up creating
1085     // new entries
1086     ret.setMvccVersion(mvcc);
1087     return ret;
1088   }
1089 
1090   /**
1091    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1092    * http://en.wikipedia.org/wiki/Object_copy
1093    * @return Shallow copy of this KeyValue
1094    */
1095   public KeyValue shallowCopy() {
1096     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1097     shallowCopy.setMvccVersion(this.mvcc);
1098     return shallowCopy;
1099   }
1100 
1101   //---------------------------------------------------------------------------
1102   //
1103   //  String representation
1104   //
1105   //---------------------------------------------------------------------------
1106 
1107   public String toString() {
1108     if (this.bytes == null || this.bytes.length == 0) {
1109       return "empty";
1110     }
1111     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
1112       "/vlen=" + getValueLength() + "/mvcc=" + mvcc;
1113   }
1114 
1115   /**
1116    * @param k Key portion of a KeyValue.
1117    * @return Key as a String, empty string if k is null. 
1118    */
1119   public static String keyToString(final byte [] k) {
1120     if (k == null) { 
1121       return "";
1122     }
1123     return keyToString(k, 0, k.length);
1124   }
1125 
1126   /**
1127    * Produces a string map for this key/value pair. Useful for programmatic use
1128    * and manipulation of the data stored in an HLogKey, for example, printing
1129    * as JSON. Values are left out due to their tendency to be large. If needed,
1130    * they can be added manually.
1131    *
1132    * @return the Map<String,?> containing data from this key
1133    */
1134   public Map<String, Object> toStringMap() {
1135     Map<String, Object> stringMap = new HashMap<String, Object>();
1136     stringMap.put("row", Bytes.toStringBinary(getRow()));
1137     stringMap.put("family", Bytes.toStringBinary(getFamily()));
1138     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
1139     stringMap.put("timestamp", getTimestamp());
1140     stringMap.put("vlen", getValueLength());
1141     List<Tag> tags = getTags();
1142     if (tags != null) {
1143       List<String> tagsString = new ArrayList<String>();
1144       for (Tag t : tags) {
1145         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1146       }
1147       stringMap.put("tag", tagsString);
1148     }
1149     return stringMap;
1150   }
1151 
1152   /**
1153    * Use for logging.
1154    * @param b Key portion of a KeyValue.
1155    * @param o Offset to start of key
1156    * @param l Length of key.
1157    * @return Key as a String.
1158    */
1159   public static String keyToString(final byte [] b, final int o, final int l) {
1160     if (b == null) return "";
1161     int rowlength = Bytes.toShort(b, o);
1162     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1163     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1164     int familylength = b[columnoffset - 1];
1165     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1166     String family = familylength == 0? "":
1167       Bytes.toStringBinary(b, columnoffset, familylength);
1168     String qualifier = columnlength == 0? "":
1169       Bytes.toStringBinary(b, columnoffset + familylength,
1170       columnlength - familylength);
1171     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1172     String timestampStr = humanReadableTimestamp(timestamp);
1173     byte type = b[o + l - 1];
1174     return row + "/" + family +
1175       (family != null && family.length() > 0? ":" :"") +
1176       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1177   }
1178 
1179   public static String humanReadableTimestamp(final long timestamp) {
1180     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1181       return "LATEST_TIMESTAMP";
1182     }
1183     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1184       return "OLDEST_TIMESTAMP";
1185     }
1186     return String.valueOf(timestamp);
1187   }
1188 
1189   //---------------------------------------------------------------------------
1190   //
1191   //  Public Member Accessors
1192   //
1193   //---------------------------------------------------------------------------
1194 
1195   /**
1196    * @return The byte array backing this KeyValue.
1197    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1198    */
1199   @Deprecated
1200   public byte [] getBuffer() {
1201     return this.bytes;
1202   }
1203 
1204   /**
1205    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1206    */
1207   public int getOffset() {
1208     return this.offset;
1209   }
1210 
1211   /**
1212    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1213    */
1214   public int getLength() {
1215     return length;
1216   }
1217 
1218   //---------------------------------------------------------------------------
1219   //
1220   //  Length and Offset Calculators
1221   //
1222   //---------------------------------------------------------------------------
1223 
1224   /**
1225    * Determines the total length of the KeyValue stored in the specified
1226    * byte array and offset.  Includes all headers.
1227    * @param bytes byte array
1228    * @param offset offset to start of the KeyValue
1229    * @return length of entire KeyValue, in bytes
1230    */
1231   private static int getLength(byte [] bytes, int offset) {
1232     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1233     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1234     return klength + vlength;
1235   }
1236 
1237   /**
1238    * @return Key offset in backing buffer..
1239    */
1240   public int getKeyOffset() {
1241     return this.offset + ROW_OFFSET;
1242   }
1243 
1244   public String getKeyString() {
1245     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1246   }
1247 
1248   /**
1249    * @return Length of key portion.
1250    */
1251   public int getKeyLength() {
1252     return Bytes.toInt(this.bytes, this.offset);
1253   }
1254 
1255   /**
1256    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1257    */
1258   @Override
1259   public byte[] getValueArray() {
1260     return bytes;
1261   }
1262 
1263   /**
1264    * @return the value offset
1265    */
1266   @Override
1267   public int getValueOffset() {
1268     int voffset = getKeyOffset() + getKeyLength();
1269     return voffset;
1270   }
1271 
1272   /**
1273    * @return Value length
1274    */
1275   @Override
1276   public int getValueLength() {
1277     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1278     return vlength;
1279   }
1280 
1281   /**
1282    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1283    */
1284   @Override
1285   public byte[] getRowArray() {
1286     return bytes;
1287   }
1288 
1289   /**
1290    * @return Row offset
1291    */
1292   @Override
1293   public int getRowOffset() {
1294     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1295   }
1296 
1297   /**
1298    * @return Row length
1299    */
1300   @Override
1301   public short getRowLength() {
1302     return Bytes.toShort(this.bytes, getKeyOffset());
1303   }
1304 
1305   /**
1306    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1307    */
1308   @Override
1309   public byte[] getFamilyArray() {
1310     return bytes;
1311   }
1312 
1313   /**
1314    * @return Family offset
1315    */
1316   @Override
1317   public int getFamilyOffset() {
1318     return getFamilyOffset(getRowLength());
1319   }
1320 
1321   /**
1322    * @return Family offset
1323    */
1324   private int getFamilyOffset(int rlength) {
1325     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1326   }
1327 
1328   /**
1329    * @return Family length
1330    */
1331   @Override
1332   public byte getFamilyLength() {
1333     return getFamilyLength(getFamilyOffset());
1334   }
1335 
1336   /**
1337    * @return Family length
1338    */
1339   public byte getFamilyLength(int foffset) {
1340     return this.bytes[foffset-1];
1341   }
1342 
1343   /**
1344    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1345    */
1346   @Override
1347   public byte[] getQualifierArray() {
1348     return bytes;
1349   }
1350 
1351   /**
1352    * @return Qualifier offset
1353    */
1354   @Override
1355   public int getQualifierOffset() {
1356     return getQualifierOffset(getFamilyOffset());
1357   }
1358 
1359   /**
1360    * @return Qualifier offset
1361    */
1362   private int getQualifierOffset(int foffset) {
1363     return foffset + getFamilyLength(foffset);
1364   }
1365 
1366   /**
1367    * @return Qualifier length
1368    */
1369   @Override
1370   public int getQualifierLength() {
1371     return getQualifierLength(getRowLength(),getFamilyLength());
1372   }
1373 
1374   /**
1375    * @return Qualifier length
1376    */
1377   private int getQualifierLength(int rlength, int flength) {
1378     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1379   }
1380 
1381   /**
1382    * @return Column (family + qualifier) length
1383    */
1384   private int getTotalColumnLength(int rlength, int foffset) {
1385     int flength = getFamilyLength(foffset);
1386     int qlength = getQualifierLength(rlength,flength);
1387     return flength + qlength;
1388   }
1389 
1390   /**
1391    * @return Timestamp offset
1392    */
1393   public int getTimestampOffset() {
1394     return getTimestampOffset(getKeyLength());
1395   }
1396 
1397   /**
1398    * @param keylength Pass if you have it to save on a int creation.
1399    * @return Timestamp offset
1400    */
1401   private int getTimestampOffset(final int keylength) {
1402     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1403   }
1404 
1405   /**
1406    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1407    */
1408   public boolean isLatestTimestamp() {
1409     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1410       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1411   }
1412 
1413   /**
1414    * @param now Time to set into <code>this</code> IFF timestamp ==
1415    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1416    * @return True is we modified this.
1417    */
1418   public boolean updateLatestStamp(final byte [] now) {
1419     if (this.isLatestTimestamp()) {
1420       int tsOffset = getTimestampOffset();
1421       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1422       // clear cache or else getTimestamp() possibly returns an old value
1423       return true;
1424     }
1425     return false;
1426   }
1427 
1428   //---------------------------------------------------------------------------
1429   //
1430   //  Methods that return copies of fields
1431   //
1432   //---------------------------------------------------------------------------
1433 
1434   /**
1435    * Do not use unless you have to.  Used internally for compacting and testing.
1436    *
1437    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1438    * {@link #getValue()} if accessing a KeyValue client-side.
1439    * @return Copy of the key portion only.
1440    */
1441   public byte [] getKey() {
1442     int keylength = getKeyLength();
1443     byte [] key = new byte[keylength];
1444     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1445     return key;
1446   }
1447 
1448   /**
1449    * Returns value in a new byte array.
1450    * Primarily for use client-side. If server-side, use
1451    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1452    * save on allocations.
1453    * @return Value in a new byte array.
1454    */
1455   @Deprecated // use CellUtil.getValueArray()
1456   public byte [] getValue() {
1457     return CellUtil.cloneValue(this);
1458   }
1459 
1460   /**
1461    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1462    * byte array.<p>
1463    *
1464    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1465    * lengths instead.
1466    * @return Row in a new byte array.
1467    */
1468   @Deprecated // use CellUtil.getRowArray()
1469   public byte [] getRow() {
1470     return CellUtil.cloneRow(this);
1471   }
1472 
1473   /**
1474    *
1475    * @return Timestamp
1476    */
1477   @Override
1478   public long getTimestamp() {
1479     return getTimestamp(getKeyLength());
1480   }
1481 
1482   /**
1483    * @param keylength Pass if you have it to save on a int creation.
1484    * @return Timestamp
1485    */
1486   long getTimestamp(final int keylength) {
1487     int tsOffset = getTimestampOffset(keylength);
1488     return Bytes.toLong(this.bytes, tsOffset);
1489   }
1490 
1491   /**
1492    * @return Type of this KeyValue.
1493    */
1494   @Deprecated
1495   public byte getType() {
1496     return getTypeByte();
1497   }
1498 
1499   /**
1500    * @return KeyValue.TYPE byte representation
1501    */
1502   @Override
1503   public byte getTypeByte() {
1504     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1505   }
1506 
1507   /**
1508    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1509    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1510    * KeyValue type.
1511    */
1512   @Deprecated // use CellUtil#isDelete
1513   public boolean isDelete() {
1514     return KeyValue.isDelete(getType());
1515   }
1516 
1517   /**
1518    * @return True if this KV is a {@link KeyValue.Type#Delete} type.
1519    */
1520   public boolean isDeleteType() {
1521     // TODO: Fix this method name vis-a-vis isDelete!
1522     return getTypeByte() == Type.Delete.getCode();
1523   }
1524 
1525   /**
1526    * @return True if this KV is a delete family type.
1527    */
1528   public boolean isDeleteFamily() {
1529     return getTypeByte() == Type.DeleteFamily.getCode();
1530   }
1531 
1532   /**
1533    * @return True if this KV is a delete family-version type.
1534    */
1535   public boolean isDeleteFamilyVersion() {
1536     return getTypeByte() == Type.DeleteFamilyVersion.getCode();
1537   }
1538 
1539   /**
1540    *
1541    * @return True if this KV is a delete family or column type.
1542    */
1543   public boolean isDeleteColumnOrFamily() {
1544     int t = getTypeByte();
1545     return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
1546   }
1547 
1548   /**
1549    * Primarily for use client-side.  Returns the family of this KeyValue in a
1550    * new byte array.<p>
1551    *
1552    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1553    * lengths instead.
1554    * @return Returns family. Makes a copy.
1555    */
1556   @Deprecated // use CellUtil.getFamilyArray
1557   public byte [] getFamily() {
1558     return CellUtil.cloneFamily(this);
1559   }
1560 
1561   /**
1562    * Primarily for use client-side.  Returns the column qualifier of this
1563    * KeyValue in a new byte array.<p>
1564    *
1565    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1566    * lengths instead.
1567    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1568    * @return Returns qualifier. Makes a copy.
1569    */
1570   @Deprecated // use CellUtil.getQualifierArray
1571   public byte [] getQualifier() {
1572     return CellUtil.cloneQualifier(this);
1573   }
1574 
1575   /**
1576    * This returns the offset where the tag actually starts.
1577    */
1578   @Override
1579   public int getTagsOffset() {
1580     short tagsLen = getTagsLength();
1581     if (tagsLen == 0) {
1582       return this.offset + this.length;
1583     }
1584     return this.offset + this.length - tagsLen;
1585   }
1586 
1587   /**
1588    * This returns the total length of the tag bytes
1589    */
1590   @Override
1591   public short getTagsLength() {
1592     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1593     if (tagsLen > 0) {
1594       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1595       // length
1596       tagsLen -= TAGS_LENGTH_SIZE;
1597     }
1598     return (short) tagsLen;
1599   }
1600 
1601   /**
1602    * Returns any tags embedded in the KeyValue.  Used in testcases.
1603    * @return The tags
1604    */
1605   public List<Tag> getTags() {
1606     short tagsLength = getTagsLength();
1607     if (tagsLength == 0) {
1608       return EMPTY_ARRAY_LIST;
1609     }
1610     return Tag.asList(getTagsArray(), getTagsOffset(), tagsLength);
1611   }
1612 
1613   /**
1614    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1615    */
1616   @Override
1617   public byte[] getTagsArray() {
1618     return bytes;
1619   }
1620 
1621   /**
1622    * Creates a new KeyValue that only contains the key portion (the value is
1623    * set to be null).
1624    *
1625    * TODO only used by KeyOnlyFilter -- move there.
1626    * @param lenAsVal replace value with the actual value length (false=empty)
1627    */
1628   public KeyValue createKeyOnly(boolean lenAsVal) {
1629     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1630     // Rebuild as: <keylen:4><0:4><key:keylen>
1631     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1632     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1633     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1634         Math.min(newBuffer.length,this.length));
1635     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1636     if (lenAsVal) {
1637       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1638     }
1639     return new KeyValue(newBuffer);
1640   }
1641 
1642   /**
1643    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1644    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1645    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1646    * <p>
1647    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1648    * </p>
1649    * <p>
1650    * Not recommend to be used as this is old-style API.
1651    * </p>
1652    * @param c The column.
1653    * @return The parsed column.
1654    */
1655   public static byte [][] parseColumn(byte [] c) {
1656     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1657     if (index == -1) {
1658       // If no delimiter, return array of size 1
1659       return new byte [][] { c };
1660     } else if(index == c.length - 1) {
1661       // family with empty qualifier, return array size 2
1662       byte [] family = new byte[c.length-1];
1663       System.arraycopy(c, 0, family, 0, family.length);
1664       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1665     }
1666     // Family and column, return array size 2
1667     final byte [][] result = new byte [2][];
1668     result[0] = new byte [index];
1669     System.arraycopy(c, 0, result[0], 0, index);
1670     final int len = c.length - (index + 1);
1671     result[1] = new byte[len];
1672     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1673     return result;
1674   }
1675 
1676   /**
1677    * Makes a column in family:qualifier form from separate byte arrays.
1678    * <p>
1679    * Not recommended for usage as this is old-style API.
1680    * @param family
1681    * @param qualifier
1682    * @return family:qualifier
1683    */
1684   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1685     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1686   }
1687 
1688   /**
1689    * This function is only used in Meta key comparisons so its error message
1690    * is specific for meta key errors.
1691    */
1692   static int getRequiredDelimiterInReverse(final byte [] b,
1693       final int offset, final int length, final int delimiter) {
1694     int index = getDelimiterInReverse(b, offset, length, delimiter);
1695     if (index < 0) {
1696       throw new IllegalArgumentException("hbase:meta key must have two '" + (char)delimiter + "' "
1697         + "delimiters and have the following format: '<table>,<key>,<etc>'");
1698     }
1699     return index;
1700   }
1701 
1702   /**
1703    * @param b
1704    * @param delimiter
1705    * @return Index of delimiter having started from start of <code>b</code>
1706    * moving rightward.
1707    */
1708   public static int getDelimiter(final byte [] b, int offset, final int length,
1709       final int delimiter) {
1710     if (b == null) {
1711       throw new IllegalArgumentException("Passed buffer is null");
1712     }
1713     int result = -1;
1714     for (int i = offset; i < length + offset; i++) {
1715       if (b[i] == delimiter) {
1716         result = i;
1717         break;
1718       }
1719     }
1720     return result;
1721   }
1722 
1723   /**
1724    * Find index of passed delimiter walking from end of buffer backwards.
1725    * @param b
1726    * @param delimiter
1727    * @return Index of delimiter
1728    */
1729   public static int getDelimiterInReverse(final byte [] b, final int offset,
1730       final int length, final int delimiter) {
1731     if (b == null) {
1732       throw new IllegalArgumentException("Passed buffer is null");
1733     }
1734     int result = -1;
1735     for (int i = (offset + length) - 1; i >= offset; i--) {
1736       if (b[i] == delimiter) {
1737         result = i;
1738         break;
1739       }
1740     }
1741     return result;
1742   }
1743 
1744   /**
1745    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1746    * {@link KeyValue}s.
1747    */
1748   public static class MetaComparator extends KVComparator {
1749     /**
1750      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1751      * table.
1752      */
1753     @Override
1754     public int compare(final Cell left, final Cell right) {
1755       int c = compareRowKey(left, right);
1756       if (c != 0) {
1757         return c;
1758       }
1759       return CellComparator.compareWithoutRow(left, right);
1760     }
1761 
1762     @Override
1763     public int compareOnlyKeyPortion(Cell left, Cell right) {
1764       return compare(left, right);
1765     }
1766 
1767     @Override
1768     public int compareRows(byte [] left, int loffset, int llength,
1769         byte [] right, int roffset, int rlength) {
1770       int leftDelimiter = getDelimiter(left, loffset, llength,
1771           HConstants.DELIMITER);
1772       int rightDelimiter = getDelimiter(right, roffset, rlength,
1773           HConstants.DELIMITER);
1774       if (leftDelimiter < 0 && rightDelimiter >= 0) {
1775         // Nothing between hbase:meta and regionid.  Its first key.
1776         return -1;
1777       } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1778         return 1;
1779       } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1780         return 0;
1781       }
1782       // Compare up to the delimiter
1783       int result = Bytes.compareTo(left, loffset, leftDelimiter - loffset,
1784           right, roffset, rightDelimiter - roffset);
1785       if (result != 0) {
1786         return result;
1787       }
1788       // Compare middle bit of the row.
1789       // Move past delimiter
1790       leftDelimiter++;
1791       rightDelimiter++;
1792       int leftFarDelimiter = getRequiredDelimiterInReverse(left, leftDelimiter,
1793           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1794       int rightFarDelimiter = getRequiredDelimiterInReverse(right,
1795           rightDelimiter, rlength - (rightDelimiter - roffset),
1796           HConstants.DELIMITER);
1797       // Now compare middlesection of row.
1798       result = super.compareRows(left, leftDelimiter,
1799           leftFarDelimiter - leftDelimiter, right, rightDelimiter,
1800           rightFarDelimiter - rightDelimiter);
1801       if (result != 0) {
1802         return result;
1803       }
1804       // Compare last part of row, the rowid.
1805       leftFarDelimiter++;
1806       rightFarDelimiter++;
1807       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1808           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1809       return result;
1810     }
1811 
1812     /**
1813      * Don't do any fancy Block Index splitting tricks.
1814      */
1815     @Override
1816     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1817       return Arrays.copyOf(rightKey, rightKey.length);
1818     }
1819 
1820     /**
1821      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1822      * instantiate the appropriate comparator.
1823      * TODO: With V3 consider removing this.
1824      * @return legacy class name for FileFileTrailer#comparatorClassName
1825      */
1826     @Override
1827     public String getLegacyKeyComparatorName() {
1828       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1829     }
1830 
1831     @Override
1832     protected Object clone() throws CloneNotSupportedException {
1833       return new MetaComparator();
1834     }
1835 
1836     /**
1837      * Override the row key comparison to parse and compare the meta row key parts.
1838      */
1839     @Override
1840     protected int compareRowKey(final Cell l, final Cell r) {
1841       byte[] left = l.getRowArray();
1842       int loffset = l.getRowOffset();
1843       int llength = l.getRowLength();
1844       byte[] right = r.getRowArray();
1845       int roffset = r.getRowOffset();
1846       int rlength = r.getRowLength();
1847       return compareRows(left, loffset, llength, right, roffset, rlength);
1848     }
1849   }
1850 
1851   /**
1852    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1853    * portion.  This means two KeyValues with same Key but different Values are
1854    * considered the same as far as this Comparator is concerned.
1855    */
1856   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1857 
1858     /**
1859      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1860      * instantiate the appropriate comparator.
1861      * TODO: With V3 consider removing this.
1862      * @return legacy class name for FileFileTrailer#comparatorClassName
1863      */
1864     public String getLegacyKeyComparatorName() {
1865       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1866     }
1867 
1868     @Override // RawComparator
1869     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1870       return compareFlatKey(l,loff,llen, r,roff,rlen);
1871     }
1872 
1873     
1874     /**
1875      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1876      * @param left
1877      * @param right
1878      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1879      */
1880     protected int compareRowKey(final Cell left, final Cell right) {
1881       return CellComparator.compareRows(left, right);
1882     }
1883 
1884     /**
1885      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1886      * full KVs laid out in a flat byte[]s.
1887      * @param left
1888      * @param loffset
1889      * @param llength
1890      * @param right
1891      * @param roffset
1892      * @param rlength
1893      * @return  0 if equal, <0 if left smaller, >0 if right smaller
1894      */
1895     public int compareFlatKey(byte[] left, int loffset, int llength,
1896         byte[] right, int roffset, int rlength) {
1897       // Compare row
1898       short lrowlength = Bytes.toShort(left, loffset);
1899       short rrowlength = Bytes.toShort(right, roffset);
1900       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1901           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1902       if (compare != 0) {
1903         return compare;
1904       }
1905 
1906       // Compare the rest of the two KVs without making any assumptions about
1907       // the common prefix. This function will not compare rows anyway, so we
1908       // don't need to tell it that the common prefix includes the row.
1909       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1910           rlength, rrowlength);
1911     }
1912 
1913     public int compareFlatKey(byte[] left, byte[] right) {
1914       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1915     }
1916 
1917     public int compareOnlyKeyPortion(Cell left, Cell right) {
1918       return CellComparator.compareStatic(left, right, true);
1919     }
1920 
1921     /**
1922      * Compares the Key of a cell -- with fields being more significant in this order:
1923      * rowkey, colfam/qual, timestamp, type, mvcc
1924      */
1925     @Override
1926     public int compare(final Cell left, final Cell right) {
1927       int compare = CellComparator.compareStatic(left, right, false);
1928       return compare;
1929     }
1930 
1931     public int compareTimestamps(final Cell left, final Cell right) {
1932       return CellComparator.compareTimestamps(left, right);
1933     }
1934 
1935     /**
1936      * @param left
1937      * @param right
1938      * @return Result comparing rows.
1939      */
1940     public int compareRows(final Cell left, final Cell right) {
1941       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
1942       right.getRowArray(), right.getRowOffset(), right.getRowLength());
1943     }
1944 
1945     /**
1946      * Get the b[],o,l for left and right rowkey portions and compare.
1947      * @param left
1948      * @param loffset
1949      * @param llength
1950      * @param right
1951      * @param roffset
1952      * @param rlength
1953      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1954      */
1955     public int compareRows(byte [] left, int loffset, int llength,
1956         byte [] right, int roffset, int rlength) {
1957       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
1958     }
1959 
1960     int compareColumns(final Cell left, final short lrowlength, final Cell right,
1961         final short rrowlength) {
1962       return CellComparator.compareColumns(left, right);
1963     }
1964 
1965     protected int compareColumns(
1966         byte [] left, int loffset, int llength, final int lfamilylength,
1967         byte [] right, int roffset, int rlength, final int rfamilylength) {
1968       // Compare family portion first.
1969       int diff = Bytes.compareTo(left, loffset, lfamilylength,
1970         right, roffset, rfamilylength);
1971       if (diff != 0) {
1972         return diff;
1973       }
1974       // Compare qualifier portion
1975       return Bytes.compareTo(left, loffset + lfamilylength,
1976         llength - lfamilylength,
1977         right, roffset + rfamilylength, rlength - rfamilylength);
1978       }
1979 
1980     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
1981       // The below older timestamps sorting ahead of newer timestamps looks
1982       // wrong but it is intentional. This way, newer timestamps are first
1983       // found when we iterate over a memstore and newer versions are the
1984       // first we trip over when reading from a store file.
1985       if (ltimestamp < rtimestamp) {
1986         return 1;
1987       } else if (ltimestamp > rtimestamp) {
1988         return -1;
1989       }
1990       return 0;
1991     }
1992 
1993     /**
1994      * Overridden
1995      * @param commonPrefix
1996      * @param left
1997      * @param loffset
1998      * @param llength
1999      * @param right
2000      * @param roffset
2001      * @param rlength
2002      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2003      */
2004     @Override // SamePrefixComparator
2005     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2006         int loffset, int llength, byte[] right, int roffset, int rlength) {
2007       // Compare row
2008       short lrowlength = Bytes.toShort(left, loffset);
2009       short rrowlength;
2010 
2011       int comparisonResult = 0;
2012       if (commonPrefix < ROW_LENGTH_SIZE) {
2013         // almost nothing in common
2014         rrowlength = Bytes.toShort(right, roffset);
2015         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2016             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2017       } else { // the row length is the same
2018         rrowlength = lrowlength;
2019         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2020           // The rows are not the same. Exclude the common prefix and compare
2021           // the rest of the two rows.
2022           int common = commonPrefix - ROW_LENGTH_SIZE;
2023           comparisonResult = compareRows(
2024               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2025               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2026         }
2027       }
2028       if (comparisonResult != 0) {
2029         return comparisonResult;
2030       }
2031 
2032       assert lrowlength == rrowlength;
2033       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2034           roffset, rlength, lrowlength);
2035     }
2036 
2037     /**
2038      * Compare columnFamily, qualifier, timestamp, and key type (everything
2039      * except the row). This method is used both in the normal comparator and
2040      * the "same-prefix" comparator. Note that we are assuming that row portions
2041      * of both KVs have already been parsed and found identical, and we don't
2042      * validate that assumption here.
2043      * @param commonPrefix
2044      *          the length of the common prefix of the two key-values being
2045      *          compared, including row length and row
2046      */
2047     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2048         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2049       /***
2050        * KeyValue Format and commonLength:
2051        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2052        * ------------------|-------commonLength--------|--------------
2053        */
2054       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2055 
2056       // commonLength + TIMESTAMP_TYPE_SIZE
2057       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2058       // ColumnFamily + Qualifier length.
2059       int lcolumnlength = llength - commonLengthWithTSAndType;
2060       int rcolumnlength = rlength - commonLengthWithTSAndType;
2061 
2062       byte ltype = left[loffset + (llength - 1)];
2063       byte rtype = right[roffset + (rlength - 1)];
2064 
2065       // If the column is not specified, the "minimum" key type appears the
2066       // latest in the sorted order, regardless of the timestamp. This is used
2067       // for specifying the last key/value in a given row, because there is no
2068       // "lexicographically last column" (it would be infinitely long). The
2069       // "maximum" key type does not need this behavior.
2070       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2071         // left is "bigger", i.e. it appears later in the sorted order
2072         return 1;
2073       }
2074       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2075         return -1;
2076       }
2077 
2078       int lfamilyoffset = commonLength + loffset;
2079       int rfamilyoffset = commonLength + roffset;
2080 
2081       // Column family length.
2082       int lfamilylength = left[lfamilyoffset - 1];
2083       int rfamilylength = right[rfamilyoffset - 1];
2084       // If left family size is not equal to right family size, we need not
2085       // compare the qualifiers.
2086       boolean sameFamilySize = (lfamilylength == rfamilylength);
2087       int common = 0;
2088       if (commonPrefix > 0) {
2089         common = Math.max(0, commonPrefix - commonLength);
2090         if (!sameFamilySize) {
2091           // Common should not be larger than Math.min(lfamilylength,
2092           // rfamilylength).
2093           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2094         } else {
2095           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2096         }
2097       }
2098       if (!sameFamilySize) {
2099         // comparing column family is enough.
2100         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2101             - common, right, rfamilyoffset + common, rfamilylength - common);
2102       }
2103       // Compare family & qualifier together.
2104       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2105           lcolumnlength - common, right, rfamilyoffset + common,
2106           rcolumnlength - common);
2107       if (comparison != 0) {
2108         return comparison;
2109       }
2110 
2111       ////
2112       // Next compare timestamps.
2113       long ltimestamp = Bytes.toLong(left,
2114           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2115       long rtimestamp = Bytes.toLong(right,
2116           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2117       int compare = compareTimestamps(ltimestamp, rtimestamp);
2118       if (compare != 0) {
2119         return compare;
2120       }
2121 
2122       // Compare types. Let the delete types sort ahead of puts; i.e. types
2123       // of higher numbers sort before those of lesser numbers. Maximum (255)
2124       // appears ahead of everything, and minimum (0) appears after
2125       // everything.
2126       return (0xff & rtype) - (0xff & ltype);
2127     }
2128 
2129     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2130         final byte[] right, final int roffset, final int rfamilylength) {
2131       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2132       return diff;
2133     }
2134 
2135     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2136         final byte[] right, final int roffset, final int rquallength) {
2137       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2138       return diff;
2139     }
2140     /**
2141      * Compares the row and column of two keyvalues for equality
2142      * @param left
2143      * @param right
2144      * @return True if same row and column.
2145      */
2146     public boolean matchingRowColumn(final Cell left,
2147         final Cell right) {
2148       short lrowlength = left.getRowLength();
2149       short rrowlength = right.getRowLength();
2150 
2151       // TsOffset = end of column data. just comparing Row+CF length of each
2152       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2153           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2154         return false;
2155       }
2156 
2157       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2158         return false;
2159       }
2160 
2161       int lfoffset = left.getFamilyOffset();
2162       int rfoffset = right.getFamilyOffset();
2163       int lclength = left.getQualifierLength();
2164       int rclength = right.getQualifierLength();
2165       int lfamilylength = left.getFamilyLength();
2166       int rfamilylength = right.getFamilyLength();
2167       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2168           right.getFamilyArray(), rfoffset, rfamilylength);
2169       if (diff != 0) {
2170         return false;
2171       } else {
2172         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2173             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2174         return diff == 0;
2175       }
2176     }
2177 
2178     /**
2179      * Compares the row of two keyvalues for equality
2180      * @param left
2181      * @param right
2182      * @return True if rows match.
2183      */
2184     public boolean matchingRows(final Cell left, final Cell right) {
2185       short lrowlength = left.getRowLength();
2186       short rrowlength = right.getRowLength();
2187       return matchingRows(left, lrowlength, right, rrowlength);
2188     }
2189 
2190     /**
2191      * @param left
2192      * @param lrowlength
2193      * @param right
2194      * @param rrowlength
2195      * @return True if rows match.
2196      */
2197     private boolean matchingRows(final Cell left, final short lrowlength,
2198         final Cell right, final short rrowlength) {
2199       return lrowlength == rrowlength &&
2200           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2201               right.getRowArray(), right.getRowOffset(), rrowlength);
2202     }
2203 
2204     /**
2205      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2206      * @param left Left row array.
2207      * @param loffset Left row offset.
2208      * @param llength Left row length.
2209      * @param right Right row array.
2210      * @param roffset Right row offset.
2211      * @param rlength Right row length.
2212      * @return Whether rows are the same row.
2213      */
2214     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2215         final byte [] right, final int roffset, final int rlength) {
2216       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2217     }
2218 
2219     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2220       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2221       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2222         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2223             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2224             + Bytes.toStringBinary(firstKeyInBlock));
2225         return firstKeyInBlock;
2226       }
2227       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2228         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2229             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2230             Bytes.toStringBinary(fakeKey));
2231         return firstKeyInBlock;
2232       }
2233       return fakeKey;
2234     }
2235 
2236     /**
2237      * This is a HFile block index key optimization.
2238      * @param leftKey
2239      * @param rightKey
2240      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2241      */
2242     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2243       if (rightKey == null) {
2244         throw new IllegalArgumentException("rightKey can not be null");
2245       }
2246       if (leftKey == null) {
2247         return Arrays.copyOf(rightKey, rightKey.length);
2248       }
2249       if (compareFlatKey(leftKey, rightKey) >= 0) {
2250         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2251           + ", rightKey:" + Bytes.toString(rightKey));
2252       }
2253 
2254       short leftRowLength = Bytes.toShort(leftKey, 0);
2255       short rightRowLength = Bytes.toShort(rightKey, 0);
2256       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2257       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2258       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2259       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2260       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2261       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2262       // rows are equal
2263       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2264         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2265         // Compare family & qualifier together.
2266         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2267           rightCommonLength, rightColumnLength);
2268         // same with "row + family + qualifier", return rightKey directly
2269         if (comparison == 0) {
2270           return Arrays.copyOf(rightKey, rightKey.length);
2271         }
2272         // "family + qualifier" are different, generate a faked key per rightKey
2273         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2274         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2275         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2276         return newKey;
2277       }
2278       // rows are different
2279       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2280       short diffIdx = 0;
2281       while (diffIdx < minLength
2282           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2283         diffIdx++;
2284       }
2285       byte[] newRowKey = null;
2286       if (diffIdx >= minLength) {
2287         // leftKey's row is prefix of rightKey's.
2288         newRowKey = new byte[diffIdx + 1];
2289         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2290       } else {
2291         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2292         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2293             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2294           newRowKey = new byte[diffIdx + 1];
2295           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2296           newRowKey[diffIdx] = (byte) (diffByte + 1);
2297         } else {
2298           newRowKey = new byte[diffIdx + 1];
2299           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2300         }
2301       }
2302       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2303         Type.Maximum).getKey();
2304     }
2305 
2306     @Override
2307     protected Object clone() throws CloneNotSupportedException {
2308       return new KVComparator();
2309     }
2310 
2311   }
2312 
2313   /**
2314    * @param b
2315    * @return A KeyValue made of a byte array that holds the key-only part.
2316    * Needed to convert hfile index members to KeyValues.
2317    */
2318   public static KeyValue createKeyValueFromKey(final byte [] b) {
2319     return createKeyValueFromKey(b, 0, b.length);
2320   }
2321 
2322   /**
2323    * @param bb
2324    * @return A KeyValue made of a byte buffer that holds the key-only part.
2325    * Needed to convert hfile index members to KeyValues.
2326    */
2327   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2328     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2329   }
2330 
2331   /**
2332    * @param b
2333    * @param o
2334    * @param l
2335    * @return A KeyValue made of a byte array that holds the key-only part.
2336    * Needed to convert hfile index members to KeyValues.
2337    */
2338   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2339       final int l) {
2340     byte [] newb = new byte[l + ROW_OFFSET];
2341     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2342     Bytes.putInt(newb, 0, l);
2343     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2344     return new KeyValue(newb);
2345   }
2346 
2347   /**
2348    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2349    * backing bytes copied from the steam.
2350    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2351    * of zero, we will return null which can be useful marking a stream as done.
2352    * @throws IOException
2353    */
2354   public static KeyValue create(final DataInput in) throws IOException {
2355     return create(in.readInt(), in);
2356   }
2357 
2358   /**
2359    * Create a KeyValue reading <code>length</code> from <code>in</code>
2360    * @param length
2361    * @param in
2362    * @return Created KeyValue OR if we find a length of zero, we will return null which
2363    * can be useful marking a stream as done.
2364    * @throws IOException
2365    */
2366   public static KeyValue create(int length, final DataInput in) throws IOException {
2367 
2368     if (length <= 0) {
2369       if (length == 0) return null;
2370       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2371     }
2372 
2373     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2374     byte [] bytes = new byte[length];
2375     in.readFully(bytes);
2376     return new KeyValue(bytes, 0, length);
2377   }
2378   
2379   /**
2380    * Create a new KeyValue by copying existing cell and adding new tags
2381    * @param c
2382    * @param newTags
2383    * @return a new KeyValue instance with new tags
2384    */
2385   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2386     List<Tag> existingTags = null;
2387     if(c.getTagsLength() > 0) {
2388       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
2389       existingTags.addAll(newTags);
2390     } else {
2391       existingTags = newTags;
2392     }
2393     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2394       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
2395       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
2396       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
2397       c.getValueLength(), existingTags);
2398   }
2399 
2400   /**
2401    * Create a KeyValue reading from the raw InputStream.
2402    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2403    * @param in
2404    * @return Created KeyValue OR if we find a length of zero, we will return null which
2405    * can be useful marking a stream as done.
2406    * @throws IOException
2407    */
2408   public static KeyValue iscreate(final InputStream in) throws IOException {
2409     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2410     int bytesRead = 0;
2411     while (bytesRead < intBytes.length) {
2412       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2413       if (n < 0) {
2414         if (bytesRead == 0) return null; // EOF at start is ok
2415         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2416       }
2417       bytesRead += n;
2418     }
2419     // TODO: perhaps some sanity check is needed here.
2420     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2421     IOUtils.readFully(in, bytes, 0, bytes.length);
2422     return new KeyValue(bytes, 0, bytes.length);
2423   }
2424 
2425   /**
2426    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2427    * @param kv
2428    * @param out
2429    * @return Length written on stream
2430    * @throws IOException
2431    * @see #create(DataInput) for the inverse function
2432    */
2433   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2434     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2435     // work for all implementations.
2436     int length = kv.getLength();
2437     out.writeInt(length);
2438     out.write(kv.getBuffer(), kv.getOffset(), length);
2439     return length + Bytes.SIZEOF_INT;
2440   }
2441 
2442   /**
2443    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2444    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2445    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2446    * @param kv
2447    * @param out
2448    * @return Length written on stream
2449    * @throws IOException
2450    * @see #create(DataInput) for the inverse function
2451    * @see #write(KeyValue, DataOutput)
2452    */
2453   @Deprecated
2454   public static long oswrite(final KeyValue kv, final OutputStream out)
2455       throws IOException {
2456     int length = kv.getLength();
2457     // This does same as DataOuput#writeInt (big-endian, etc.)
2458     out.write(Bytes.toBytes(length));
2459     out.write(kv.getBuffer(), kv.getOffset(), length);
2460     return length + Bytes.SIZEOF_INT;
2461   }
2462 
2463   /**
2464    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2465    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2466    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2467    * @param kv
2468    * @param out
2469    * @param withTags
2470    * @return Length written on stream
2471    * @throws IOException
2472    * @see #create(DataInput) for the inverse function
2473    * @see #write(KeyValue, DataOutput)
2474    */
2475   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2476       throws IOException {
2477     int length = kv.getLength();
2478     if (!withTags) {
2479       length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2480     }
2481     // This does same as DataOuput#writeInt (big-endian, etc.)
2482     out.write(Bytes.toBytes(length));
2483     out.write(kv.getBuffer(), kv.getOffset(), length);
2484     return length + Bytes.SIZEOF_INT;
2485   }
2486 
2487   /**
2488    * Comparator that compares row component only of a KeyValue.
2489    */
2490   public static class RowOnlyComparator implements Comparator<KeyValue> {
2491     final KVComparator comparator;
2492 
2493     public RowOnlyComparator(final KVComparator c) {
2494       this.comparator = c;
2495     }
2496 
2497     public int compare(KeyValue left, KeyValue right) {
2498       return comparator.compareRows(left, right);
2499     }
2500   }
2501 
2502 
2503   /**
2504    * Avoids redundant comparisons for better performance.
2505    * 
2506    * TODO get rid of this wart
2507    */
2508   public interface SamePrefixComparator<T> {
2509     /**
2510      * Compare two keys assuming that the first n bytes are the same.
2511      * @param commonPrefix How many bytes are the same.
2512      */
2513     int compareIgnoringPrefix(
2514       int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
2515     );
2516   }
2517 
2518   /**
2519    * This is a TEST only Comparator used in TestSeekTo and TestReseekTo.
2520    */
2521   public static class RawBytesComparator extends KVComparator {
2522     /**
2523      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2524      * instantiate the appropriate comparator.
2525      * TODO: With V3 consider removing this.
2526      * @return legacy class name for FileFileTrailer#comparatorClassName
2527      */
2528     public String getLegacyKeyComparatorName() {
2529       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2530     }
2531 
2532     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2533         int roffset, int rlength) {
2534       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2535     }
2536 
2537     @Override
2538     public int compare(Cell left, Cell right) {
2539       return compareOnlyKeyPortion(left, right);
2540     }
2541 
2542     @VisibleForTesting
2543     public int compareOnlyKeyPortion(Cell left, Cell right) {
2544       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2545           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2546       if (c != 0) {
2547         return c;
2548       }
2549       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2550           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2551           right.getFamilyLength());
2552       if (c != 0) {
2553         return c;
2554       }
2555       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2556           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2557           right.getQualifierLength());
2558       if (c != 0) {
2559         return c;
2560       }
2561       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2562       if (c != 0) {
2563         return c;
2564       }
2565       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2566     }
2567 
2568     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2569       return firstKeyInBlock;
2570     }
2571 
2572   }
2573 
2574   /**
2575    * HeapSize implementation
2576    *
2577    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2578    * MemStore.
2579    */
2580   @Override
2581   public long heapSize() {
2582     int sum = 0;
2583     sum += ClassSize.OBJECT;// the KeyValue object itself
2584     sum += ClassSize.REFERENCE;// pointer to "bytes"
2585     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2586     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2587     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2588     sum += Bytes.SIZEOF_LONG;// memstoreTS
2589     return ClassSize.align(sum);
2590   }
2591 
2592   /**
2593    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2594    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2595    * In places like block index keys, we need to compare the key byte[] with a cell.
2596    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2597    */
2598   public static class KeyOnlyKeyValue extends KeyValue {
2599     private int length = 0;
2600     private int offset = 0;
2601     private byte[] b;
2602 
2603     public KeyOnlyKeyValue() {
2604 
2605     }
2606 
2607     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2608       this.b = b;
2609       this.length = length;
2610       this.offset = offset;
2611     }
2612 
2613     @Override
2614     public int getKeyOffset() {
2615       return this.offset;
2616     }
2617 
2618     /**
2619      * A setter that helps to avoid object creation every time and whenever
2620      * there is a need to create new KeyOnlyKeyValue.
2621      * @param key
2622      * @param offset
2623      * @param length
2624      */
2625     public void setKey(byte[] key, int offset, int length) {
2626       this.b = key;
2627       this.offset = offset;
2628       this.length = length;
2629     }
2630 
2631     @Override
2632     public byte[] getKey() {
2633       int keylength = getKeyLength();
2634       byte[] key = new byte[keylength];
2635       System.arraycopy(this.b, getKeyOffset(), key, 0, keylength);
2636       return key;
2637     }
2638 
2639     @Override
2640     public byte[] getRowArray() {
2641       return b;
2642     }
2643 
2644     @Override
2645     public int getRowOffset() {
2646       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2647     }
2648 
2649     @Override
2650     public byte[] getFamilyArray() {
2651       return b;
2652     }
2653 
2654     @Override
2655     public byte getFamilyLength() {
2656       return this.b[getFamilyOffset() - 1];
2657     }
2658 
2659     @Override
2660     public int getFamilyOffset() {
2661       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2662     }
2663 
2664     @Override
2665     public byte[] getQualifierArray() {
2666       return b;
2667     }
2668 
2669     @Override
2670     public int getQualifierLength() {
2671       return getQualifierLength(getRowLength(), getFamilyLength());
2672     }
2673 
2674     @Override
2675     public int getQualifierOffset() {
2676       return getFamilyOffset() + getFamilyLength();
2677     }
2678 
2679     @Override
2680     public int getKeyLength() {
2681       return length;
2682     }
2683 
2684     @Override
2685     public short getRowLength() {
2686       return Bytes.toShort(this.b, getKeyOffset());
2687     }
2688 
2689     @Override
2690     public byte getTypeByte() {
2691       return this.b[this.offset + getKeyLength() - 1];
2692     }
2693 
2694     private int getQualifierLength(int rlength, int flength) {
2695       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2696     }
2697 
2698     @Override
2699     public long getTimestamp() {
2700       int tsOffset = getTimestampOffset();
2701       return Bytes.toLong(this.b, tsOffset);
2702     }
2703 
2704     @Override
2705     public int getTimestampOffset() {
2706       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2707     }
2708 
2709     @Override
2710     public byte[] getTagsArray() {
2711       return HConstants.EMPTY_BYTE_ARRAY;
2712     }
2713 
2714     @Override
2715     public int getTagsOffset() {
2716       return (short) 0;
2717     }
2718 
2719     @Override
2720     public byte[] getValueArray() {
2721       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2722     }
2723 
2724     @Override
2725     public int getValueOffset() {
2726       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2727     }
2728 
2729     @Override
2730     public int getValueLength() {
2731       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2732     }
2733 
2734     @Override
2735     public short getTagsLength() {
2736       return (short) 0;
2737     }
2738 
2739     @Override
2740     public String toString() {
2741       if (this.b == null || this.b.length == 0) {
2742         return "empty";
2743       }
2744       return keyToString(this.b, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2745     }
2746 
2747     @Override
2748     public int hashCode() {
2749       return super.hashCode();
2750     }
2751 
2752     @Override
2753     public boolean equals(Object other) {
2754       return super.equals(other);
2755     }
2756   }
2757 }