View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.EOFException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.OutputStream;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.util.ByteBufferUtils;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.io.IOUtils;
46  import org.apache.hadoop.io.RawComparator;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
50  /**
51   * An HBase Key/Value. This is the fundamental HBase Type.
52   * <p>
53   * HBase applications and users should use the Cell interface and avoid directly using KeyValue and
54   * member functions not defined in Cell.
55   * <p>
56   * If being used client-side, the primary methods to access individual fields are
57   * {@link #getRowArray()}, {@link #getFamilyArray()}, {@link #getQualifierArray()},
58   * {@link #getTimestamp()}, and {@link #getValueArray()}. These methods allocate new byte arrays
59   * and return copies. Avoid their use server-side.
60   * <p>
61   * Instances of this class are immutable. They do not implement Comparable but Comparators are
62   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
63   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
64   * Hfiles, and bloom filter keys.
65   * <p>
66   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
67   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
68   * <code>&lt;keylength&gt; &lt;valuelength&gt; &lt;key&gt; &lt;value&gt;</code> Key is further
69   * decomposed as: <code>&lt;rowlength&gt; &lt;row&gt; &lt;columnfamilylength&gt;
70   * &lt;columnfamily&gt; &lt;columnqualifier&gt;
71   * &lt;timestamp&gt; &lt;keytype&gt;</code> The <code>rowlength</code> maximum is
72   * <code>Short.MAX_SIZE</code>, column family length maximum is <code>Byte.MAX_SIZE</code>, and
73   * column qualifier + key length must be &lt; <code>Integer.MAX_SIZE</code>. The column does not
74   * contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}<br>
75   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
76   * the value part. The format for this part is: <code>&lt;tagslength&gt;&lt;tagsbytes&gt;</code>.
77   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
78   * contain one or more tags where as each tag is of the form
79   * <code>&lt;taglength&gt;&lt;tagtype&gt;&lt;tagbytes&gt;</code>. <code>tagtype</code> is one byte
80   * and <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type
81   * length and actual tag bytes length.
82   */
83  @InterfaceAudience.Private
84  public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
85      SettableTimestamp, Streamable {
86    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
87  
88    private static final Log LOG = LogFactory.getLog(KeyValue.class);
89  
90    /**
91     * Colon character in UTF-8
92     */
93    public static final char COLUMN_FAMILY_DELIMITER = ':';
94  
95    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
96      new byte[]{COLUMN_FAMILY_DELIMITER};
97  
98    /**
99     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
100    * of KeyValue only.
101    * @deprecated Use {@link CellComparator#COMPARATOR} instead
102    */
103   @Deprecated
104   public static final KVComparator COMPARATOR = new KVComparator();
105   /**
106    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
107    * {@link KeyValue}s.
108    * @deprecated Use {@link CellComparator#META_COMPARATOR} instead
109    */
110   @Deprecated
111   public static final KVComparator META_COMPARATOR = new MetaComparator();
112 
113   /**
114    * Needed for Bloom Filters.
115    *    * @deprecated Use {@link Bytes#BYTES_RAWCOMPARATOR} instead
116    */
117   @Deprecated
118   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
119 
120   /** Size of the key length field in bytes*/
121   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
122 
123   /** Size of the key type field in bytes */
124   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
125 
126   /** Size of the row length field in bytes */
127   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
128 
129   /** Size of the family length field in bytes */
130   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
131 
132   /** Size of the timestamp field in bytes */
133   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
134 
135   // Size of the timestamp and type byte on end of a key -- a long + a byte.
136   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
137 
138   // Size of the length shorts and bytes in key.
139   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
140       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
141 
142   // How far into the key the row starts at. First thing to read is the short
143   // that says how long the row is.
144   public static final int ROW_OFFSET =
145     Bytes.SIZEOF_INT /*keylength*/ +
146     Bytes.SIZEOF_INT /*valuelength*/;
147 
148   public static final int ROW_KEY_OFFSET = ROW_OFFSET + ROW_LENGTH_SIZE;
149 
150   // Size of the length ints in a KeyValue datastructure.
151   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
152 
153   /** Size of the tags length field in bytes */
154   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
155 
156   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
157 
158   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
159 
160   /**
161    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
162    * characteristics would take up for its underlying data structure.
163    *
164    * @param rlength row length
165    * @param flength family length
166    * @param qlength qualifier length
167    * @param vlength value length
168    *
169    * @return the <code>KeyValue</code> data structure length
170    */
171   public static long getKeyValueDataStructureSize(int rlength,
172       int flength, int qlength, int vlength) {
173     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
174         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
175   }
176 
177   /**
178    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
179    * characteristics would take up for its underlying data structure.
180    *
181    * @param rlength row length
182    * @param flength family length
183    * @param qlength qualifier length
184    * @param vlength value length
185    * @param tagsLength total length of the tags
186    *
187    * @return the <code>KeyValue</code> data structure length
188    */
189   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
190       int vlength, int tagsLength) {
191     if (tagsLength == 0) {
192       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
193     }
194     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
195         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
196   }
197 
198   /**
199    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
200    * characteristics would take up for its underlying data structure.
201    *
202    * @param klength key length
203    * @param vlength value length
204    * @param tagsLength total length of the tags
205    *
206    * @return the <code>KeyValue</code> data structure length
207    */
208   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
209     if (tagsLength == 0) {
210       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
211     }
212     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
213   }
214 
215   /**
216    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
217    * characteristics would take up in its underlying data structure for the key.
218    *
219    * @param rlength row length
220    * @param flength family length
221    * @param qlength qualifier length
222    *
223    * @return the key data structure length
224    */
225   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
226     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
227   }
228 
229   /**
230    * Key type.
231    * Has space for other key types to be added later.  Cannot rely on
232    * enum ordinals . They change if item is removed or moved.  Do our own codes.
233    */
234   public static enum Type {
235     Minimum((byte)0),
236     Put((byte)4),
237 
238     Delete((byte)8),
239     DeleteFamilyVersion((byte)10),
240     DeleteColumn((byte)12),
241     DeleteFamily((byte)14),
242 
243     // Maximum is used when searching; you look from maximum on down.
244     Maximum((byte)255);
245 
246     private final byte code;
247 
248     Type(final byte c) {
249       this.code = c;
250     }
251 
252     public byte getCode() {
253       return this.code;
254     }
255 
256     /**
257      * Cannot rely on enum ordinals . They change if item is removed or moved.
258      * Do our own codes.
259      * @param b
260      * @return Type associated with passed code.
261      */
262     public static Type codeToType(final byte b) {
263       for (Type t : Type.values()) {
264         if (t.getCode() == b) {
265           return t;
266         }
267       }
268       throw new RuntimeException("Unknown code " + b);
269     }
270   }
271 
272   /**
273    * Lowest possible key.
274    * Makes a Key with highest possible Timestamp, empty row and column.  No
275    * key can be equal or lower than this one in memstore or in store file.
276    */
277   public static final KeyValue LOWESTKEY =
278     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
279 
280   ////
281   // KeyValue core instance fields.
282   protected byte [] bytes = null;  // an immutable byte array that contains the KV
283   protected int offset = 0;  // offset into bytes buffer KV starts at
284   protected int length = 0;  // length of the KV starting from offset.
285 
286   /**
287    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
288    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
289    * KeyValue type.
290    */
291   public static boolean isDelete(byte t) {
292     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
293   }
294 
295   /** Here be dragons **/
296 
297   /**
298    * used to achieve atomic operations in the memstore.
299    */
300   @Override
301   public long getSequenceId() {
302     return seqId;
303   }
304 
305   @Override
306   public void setSequenceId(long seqId) {
307     this.seqId = seqId;
308   }
309 
310   // multi-version concurrency control version.  default value is 0, aka do not care.
311   private long seqId = 0;
312 
313   /** Dragon time over, return to normal business */
314 
315 
316   /** Writable Constructor -- DO NOT USE */
317   public KeyValue() {}
318 
319   /**
320    * Creates a KeyValue from the start of the specified byte array.
321    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
322    * @param bytes byte array
323    */
324   public KeyValue(final byte [] bytes) {
325     this(bytes, 0);
326   }
327 
328   /**
329    * Creates a KeyValue from the specified byte array and offset.
330    * Presumes <code>bytes</code> content starting at <code>offset</code> is
331    * formatted as a KeyValue blob.
332    * @param bytes byte array
333    * @param offset offset to start of KeyValue
334    */
335   public KeyValue(final byte [] bytes, final int offset) {
336     this(bytes, offset, getLength(bytes, offset));
337   }
338 
339   /**
340    * Creates a KeyValue from the specified byte array, starting at offset, and
341    * for length <code>length</code>.
342    * @param bytes byte array
343    * @param offset offset to start of the KeyValue
344    * @param length length of the KeyValue
345    */
346   public KeyValue(final byte [] bytes, final int offset, final int length) {
347     this.bytes = bytes;
348     this.offset = offset;
349     this.length = length;
350   }
351 
352   /**
353    * Creates a KeyValue from the specified byte array, starting at offset, and
354    * for length <code>length</code>.
355    *
356    * @param bytes  byte array
357    * @param offset offset to start of the KeyValue
358    * @param length length of the KeyValue
359    * @param ts
360    */
361   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
362     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
363   }
364 
365   /** Constructors that build a new backing byte array from fields */
366 
367   /**
368    * Constructs KeyValue structure filled with null value.
369    * Sets type to {@link KeyValue.Type#Maximum}
370    * @param row - row key (arbitrary byte array)
371    * @param timestamp
372    */
373   public KeyValue(final byte [] row, final long timestamp) {
374     this(row, null, null, timestamp, Type.Maximum, null);
375   }
376 
377   /**
378    * Constructs KeyValue structure filled with null value.
379    * @param row - row key (arbitrary byte array)
380    * @param timestamp
381    */
382   public KeyValue(final byte [] row, final long timestamp, Type type) {
383     this(row, null, null, timestamp, type, null);
384   }
385 
386   /**
387    * Constructs KeyValue structure filled with null value.
388    * Sets type to {@link KeyValue.Type#Maximum}
389    * @param row - row key (arbitrary byte array)
390    * @param family family name
391    * @param qualifier column qualifier
392    */
393   public KeyValue(final byte [] row, final byte [] family,
394       final byte [] qualifier) {
395     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
396   }
397 
398   /**
399    * Constructs KeyValue structure as a put filled with specified values and
400    * LATEST_TIMESTAMP.
401    * @param row - row key (arbitrary byte array)
402    * @param family family name
403    * @param qualifier column qualifier
404    */
405   public KeyValue(final byte [] row, final byte [] family,
406       final byte [] qualifier, final byte [] value) {
407     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
408   }
409 
410   /**
411    * Constructs KeyValue structure filled with specified values.
412    * @param row row key
413    * @param family family name
414    * @param qualifier column qualifier
415    * @param timestamp version timestamp
416    * @param type key type
417    * @throws IllegalArgumentException
418    */
419   public KeyValue(final byte[] row, final byte[] family,
420       final byte[] qualifier, final long timestamp, Type type) {
421     this(row, family, qualifier, timestamp, type, null);
422   }
423 
424   /**
425    * Constructs KeyValue structure filled with specified values.
426    * @param row row key
427    * @param family family name
428    * @param qualifier column qualifier
429    * @param timestamp version timestamp
430    * @param value column value
431    * @throws IllegalArgumentException
432    */
433   public KeyValue(final byte[] row, final byte[] family,
434       final byte[] qualifier, final long timestamp, final byte[] value) {
435     this(row, family, qualifier, timestamp, Type.Put, value);
436   }
437 
438   /**
439    * Constructs KeyValue structure filled with specified values.
440    * @param row row key
441    * @param family family name
442    * @param qualifier column qualifier
443    * @param timestamp version timestamp
444    * @param value column value
445    * @param tags tags
446    * @throws IllegalArgumentException
447    */
448   public KeyValue(final byte[] row, final byte[] family,
449       final byte[] qualifier, final long timestamp, final byte[] value,
450       final Tag[] tags) {
451     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
452   }
453 
454   /**
455    * Constructs KeyValue structure filled with specified values.
456    * @param row row key
457    * @param family family name
458    * @param qualifier column qualifier
459    * @param timestamp version timestamp
460    * @param value column value
461    * @param tags tags non-empty list of tags or null
462    * @throws IllegalArgumentException
463    */
464   public KeyValue(final byte[] row, final byte[] family,
465       final byte[] qualifier, final long timestamp, final byte[] value,
466       final List<Tag> tags) {
467     this(row, 0, row==null ? 0 : row.length,
468       family, 0, family==null ? 0 : family.length,
469       qualifier, 0, qualifier==null ? 0 : qualifier.length,
470       timestamp, Type.Put,
471       value, 0, value==null ? 0 : value.length, tags);
472   }
473 
474   /**
475    * Constructs KeyValue structure filled with specified values.
476    * @param row row key
477    * @param family family name
478    * @param qualifier column qualifier
479    * @param timestamp version timestamp
480    * @param type key type
481    * @param value column value
482    * @throws IllegalArgumentException
483    */
484   public KeyValue(final byte[] row, final byte[] family,
485       final byte[] qualifier, final long timestamp, Type type,
486       final byte[] value) {
487     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
488         timestamp, type,   value, 0, len(value));
489   }
490 
491   /**
492    * Constructs KeyValue structure filled with specified values.
493    * <p>
494    * Column is split into two fields, family and qualifier.
495    * @param row row key
496    * @param family family name
497    * @param qualifier column qualifier
498    * @param timestamp version timestamp
499    * @param type key type
500    * @param value column value
501    * @throws IllegalArgumentException
502    */
503   public KeyValue(final byte[] row, final byte[] family,
504       final byte[] qualifier, final long timestamp, Type type,
505       final byte[] value, final List<Tag> tags) {
506     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
507         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
508   }
509 
510   /**
511    * Constructs KeyValue structure filled with specified values.
512    * @param row row key
513    * @param family family name
514    * @param qualifier column qualifier
515    * @param timestamp version timestamp
516    * @param type key type
517    * @param value column value
518    * @throws IllegalArgumentException
519    */
520   public KeyValue(final byte[] row, final byte[] family,
521       final byte[] qualifier, final long timestamp, Type type,
522       final byte[] value, final byte[] tags) {
523     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
524         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
525   }
526 
527   /**
528    * Constructs KeyValue structure filled with specified values.
529    * @param row row key
530    * @param family family name
531    * @param qualifier column qualifier
532    * @param qoffset qualifier offset
533    * @param qlength qualifier length
534    * @param timestamp version timestamp
535    * @param type key type
536    * @param value column value
537    * @param voffset value offset
538    * @param vlength value length
539    * @throws IllegalArgumentException
540    */
541   public KeyValue(byte [] row, byte [] family,
542       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
543       byte [] value, int voffset, int vlength, List<Tag> tags) {
544     this(row, 0, row==null ? 0 : row.length,
545         family, 0, family==null ? 0 : family.length,
546         qualifier, qoffset, qlength, timestamp, type,
547         value, voffset, vlength, tags);
548   }
549 
550   /**
551    * @param row
552    * @param family
553    * @param qualifier
554    * @param qoffset
555    * @param qlength
556    * @param timestamp
557    * @param type
558    * @param value
559    * @param voffset
560    * @param vlength
561    * @param tags
562    */
563   public KeyValue(byte [] row, byte [] family,
564       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
565       byte [] value, int voffset, int vlength, byte[] tags) {
566     this(row, 0, row==null ? 0 : row.length,
567         family, 0, family==null ? 0 : family.length,
568         qualifier, qoffset, qlength, timestamp, type,
569         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
570   }
571 
572   /**
573    * Constructs KeyValue structure filled with specified values.
574    * <p>
575    * Column is split into two fields, family and qualifier.
576    * @param row row key
577    * @throws IllegalArgumentException
578    */
579   public KeyValue(final byte [] row, final int roffset, final int rlength,
580       final byte [] family, final int foffset, final int flength,
581       final byte [] qualifier, final int qoffset, final int qlength,
582       final long timestamp, final Type type,
583       final byte [] value, final int voffset, final int vlength) {
584     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
585       qlength, timestamp, type, value, voffset, vlength, null);
586   }
587 
588   /**
589    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
590    * data buffer.
591    * <p>
592    * Column is split into two fields, family and qualifier.
593    *
594    * @param buffer the bytes buffer to use
595    * @param boffset buffer offset
596    * @param row row key
597    * @param roffset row offset
598    * @param rlength row length
599    * @param family family name
600    * @param foffset family offset
601    * @param flength family length
602    * @param qualifier column qualifier
603    * @param qoffset qualifier offset
604    * @param qlength qualifier length
605    * @param timestamp version timestamp
606    * @param type key type
607    * @param value column value
608    * @param voffset value offset
609    * @param vlength value length
610    * @param tags non-empty list of tags or null
611    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
612    * remaining in the buffer
613    */
614   public KeyValue(byte [] buffer, final int boffset,
615       final byte [] row, final int roffset, final int rlength,
616       final byte [] family, final int foffset, final int flength,
617       final byte [] qualifier, final int qoffset, final int qlength,
618       final long timestamp, final Type type,
619       final byte [] value, final int voffset, final int vlength,
620       final Tag[] tags) {
621      this.bytes  = buffer;
622      this.length = writeByteArray(buffer, boffset,
623          row, roffset, rlength,
624          family, foffset, flength, qualifier, qoffset, qlength,
625         timestamp, type, value, voffset, vlength, tags);
626      this.offset = boffset;
627    }
628 
629   /**
630    * Constructs KeyValue structure filled with specified values.
631    * <p>
632    * Column is split into two fields, family and qualifier.
633    * @param row row key
634    * @param roffset row offset
635    * @param rlength row length
636    * @param family family name
637    * @param foffset family offset
638    * @param flength family length
639    * @param qualifier column qualifier
640    * @param qoffset qualifier offset
641    * @param qlength qualifier length
642    * @param timestamp version timestamp
643    * @param type key type
644    * @param value column value
645    * @param voffset value offset
646    * @param vlength value length
647    * @param tags tags
648    * @throws IllegalArgumentException
649    */
650   public KeyValue(final byte [] row, final int roffset, final int rlength,
651       final byte [] family, final int foffset, final int flength,
652       final byte [] qualifier, final int qoffset, final int qlength,
653       final long timestamp, final Type type,
654       final byte [] value, final int voffset, final int vlength,
655       final List<Tag> tags) {
656     this.bytes = createByteArray(row, roffset, rlength,
657         family, foffset, flength, qualifier, qoffset, qlength,
658         timestamp, type, value, voffset, vlength, tags);
659     this.length = bytes.length;
660     this.offset = 0;
661   }
662 
663   /**
664    * @param row
665    * @param roffset
666    * @param rlength
667    * @param family
668    * @param foffset
669    * @param flength
670    * @param qualifier
671    * @param qoffset
672    * @param qlength
673    * @param timestamp
674    * @param type
675    * @param value
676    * @param voffset
677    * @param vlength
678    * @param tags
679    */
680   public KeyValue(final byte [] row, final int roffset, final int rlength,
681       final byte [] family, final int foffset, final int flength,
682       final byte [] qualifier, final int qoffset, final int qlength,
683       final long timestamp, final Type type,
684       final byte [] value, final int voffset, final int vlength,
685       final byte[] tags, final int tagsOffset, final int tagsLength) {
686     this.bytes = createByteArray(row, roffset, rlength,
687         family, foffset, flength, qualifier, qoffset, qlength,
688         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
689     this.length = bytes.length;
690     this.offset = 0;
691   }
692 
693   /**
694    * Constructs an empty KeyValue structure, with specified sizes.
695    * This can be used to partially fill up KeyValues.
696    * <p>
697    * Column is split into two fields, family and qualifier.
698    * @param rlength row length
699    * @param flength family length
700    * @param qlength qualifier length
701    * @param timestamp version timestamp
702    * @param type key type
703    * @param vlength value length
704    * @throws IllegalArgumentException
705    */
706   public KeyValue(final int rlength,
707       final int flength,
708       final int qlength,
709       final long timestamp, final Type type,
710       final int vlength) {
711     this(rlength, flength, qlength, timestamp, type, vlength, 0);
712   }
713 
714   /**
715    * Constructs an empty KeyValue structure, with specified sizes.
716    * This can be used to partially fill up KeyValues.
717    * <p>
718    * Column is split into two fields, family and qualifier.
719    * @param rlength row length
720    * @param flength family length
721    * @param qlength qualifier length
722    * @param timestamp version timestamp
723    * @param type key type
724    * @param vlength value length
725    * @param tagsLength
726    * @throws IllegalArgumentException
727    */
728   public KeyValue(final int rlength,
729       final int flength,
730       final int qlength,
731       final long timestamp, final Type type,
732       final int vlength, final int tagsLength) {
733     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
734         tagsLength);
735     this.length = bytes.length;
736     this.offset = 0;
737   }
738 
739 
740   public KeyValue(byte[] row, int roffset, int rlength,
741                   byte[] family, int foffset, int flength,
742                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
743     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
744         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
745         value, 0, value == null ? 0 : value.remaining(), tags);
746     this.length = bytes.length;
747     this.offset = 0;
748   }
749 
750   public KeyValue(Cell c) {
751     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
752         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
753         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
754         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
755         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
756     this.seqId = c.getSequenceId();
757   }
758 
759   /**
760    * Create an empty byte[] representing a KeyValue
761    * All lengths are preset and can be filled in later.
762    * @param rlength
763    * @param flength
764    * @param qlength
765    * @param timestamp
766    * @param type
767    * @param vlength
768    * @return The newly created byte array.
769    */
770   private static byte[] createEmptyByteArray(final int rlength, int flength,
771       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
772     if (rlength > Short.MAX_VALUE) {
773       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
774     }
775     if (flength > Byte.MAX_VALUE) {
776       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
777     }
778     // Qualifier length
779     if (qlength > Integer.MAX_VALUE - rlength - flength) {
780       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
781     }
782     checkForTagsLength(tagsLength);
783     // Key length
784     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
785     if (longkeylength > Integer.MAX_VALUE) {
786       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
787         Integer.MAX_VALUE);
788     }
789     int keylength = (int)longkeylength;
790     // Value length
791     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
792       throw new IllegalArgumentException("Valuer > " +
793           HConstants.MAXIMUM_VALUE_LENGTH);
794     }
795 
796     // Allocate right-sized byte array.
797     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
798         tagsLength)];
799     // Write the correct size markers
800     int pos = 0;
801     pos = Bytes.putInt(bytes, pos, keylength);
802     pos = Bytes.putInt(bytes, pos, vlength);
803     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
804     pos += rlength;
805     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
806     pos += flength + qlength;
807     pos = Bytes.putLong(bytes, pos, timestamp);
808     pos = Bytes.putByte(bytes, pos, type.getCode());
809     pos += vlength;
810     if (tagsLength > 0) {
811       pos = Bytes.putAsShort(bytes, pos, tagsLength);
812     }
813     return bytes;
814   }
815 
816   /**
817    * Checks the parameters passed to a constructor.
818    *
819    * @param row row key
820    * @param rlength row length
821    * @param family family name
822    * @param flength family length
823    * @param qlength qualifier length
824    * @param vlength value length
825    *
826    * @throws IllegalArgumentException an illegal value was passed
827    */
828   private static void checkParameters(final byte [] row, final int rlength,
829       final byte [] family, int flength, int qlength, int vlength)
830           throws IllegalArgumentException {
831     if (rlength > Short.MAX_VALUE) {
832       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
833     }
834     if (row == null) {
835       throw new IllegalArgumentException("Row is null");
836     }
837     // Family length
838     flength = family == null ? 0 : flength;
839     if (flength > Byte.MAX_VALUE) {
840       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
841     }
842     // Qualifier length
843     if (qlength > Integer.MAX_VALUE - rlength - flength) {
844       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
845     }
846     // Key length
847     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
848     if (longKeyLength > Integer.MAX_VALUE) {
849       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
850           Integer.MAX_VALUE);
851     }
852     // Value length
853     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
854       throw new IllegalArgumentException("Value length " + vlength + " > " +
855           HConstants.MAXIMUM_VALUE_LENGTH);
856     }
857   }
858 
859   /**
860    * Write KeyValue format into the provided byte array.
861    *
862    * @param buffer the bytes buffer to use
863    * @param boffset buffer offset
864    * @param row row key
865    * @param roffset row offset
866    * @param rlength row length
867    * @param family family name
868    * @param foffset family offset
869    * @param flength family length
870    * @param qualifier column qualifier
871    * @param qoffset qualifier offset
872    * @param qlength qualifier length
873    * @param timestamp version timestamp
874    * @param type key type
875    * @param value column value
876    * @param voffset value offset
877    * @param vlength value length
878    *
879    * @return The number of useful bytes in the buffer.
880    *
881    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
882    * remaining in the buffer
883    */
884   public static int writeByteArray(byte [] buffer, final int boffset,
885       final byte [] row, final int roffset, final int rlength,
886       final byte [] family, final int foffset, int flength,
887       final byte [] qualifier, final int qoffset, int qlength,
888       final long timestamp, final Type type,
889       final byte [] value, final int voffset, int vlength, Tag[] tags) {
890 
891     checkParameters(row, rlength, family, flength, qlength, vlength);
892 
893     // Calculate length of tags area
894     int tagsLength = 0;
895     if (tags != null && tags.length > 0) {
896       for (Tag t: tags) {
897         tagsLength += t.getValueLength() + Tag.INFRASTRUCTURE_SIZE;
898       }
899     }
900     checkForTagsLength(tagsLength);
901     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
902     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
903         tagsLength);
904     if (keyValueLength > buffer.length - boffset) {
905       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
906           keyValueLength);
907     }
908 
909     // Write key, value and key row length.
910     int pos = boffset;
911     pos = Bytes.putInt(buffer, pos, keyLength);
912     pos = Bytes.putInt(buffer, pos, vlength);
913     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
914     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
915     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
916     if (flength != 0) {
917       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
918     }
919     if (qlength != 0) {
920       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
921     }
922     pos = Bytes.putLong(buffer, pos, timestamp);
923     pos = Bytes.putByte(buffer, pos, type.getCode());
924     if (value != null && value.length > 0) {
925       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
926     }
927     // Write the number of tags. If it is 0 then it means there are no tags.
928     if (tagsLength > 0) {
929       pos = Bytes.putAsShort(buffer, pos, tagsLength);
930       for (Tag t : tags) {
931         int tlen = t.getValueLength();
932         pos = Bytes.putAsShort(buffer, pos, tlen + Tag.TYPE_LENGTH_SIZE);
933         pos = Bytes.putByte(buffer, pos, t.getType());
934         TagUtil.copyValueTo(t, buffer, pos);
935         pos += tlen;
936       }
937     }
938     return keyValueLength;
939   }
940 
941   private static void checkForTagsLength(int tagsLength) {
942     if (tagsLength > MAX_TAGS_LENGTH) {
943       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
944     }
945   }
946 
947   /**
948    * Write KeyValue format into a byte array.
949    * @param row row key
950    * @param roffset row offset
951    * @param rlength row length
952    * @param family family name
953    * @param foffset family offset
954    * @param flength family length
955    * @param qualifier column qualifier
956    * @param qoffset qualifier offset
957    * @param qlength qualifier length
958    * @param timestamp version timestamp
959    * @param type key type
960    * @param value column value
961    * @param voffset value offset
962    * @param vlength value length
963    * @return The newly created byte array.
964    */
965   private static byte [] createByteArray(final byte [] row, final int roffset,
966       final int rlength, final byte [] family, final int foffset, int flength,
967       final byte [] qualifier, final int qoffset, int qlength,
968       final long timestamp, final Type type,
969       final byte [] value, final int voffset,
970       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
971 
972     checkParameters(row, rlength, family, flength, qlength, vlength);
973     checkForTagsLength(tagsLength);
974     // Allocate right-sized byte array.
975     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
976     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
977       tagsLength)];
978     // Write key, value and key row length.
979     int pos = 0;
980     pos = Bytes.putInt(bytes, pos, keyLength);
981     pos = Bytes.putInt(bytes, pos, vlength);
982     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
983     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
984     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
985     if(flength != 0) {
986       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
987     }
988     if(qlength != 0) {
989       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
990     }
991     pos = Bytes.putLong(bytes, pos, timestamp);
992     pos = Bytes.putByte(bytes, pos, type.getCode());
993     if (value != null && value.length > 0) {
994       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
995     }
996     // Add the tags after the value part
997     if (tagsLength > 0) {
998       pos = Bytes.putAsShort(bytes, pos, tagsLength);
999       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
1000     }
1001     return bytes;
1002   }
1003 
1004   /**
1005    * @param qualifier can be a ByteBuffer or a byte[], or null.
1006    * @param value can be a ByteBuffer or a byte[], or null.
1007    */
1008   private static byte [] createByteArray(final byte [] row, final int roffset,
1009       final int rlength, final byte [] family, final int foffset, int flength,
1010       final Object qualifier, final int qoffset, int qlength,
1011       final long timestamp, final Type type,
1012       final Object value, final int voffset, int vlength, List<Tag> tags) {
1013 
1014     checkParameters(row, rlength, family, flength, qlength, vlength);
1015 
1016     // Calculate length of tags area
1017     int tagsLength = 0;
1018     if (tags != null && !tags.isEmpty()) {
1019       for (Tag t : tags) {
1020         tagsLength += t.getValueLength() + Tag.INFRASTRUCTURE_SIZE;
1021       }
1022     }
1023     checkForTagsLength(tagsLength);
1024     // Allocate right-sized byte array.
1025     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1026     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1027         tagsLength)];
1028 
1029     // Write key, value and key row length.
1030     int pos = 0;
1031     pos = Bytes.putInt(bytes, pos, keyLength);
1032 
1033     pos = Bytes.putInt(bytes, pos, vlength);
1034     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1035     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1036     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1037     if(flength != 0) {
1038       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1039     }
1040     if (qlength > 0) {
1041       if (qualifier instanceof ByteBuffer) {
1042         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1043       } else {
1044         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1045       }
1046     }
1047     pos = Bytes.putLong(bytes, pos, timestamp);
1048     pos = Bytes.putByte(bytes, pos, type.getCode());
1049     if (vlength > 0) {
1050       if (value instanceof ByteBuffer) {
1051         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1052       } else {
1053         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1054       }
1055     }
1056     // Add the tags after the value part
1057     if (tagsLength > 0) {
1058       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1059       for (Tag t : tags) {
1060         int tlen = t.getValueLength();
1061         pos = Bytes.putAsShort(bytes, pos, tlen + Tag.TYPE_LENGTH_SIZE);
1062         pos = Bytes.putByte(bytes, pos, t.getType());
1063         TagUtil.copyValueTo(t, bytes, pos);
1064         pos += tlen;
1065       }
1066     }
1067     return bytes;
1068   }
1069 
1070   /**
1071    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1072    */
1073   @Override
1074   public boolean equals(Object other) {
1075     if (!(other instanceof Cell)) {
1076       return false;
1077     }
1078     return CellUtil.equals(this, (Cell)other);
1079   }
1080 
1081   /**
1082    * In line with {@link #equals(Object)}, only uses the key portion, not the value.
1083    */
1084   @Override
1085   public int hashCode() {
1086     return calculateHashForKey(this);
1087   }
1088 
1089   private int calculateHashForKey(Cell cell) {
1090     // pre-calculate the 3 hashes made of byte ranges
1091     int rowHash = Bytes.hashCode(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1092     int familyHash = Bytes.hashCode(cell.getFamilyArray(), cell.getFamilyOffset(),
1093         cell.getFamilyLength());
1094     int qualifierHash = Bytes.hashCode(cell.getQualifierArray(), cell.getQualifierOffset(),
1095         cell.getQualifierLength());
1096 
1097     // combine the 6 sub-hashes
1098     int hash = 31 * rowHash + familyHash;
1099     hash = 31 * hash + qualifierHash;
1100     hash = 31 * hash + (int) cell.getTimestamp();
1101     hash = 31 * hash + cell.getTypeByte();
1102     return hash;
1103   }
1104 
1105   //---------------------------------------------------------------------------
1106   //
1107   //  KeyValue cloning
1108   //
1109   //---------------------------------------------------------------------------
1110 
1111   /**
1112    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1113    * @return Fully copied clone of this KeyValue
1114    * @throws CloneNotSupportedException
1115    */
1116   @Override
1117   public KeyValue clone() throws CloneNotSupportedException {
1118     super.clone();
1119     byte [] b = new byte[this.length];
1120     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1121     KeyValue ret = new KeyValue(b, 0, b.length);
1122     // Important to clone the memstoreTS as well - otherwise memstore's
1123     // update-in-place methods (eg increment) will end up creating
1124     // new entries
1125     ret.setSequenceId(seqId);
1126     return ret;
1127   }
1128 
1129   /**
1130    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1131    * http://en.wikipedia.org/wiki/Object_copy
1132    * @return Shallow copy of this KeyValue
1133    */
1134   public KeyValue shallowCopy() {
1135     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1136     shallowCopy.setSequenceId(this.seqId);
1137     return shallowCopy;
1138   }
1139 
1140   //---------------------------------------------------------------------------
1141   //
1142   //  String representation
1143   //
1144   //---------------------------------------------------------------------------
1145 
1146   @Override
1147   public String toString() {
1148     if (this.bytes == null || this.bytes.length == 0) {
1149       return "empty";
1150     }
1151     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
1152       + getValueLength() + "/seqid=" + seqId;
1153   }
1154 
1155   /**
1156    * @param k Key portion of a KeyValue.
1157    * @return Key as a String, empty string if k is null.
1158    */
1159   public static String keyToString(final byte [] k) {
1160     if (k == null) {
1161       return "";
1162     }
1163     return keyToString(k, 0, k.length);
1164   }
1165 
1166   /**
1167    * Produces a string map for this key/value pair. Useful for programmatic use
1168    * and manipulation of the data stored in an WALKey, for example, printing
1169    * as JSON. Values are left out due to their tendency to be large. If needed,
1170    * they can be added manually.
1171    *
1172    * @return the Map&lt;String,?&gt; containing data from this key
1173    */
1174   public Map<String, Object> toStringMap() {
1175     Map<String, Object> stringMap = new HashMap<String, Object>();
1176     stringMap.put("row", Bytes.toStringBinary(getRowArray(), getRowOffset(), getRowLength()));
1177     stringMap.put("family",
1178       Bytes.toStringBinary(getFamilyArray(), getFamilyOffset(), getFamilyLength()));
1179     stringMap.put("qualifier",
1180       Bytes.toStringBinary(getQualifierArray(), getQualifierOffset(), getQualifierLength()));
1181     stringMap.put("timestamp", getTimestamp());
1182     stringMap.put("vlen", getValueLength());
1183     List<Tag> tags = getTags();
1184     if (tags != null) {
1185       List<String> tagsString = new ArrayList<String>();
1186       for (Tag t : tags) {
1187         tagsString.add(t.toString());
1188       }
1189       stringMap.put("tag", tagsString);
1190     }
1191     return stringMap;
1192   }
1193 
1194   /**
1195    * Use for logging.
1196    * @param b Key portion of a KeyValue.
1197    * @param o Offset to start of key
1198    * @param l Length of key.
1199    * @return Key as a String.
1200    */
1201   public static String keyToString(final byte [] b, final int o, final int l) {
1202     if (b == null) return "";
1203     int rowlength = Bytes.toShort(b, o);
1204     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1205     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1206     int familylength = b[columnoffset - 1];
1207     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1208     String family = familylength == 0? "":
1209       Bytes.toStringBinary(b, columnoffset, familylength);
1210     String qualifier = columnlength == 0? "":
1211       Bytes.toStringBinary(b, columnoffset + familylength,
1212       columnlength - familylength);
1213     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1214     String timestampStr = humanReadableTimestamp(timestamp);
1215     byte type = b[o + l - 1];
1216     return row + "/" + family +
1217       (family != null && family.length() > 0? ":" :"") +
1218       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1219   }
1220 
1221   public static String humanReadableTimestamp(final long timestamp) {
1222     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1223       return "LATEST_TIMESTAMP";
1224     }
1225     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1226       return "OLDEST_TIMESTAMP";
1227     }
1228     return String.valueOf(timestamp);
1229   }
1230 
1231   //---------------------------------------------------------------------------
1232   //
1233   //  Public Member Accessors
1234   //
1235   //---------------------------------------------------------------------------
1236 
1237   /**
1238    * @return The byte array backing this KeyValue.
1239    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1240    */
1241   @Deprecated
1242   public byte [] getBuffer() {
1243     return this.bytes;
1244   }
1245 
1246   /**
1247    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1248    */
1249   public int getOffset() {
1250     return this.offset;
1251   }
1252 
1253   /**
1254    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1255    */
1256   public int getLength() {
1257     return length;
1258   }
1259 
1260   //---------------------------------------------------------------------------
1261   //
1262   //  Length and Offset Calculators
1263   //
1264   //---------------------------------------------------------------------------
1265 
1266   /**
1267    * Determines the total length of the KeyValue stored in the specified
1268    * byte array and offset.  Includes all headers.
1269    * @param bytes byte array
1270    * @param offset offset to start of the KeyValue
1271    * @return length of entire KeyValue, in bytes
1272    */
1273   private static int getLength(byte [] bytes, int offset) {
1274     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1275     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1276     return klength + vlength;
1277   }
1278 
1279   /**
1280    * @return Key offset in backing buffer..
1281    */
1282   public int getKeyOffset() {
1283     return this.offset + ROW_OFFSET;
1284   }
1285 
1286   public String getKeyString() {
1287     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1288   }
1289 
1290   /**
1291    * @return Length of key portion.
1292    */
1293   public int getKeyLength() {
1294     return Bytes.toInt(this.bytes, this.offset);
1295   }
1296 
1297   /**
1298    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1299    */
1300   @Override
1301   public byte[] getValueArray() {
1302     return bytes;
1303   }
1304 
1305   /**
1306    * @return the value offset
1307    */
1308   @Override
1309   public int getValueOffset() {
1310     int voffset = getKeyOffset() + getKeyLength();
1311     return voffset;
1312   }
1313 
1314   /**
1315    * @return Value length
1316    */
1317   @Override
1318   public int getValueLength() {
1319     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1320     return vlength;
1321   }
1322 
1323   /**
1324    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1325    */
1326   @Override
1327   public byte[] getRowArray() {
1328     return bytes;
1329   }
1330 
1331   /**
1332    * @return Row offset
1333    */
1334   @Override
1335   public int getRowOffset() {
1336     return this.offset + ROW_KEY_OFFSET;
1337   }
1338 
1339   /**
1340    * @return Row length
1341    */
1342   @Override
1343   public short getRowLength() {
1344     return Bytes.toShort(this.bytes, getKeyOffset());
1345   }
1346 
1347   /**
1348    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1349    */
1350   @Override
1351   public byte[] getFamilyArray() {
1352     return bytes;
1353   }
1354 
1355   /**
1356    * @return Family offset
1357    */
1358   @Override
1359   public int getFamilyOffset() {
1360     return getFamilyOffset(getRowLength());
1361   }
1362 
1363   /**
1364    * @return Family offset
1365    */
1366   private int getFamilyOffset(int rlength) {
1367     return this.offset + ROW_KEY_OFFSET + rlength + Bytes.SIZEOF_BYTE;
1368   }
1369 
1370   /**
1371    * @return Family length
1372    */
1373   @Override
1374   public byte getFamilyLength() {
1375     return getFamilyLength(getFamilyOffset());
1376   }
1377 
1378   /**
1379    * @return Family length
1380    */
1381   public byte getFamilyLength(int foffset) {
1382     return this.bytes[foffset-1];
1383   }
1384 
1385   /**
1386    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1387    */
1388   @Override
1389   public byte[] getQualifierArray() {
1390     return bytes;
1391   }
1392 
1393   /**
1394    * @return Qualifier offset
1395    */
1396   @Override
1397   public int getQualifierOffset() {
1398     return getQualifierOffset(getFamilyOffset());
1399   }
1400 
1401   /**
1402    * @return Qualifier offset
1403    */
1404   private int getQualifierOffset(int foffset) {
1405     return foffset + getFamilyLength(foffset);
1406   }
1407 
1408   /**
1409    * @return Qualifier length
1410    */
1411   @Override
1412   public int getQualifierLength() {
1413     return getQualifierLength(getRowLength(),getFamilyLength());
1414   }
1415 
1416   /**
1417    * @return Qualifier length
1418    */
1419   private int getQualifierLength(int rlength, int flength) {
1420     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1421   }
1422 
1423   /**
1424    * @return Timestamp offset
1425    */
1426   public int getTimestampOffset() {
1427     return getTimestampOffset(getKeyLength());
1428   }
1429 
1430   /**
1431    * @param keylength Pass if you have it to save on a int creation.
1432    * @return Timestamp offset
1433    */
1434   private int getTimestampOffset(final int keylength) {
1435     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1436   }
1437 
1438   /**
1439    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1440    */
1441   public boolean isLatestTimestamp() {
1442     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1443       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1444   }
1445 
1446   /**
1447    * @param now Time to set into <code>this</code> IFF timestamp ==
1448    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1449    * @return True is we modified this.
1450    */
1451   public boolean updateLatestStamp(final byte [] now) {
1452     if (this.isLatestTimestamp()) {
1453       int tsOffset = getTimestampOffset();
1454       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1455       // clear cache or else getTimestamp() possibly returns an old value
1456       return true;
1457     }
1458     return false;
1459   }
1460 
1461   @Override
1462   public void setTimestamp(long ts) {
1463     Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG);
1464   }
1465 
1466   @Override
1467   public void setTimestamp(byte[] ts, int tsOffset) {
1468     Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG);
1469   }
1470 
1471   //---------------------------------------------------------------------------
1472   //
1473   //  Methods that return copies of fields
1474   //
1475   //---------------------------------------------------------------------------
1476 
1477   /**
1478    * Do not use unless you have to. Used internally for compacting and testing. Use
1479    * {@link #getRowArray()}, {@link #getFamilyArray()}, {@link #getQualifierArray()}, and
1480    * {@link #getValueArray()} if accessing a KeyValue client-side.
1481    * @return Copy of the key portion only.
1482    */
1483   public byte [] getKey() {
1484     int keylength = getKeyLength();
1485     byte [] key = new byte[keylength];
1486     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1487     return key;
1488   }
1489 
1490   /**
1491    *
1492    * @return Timestamp
1493    */
1494   @Override
1495   public long getTimestamp() {
1496     return getTimestamp(getKeyLength());
1497   }
1498 
1499   /**
1500    * @param keylength Pass if you have it to save on a int creation.
1501    * @return Timestamp
1502    */
1503   long getTimestamp(final int keylength) {
1504     int tsOffset = getTimestampOffset(keylength);
1505     return Bytes.toLong(this.bytes, tsOffset);
1506   }
1507 
1508   /**
1509    * @return Type of this KeyValue.
1510    */
1511   @Deprecated
1512   public byte getType() {
1513     return getTypeByte();
1514   }
1515 
1516   /**
1517    * @return KeyValue.TYPE byte representation
1518    */
1519   @Override
1520   public byte getTypeByte() {
1521     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1522   }
1523 
1524   /**
1525    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1526    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1527    * KeyValue type.
1528    */
1529   @Deprecated // use CellUtil#isDelete
1530   public boolean isDelete() {
1531     return KeyValue.isDelete(getType());
1532   }
1533 
1534   /**
1535    * This returns the offset where the tag actually starts.
1536    */
1537   @Override
1538   public int getTagsOffset() {
1539     int tagsLen = getTagsLength();
1540     if (tagsLen == 0) {
1541       return this.offset + this.length;
1542     }
1543     return this.offset + this.length - tagsLen;
1544   }
1545 
1546   /**
1547    * This returns the total length of the tag bytes
1548    */
1549   @Override
1550   public int getTagsLength() {
1551     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1552     if (tagsLen > 0) {
1553       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1554       // length
1555       tagsLen -= TAGS_LENGTH_SIZE;
1556     }
1557     return tagsLen;
1558   }
1559 
1560   /**
1561    * Returns any tags embedded in the KeyValue.  Used in testcases.
1562    * @return The tags
1563    */
1564   public List<Tag> getTags() {
1565     int tagsLength = getTagsLength();
1566     if (tagsLength == 0) {
1567       return EMPTY_ARRAY_LIST;
1568     }
1569     return TagUtil.asList(getTagsArray(), getTagsOffset(), tagsLength);
1570   }
1571 
1572   /**
1573    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1574    */
1575   @Override
1576   public byte[] getTagsArray() {
1577     return bytes;
1578   }
1579 
1580   /**
1581    * Creates a new KeyValue that only contains the key portion (the value is
1582    * set to be null).
1583    *
1584    * TODO only used by KeyOnlyFilter -- move there.
1585    * @param lenAsVal replace value with the actual value length (false=empty)
1586    */
1587   public KeyValue createKeyOnly(boolean lenAsVal) {
1588     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1589     // Rebuild as: <keylen:4><0:4><key:keylen>
1590     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1591     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1592     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1593         Math.min(newBuffer.length,this.length));
1594     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1595     if (lenAsVal) {
1596       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1597     }
1598     return new KeyValue(newBuffer);
1599   }
1600 
1601   /**
1602    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1603    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1604    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1605    * <p>
1606    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1607    * </p>
1608    * <p>
1609    * Not recommend to be used as this is old-style API.
1610    * </p>
1611    * @param c The column.
1612    * @return The parsed column.
1613    */
1614   public static byte [][] parseColumn(byte [] c) {
1615     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1616     if (index == -1) {
1617       // If no delimiter, return array of size 1
1618       return new byte [][] { c };
1619     } else if(index == c.length - 1) {
1620       // family with empty qualifier, return array size 2
1621       byte [] family = new byte[c.length-1];
1622       System.arraycopy(c, 0, family, 0, family.length);
1623       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1624     }
1625     // Family and column, return array size 2
1626     final byte [][] result = new byte [2][];
1627     result[0] = new byte [index];
1628     System.arraycopy(c, 0, result[0], 0, index);
1629     final int len = c.length - (index + 1);
1630     result[1] = new byte[len];
1631     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1632     return result;
1633   }
1634 
1635   /**
1636    * Makes a column in family:qualifier form from separate byte arrays.
1637    * <p>
1638    * Not recommended for usage as this is old-style API.
1639    * @param family
1640    * @param qualifier
1641    * @return family:qualifier
1642    */
1643   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1644     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1645   }
1646 
1647   /**
1648    * @param b
1649    * @param delimiter
1650    * @return Index of delimiter having started from start of <code>b</code>
1651    * moving rightward.
1652    */
1653   public static int getDelimiter(final byte [] b, int offset, final int length,
1654       final int delimiter) {
1655     if (b == null) {
1656       throw new IllegalArgumentException("Passed buffer is null");
1657     }
1658     int result = -1;
1659     for (int i = offset; i < length + offset; i++) {
1660       if (b[i] == delimiter) {
1661         result = i;
1662         break;
1663       }
1664     }
1665     return result;
1666   }
1667 
1668   /**
1669    * Find index of passed delimiter walking from end of buffer backwards.
1670    * @param b
1671    * @param delimiter
1672    * @return Index of delimiter
1673    */
1674   public static int getDelimiterInReverse(final byte [] b, final int offset,
1675       final int length, final int delimiter) {
1676     if (b == null) {
1677       throw new IllegalArgumentException("Passed buffer is null");
1678     }
1679     int result = -1;
1680     for (int i = (offset + length) - 1; i >= offset; i--) {
1681       if (b[i] == delimiter) {
1682         result = i;
1683         break;
1684       }
1685     }
1686     return result;
1687   }
1688 
1689   /**
1690    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1691    * {@link KeyValue}s.
1692    * @deprecated : {@link CellComparator#META_COMPARATOR} to be used
1693    */
1694   @Deprecated
1695   public static class MetaComparator extends KVComparator {
1696     /**
1697      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1698      * table.
1699      */
1700     @Override
1701     public int compare(final Cell left, final Cell right) {
1702       return CellComparator.META_COMPARATOR.compareKeyIgnoresMvcc(left, right);
1703     }
1704 
1705     @Override
1706     public int compareOnlyKeyPortion(Cell left, Cell right) {
1707       return compare(left, right);
1708     }
1709 
1710     @Override
1711     public int compareRows(byte [] left, int loffset, int llength,
1712         byte [] right, int roffset, int rlength) {
1713       int leftDelimiter = getDelimiter(left, loffset, llength,
1714           HConstants.DELIMITER);
1715       int rightDelimiter = getDelimiter(right, roffset, rlength,
1716           HConstants.DELIMITER);
1717       // Compare up to the delimiter
1718       int lpart = (leftDelimiter < 0 ? llength :leftDelimiter - loffset);
1719       int rpart = (rightDelimiter < 0 ? rlength :rightDelimiter - roffset);
1720       int result = Bytes.compareTo(left, loffset, lpart, right, roffset, rpart);
1721       if (result != 0) {
1722         return result;
1723       } else {
1724         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1725           return -1;
1726         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1727           return 1;
1728         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1729           return 0;
1730         }
1731       }
1732       // Compare middle bit of the row.
1733       // Move past delimiter
1734       leftDelimiter++;
1735       rightDelimiter++;
1736       int leftFarDelimiter = getDelimiterInReverse(left, leftDelimiter,
1737           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1738       int rightFarDelimiter = getDelimiterInReverse(right,
1739           rightDelimiter, rlength - (rightDelimiter - roffset),
1740           HConstants.DELIMITER);
1741       // Now compare middlesection of row.
1742       lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter;
1743       rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter;
1744       result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart);
1745       if (result != 0) {
1746         return result;
1747       }  else {
1748         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1749           return -1;
1750         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1751           return 1;
1752         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1753           return 0;
1754         }
1755       }
1756       // Compare last part of row, the rowid.
1757       leftFarDelimiter++;
1758       rightFarDelimiter++;
1759       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1760           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1761       return result;
1762     }
1763 
1764     /**
1765      * Don't do any fancy Block Index splitting tricks.
1766      */
1767     @Override
1768     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1769       return Arrays.copyOf(rightKey, rightKey.length);
1770     }
1771 
1772     /**
1773      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1774      * instantiate the appropriate comparator.
1775      * TODO: With V3 consider removing this.
1776      * @return legacy class name for FileFileTrailer#comparatorClassName
1777      */
1778     @Override
1779     public String getLegacyKeyComparatorName() {
1780       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1781     }
1782 
1783     @Override
1784     protected Object clone() throws CloneNotSupportedException {
1785       return new MetaComparator();
1786     }
1787 
1788     /**
1789      * Override the row key comparison to parse and compare the meta row key parts.
1790      */
1791     @Override
1792     protected int compareRowKey(final Cell l, final Cell r) {
1793       byte[] left = l.getRowArray();
1794       int loffset = l.getRowOffset();
1795       int llength = l.getRowLength();
1796       byte[] right = r.getRowArray();
1797       int roffset = r.getRowOffset();
1798       int rlength = r.getRowLength();
1799       return compareRows(left, loffset, llength, right, roffset, rlength);
1800     }
1801   }
1802 
1803   /**
1804    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1805    * portion.  This means two KeyValues with same Key but different Values are
1806    * considered the same as far as this Comparator is concerned.
1807    * @deprecated : Use {@link CellComparator}.
1808    */
1809   @Deprecated
1810   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1811 
1812     /**
1813      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1814      * instantiate the appropriate comparator.
1815      * TODO: With V3 consider removing this.
1816      * @return legacy class name for FileFileTrailer#comparatorClassName
1817      */
1818     public String getLegacyKeyComparatorName() {
1819       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1820     }
1821 
1822     @Override // RawComparator
1823     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1824       return compareFlatKey(l,loff,llen, r,roff,rlen);
1825     }
1826 
1827 
1828     /**
1829      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1830      * @param left
1831      * @param right
1832      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1833      */
1834     protected int compareRowKey(final Cell left, final Cell right) {
1835       return CellComparator.COMPARATOR.compareRows(left, right);
1836     }
1837 
1838     /**
1839      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1840      * full KVs laid out in a flat byte[]s.
1841      * @param left
1842      * @param loffset
1843      * @param llength
1844      * @param right
1845      * @param roffset
1846      * @param rlength
1847      * @return  0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1848      */
1849     public int compareFlatKey(byte[] left, int loffset, int llength,
1850         byte[] right, int roffset, int rlength) {
1851       // Compare row
1852       short lrowlength = Bytes.toShort(left, loffset);
1853       short rrowlength = Bytes.toShort(right, roffset);
1854       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1855           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1856       if (compare != 0) {
1857         return compare;
1858       }
1859 
1860       // Compare the rest of the two KVs without making any assumptions about
1861       // the common prefix. This function will not compare rows anyway, so we
1862       // don't need to tell it that the common prefix includes the row.
1863       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1864           rlength, rrowlength);
1865     }
1866 
1867     public int compareFlatKey(byte[] left, byte[] right) {
1868       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1869     }
1870 
1871     // compare a key against row/fam/qual/ts/type
1872     public int compareKey(Cell cell,
1873         byte[] row, int roff, int rlen,
1874         byte[] fam, int foff, int flen,
1875         byte[] col, int coff, int clen,
1876         long ts, byte type) {
1877 
1878       int compare = compareRows(
1879         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
1880         row, roff, rlen);
1881       if (compare != 0) {
1882         return compare;
1883       }
1884       // If the column is not specified, the "minimum" key type appears the
1885       // latest in the sorted order, regardless of the timestamp. This is used
1886       // for specifying the last key/value in a given row, because there is no
1887       // "lexicographically last column" (it would be infinitely long). The
1888       // "maximum" key type does not need this behavior.
1889       if (cell.getFamilyLength() + cell.getQualifierLength() == 0
1890           && cell.getTypeByte() == Type.Minimum.getCode()) {
1891         // left is "bigger", i.e. it appears later in the sorted order
1892         return 1;
1893       }
1894       if (flen+clen == 0 && type == Type.Minimum.getCode()) {
1895         return -1;
1896       }
1897 
1898       compare = compareFamilies(
1899         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
1900         fam, foff, flen);
1901       if (compare != 0) {
1902         return compare;
1903       }
1904       compare = compareColumns(
1905         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
1906         col, coff, clen);
1907       if (compare != 0) {
1908         return compare;
1909       }
1910       // Next compare timestamps.
1911       compare = compareTimestamps(cell.getTimestamp(), ts);
1912       if (compare != 0) {
1913         return compare;
1914       }
1915 
1916       // Compare types. Let the delete types sort ahead of puts; i.e. types
1917       // of higher numbers sort before those of lesser numbers. Maximum (255)
1918       // appears ahead of everything, and minimum (0) appears after
1919       // everything.
1920       return (0xff & type) - (0xff & cell.getTypeByte());
1921     }
1922 
1923     public int compareOnlyKeyPortion(Cell left, Cell right) {
1924       return CellComparator.COMPARATOR.compareKeyIgnoresMvcc(left, right);
1925     }
1926 
1927     /**
1928      * Compares the Key of a cell -- with fields being more significant in this order:
1929      * rowkey, colfam/qual, timestamp, type, mvcc
1930      */
1931     @Override
1932     public int compare(final Cell left, final Cell right) {
1933       int compare = CellComparator.COMPARATOR.compare(left, right);
1934       return compare;
1935     }
1936 
1937     public int compareTimestamps(final Cell left, final Cell right) {
1938       return CellComparator.compareTimestamps(left, right);
1939     }
1940 
1941     /**
1942      * @param left
1943      * @param right
1944      * @return Result comparing rows.
1945      */
1946     public int compareRows(final Cell left, final Cell right) {
1947       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
1948       right.getRowArray(), right.getRowOffset(), right.getRowLength());
1949     }
1950 
1951     /**
1952      * Get the b[],o,l for left and right rowkey portions and compare.
1953      * @param left
1954      * @param loffset
1955      * @param llength
1956      * @param right
1957      * @param roffset
1958      * @param rlength
1959      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1960      */
1961     public int compareRows(byte [] left, int loffset, int llength,
1962         byte [] right, int roffset, int rlength) {
1963       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
1964     }
1965 
1966     int compareColumns(final Cell left, final short lrowlength, final Cell right,
1967         final short rrowlength) {
1968       return CellComparator.compareColumns(left, right);
1969     }
1970 
1971     protected int compareColumns(
1972         byte [] left, int loffset, int llength, final int lfamilylength,
1973         byte [] right, int roffset, int rlength, final int rfamilylength) {
1974       // Compare family portion first.
1975       int diff = Bytes.compareTo(left, loffset, lfamilylength,
1976         right, roffset, rfamilylength);
1977       if (diff != 0) {
1978         return diff;
1979       }
1980       // Compare qualifier portion
1981       return Bytes.compareTo(left, loffset + lfamilylength,
1982         llength - lfamilylength,
1983         right, roffset + rfamilylength, rlength - rfamilylength);
1984       }
1985 
1986     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
1987       // The below older timestamps sorting ahead of newer timestamps looks
1988       // wrong but it is intentional. This way, newer timestamps are first
1989       // found when we iterate over a memstore and newer versions are the
1990       // first we trip over when reading from a store file.
1991       if (ltimestamp < rtimestamp) {
1992         return 1;
1993       } else if (ltimestamp > rtimestamp) {
1994         return -1;
1995       }
1996       return 0;
1997     }
1998 
1999     /**
2000      * Overridden
2001      * @param commonPrefix
2002      * @param left
2003      * @param loffset
2004      * @param llength
2005      * @param right
2006      * @param roffset
2007      * @param rlength
2008      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
2009      */
2010     @Override // SamePrefixComparator
2011     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2012         int loffset, int llength, byte[] right, int roffset, int rlength) {
2013       // Compare row
2014       short lrowlength = Bytes.toShort(left, loffset);
2015       short rrowlength;
2016 
2017       int comparisonResult = 0;
2018       if (commonPrefix < ROW_LENGTH_SIZE) {
2019         // almost nothing in common
2020         rrowlength = Bytes.toShort(right, roffset);
2021         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2022             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2023       } else { // the row length is the same
2024         rrowlength = lrowlength;
2025         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2026           // The rows are not the same. Exclude the common prefix and compare
2027           // the rest of the two rows.
2028           int common = commonPrefix - ROW_LENGTH_SIZE;
2029           comparisonResult = compareRows(
2030               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2031               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2032         }
2033       }
2034       if (comparisonResult != 0) {
2035         return comparisonResult;
2036       }
2037 
2038       assert lrowlength == rrowlength;
2039       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2040           roffset, rlength, lrowlength);
2041     }
2042 
2043     /**
2044      * Compare columnFamily, qualifier, timestamp, and key type (everything
2045      * except the row). This method is used both in the normal comparator and
2046      * the "same-prefix" comparator. Note that we are assuming that row portions
2047      * of both KVs have already been parsed and found identical, and we don't
2048      * validate that assumption here.
2049      * @param commonPrefix
2050      *          the length of the common prefix of the two key-values being
2051      *          compared, including row length and row
2052      */
2053     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2054         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2055       /***
2056        * KeyValue Format and commonLength:
2057        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2058        * ------------------|-------commonLength--------|--------------
2059        */
2060       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2061 
2062       // commonLength + TIMESTAMP_TYPE_SIZE
2063       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2064       // ColumnFamily + Qualifier length.
2065       int lcolumnlength = llength - commonLengthWithTSAndType;
2066       int rcolumnlength = rlength - commonLengthWithTSAndType;
2067 
2068       byte ltype = left[loffset + (llength - 1)];
2069       byte rtype = right[roffset + (rlength - 1)];
2070 
2071       // If the column is not specified, the "minimum" key type appears the
2072       // latest in the sorted order, regardless of the timestamp. This is used
2073       // for specifying the last key/value in a given row, because there is no
2074       // "lexicographically last column" (it would be infinitely long). The
2075       // "maximum" key type does not need this behavior.
2076       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2077         // left is "bigger", i.e. it appears later in the sorted order
2078         return 1;
2079       }
2080       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2081         return -1;
2082       }
2083 
2084       int lfamilyoffset = commonLength + loffset;
2085       int rfamilyoffset = commonLength + roffset;
2086 
2087       // Column family length.
2088       int lfamilylength = left[lfamilyoffset - 1];
2089       int rfamilylength = right[rfamilyoffset - 1];
2090       // If left family size is not equal to right family size, we need not
2091       // compare the qualifiers.
2092       boolean sameFamilySize = (lfamilylength == rfamilylength);
2093       int common = 0;
2094       if (commonPrefix > 0) {
2095         common = Math.max(0, commonPrefix - commonLength);
2096         if (!sameFamilySize) {
2097           // Common should not be larger than Math.min(lfamilylength,
2098           // rfamilylength).
2099           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2100         } else {
2101           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2102         }
2103       }
2104       if (!sameFamilySize) {
2105         // comparing column family is enough.
2106         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2107             - common, right, rfamilyoffset + common, rfamilylength - common);
2108       }
2109       // Compare family & qualifier together.
2110       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2111           lcolumnlength - common, right, rfamilyoffset + common,
2112           rcolumnlength - common);
2113       if (comparison != 0) {
2114         return comparison;
2115       }
2116 
2117       ////
2118       // Next compare timestamps.
2119       long ltimestamp = Bytes.toLong(left,
2120           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2121       long rtimestamp = Bytes.toLong(right,
2122           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2123       int compare = compareTimestamps(ltimestamp, rtimestamp);
2124       if (compare != 0) {
2125         return compare;
2126       }
2127 
2128       // Compare types. Let the delete types sort ahead of puts; i.e. types
2129       // of higher numbers sort before those of lesser numbers. Maximum (255)
2130       // appears ahead of everything, and minimum (0) appears after
2131       // everything.
2132       return (0xff & rtype) - (0xff & ltype);
2133     }
2134 
2135     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2136         final byte[] right, final int roffset, final int rfamilylength) {
2137       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2138       return diff;
2139     }
2140 
2141     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2142         final byte[] right, final int roffset, final int rquallength) {
2143       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2144       return diff;
2145     }
2146     /**
2147      * Compares the row and column of two keyvalues for equality
2148      * @param left
2149      * @param right
2150      * @return True if same row and column.
2151      */
2152     public boolean matchingRowColumn(final Cell left,
2153         final Cell right) {
2154       short lrowlength = left.getRowLength();
2155       short rrowlength = right.getRowLength();
2156 
2157       // TsOffset = end of column data. just comparing Row+CF length of each
2158       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2159           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2160         return false;
2161       }
2162 
2163       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2164         return false;
2165       }
2166 
2167       int lfoffset = left.getFamilyOffset();
2168       int rfoffset = right.getFamilyOffset();
2169       int lclength = left.getQualifierLength();
2170       int rclength = right.getQualifierLength();
2171       int lfamilylength = left.getFamilyLength();
2172       int rfamilylength = right.getFamilyLength();
2173       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2174           right.getFamilyArray(), rfoffset, rfamilylength);
2175       if (diff != 0) {
2176         return false;
2177       } else {
2178         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2179             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2180         return diff == 0;
2181       }
2182     }
2183 
2184     /**
2185      * Compares the row of two keyvalues for equality
2186      * @param left
2187      * @param right
2188      * @return True if rows match.
2189      */
2190     public boolean matchingRows(final Cell left, final Cell right) {
2191       short lrowlength = left.getRowLength();
2192       short rrowlength = right.getRowLength();
2193       return matchingRows(left, lrowlength, right, rrowlength);
2194     }
2195 
2196     /**
2197      * @param left
2198      * @param lrowlength
2199      * @param right
2200      * @param rrowlength
2201      * @return True if rows match.
2202      */
2203     private boolean matchingRows(final Cell left, final short lrowlength,
2204         final Cell right, final short rrowlength) {
2205       return lrowlength == rrowlength &&
2206           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2207               right.getRowArray(), right.getRowOffset(), rrowlength);
2208     }
2209 
2210     /**
2211      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2212      * @param left Left row array.
2213      * @param loffset Left row offset.
2214      * @param llength Left row length.
2215      * @param right Right row array.
2216      * @param roffset Right row offset.
2217      * @param rlength Right row length.
2218      * @return Whether rows are the same row.
2219      */
2220     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2221         final byte [] right, final int roffset, final int rlength) {
2222       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2223     }
2224 
2225     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2226       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2227       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2228         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2229             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2230             + Bytes.toStringBinary(firstKeyInBlock));
2231         return firstKeyInBlock;
2232       }
2233       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2234         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2235             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2236             Bytes.toStringBinary(fakeKey));
2237         return firstKeyInBlock;
2238       }
2239       return fakeKey;
2240     }
2241 
2242     /**
2243      * This is a HFile block index key optimization.
2244      * @param leftKey
2245      * @param rightKey
2246      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
2247      * @deprecated Since 0.99.2;
2248      */
2249     @Deprecated
2250     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2251       if (rightKey == null) {
2252         throw new IllegalArgumentException("rightKey can not be null");
2253       }
2254       if (leftKey == null) {
2255         return Arrays.copyOf(rightKey, rightKey.length);
2256       }
2257       if (compareFlatKey(leftKey, rightKey) >= 0) {
2258         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2259           + ", rightKey:" + Bytes.toString(rightKey));
2260       }
2261 
2262       short leftRowLength = Bytes.toShort(leftKey, 0);
2263       short rightRowLength = Bytes.toShort(rightKey, 0);
2264       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2265       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2266       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2267       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2268       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2269       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2270       // rows are equal
2271       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2272         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2273         // Compare family & qualifier together.
2274         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2275           rightCommonLength, rightColumnLength);
2276         // same with "row + family + qualifier", return rightKey directly
2277         if (comparison == 0) {
2278           return Arrays.copyOf(rightKey, rightKey.length);
2279         }
2280         // "family + qualifier" are different, generate a faked key per rightKey
2281         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2282         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2283         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2284         return newKey;
2285       }
2286       // rows are different
2287       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2288       short diffIdx = 0;
2289       while (diffIdx < minLength
2290           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2291         diffIdx++;
2292       }
2293       byte[] newRowKey = null;
2294       if (diffIdx >= minLength) {
2295         // leftKey's row is prefix of rightKey's.
2296         newRowKey = new byte[diffIdx + 1];
2297         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2298       } else {
2299         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2300         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2301             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2302           newRowKey = new byte[diffIdx + 1];
2303           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2304           newRowKey[diffIdx] = (byte) (diffByte + 1);
2305         } else {
2306           newRowKey = new byte[diffIdx + 1];
2307           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2308         }
2309       }
2310       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2311         Type.Maximum).getKey();
2312     }
2313 
2314     @Override
2315     protected Object clone() throws CloneNotSupportedException {
2316       super.clone();
2317       return new KVComparator();
2318     }
2319 
2320   }
2321 
2322   /**
2323    * @param b
2324    * @return A KeyValue made of a byte array that holds the key-only part.
2325    * Needed to convert hfile index members to KeyValues.
2326    */
2327   public static KeyValue createKeyValueFromKey(final byte [] b) {
2328     return createKeyValueFromKey(b, 0, b.length);
2329   }
2330 
2331   /**
2332    * @param bb
2333    * @return A KeyValue made of a byte buffer that holds the key-only part.
2334    * Needed to convert hfile index members to KeyValues.
2335    */
2336   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2337     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2338   }
2339 
2340   /**
2341    * @param b
2342    * @param o
2343    * @param l
2344    * @return A KeyValue made of a byte array that holds the key-only part.
2345    * Needed to convert hfile index members to KeyValues.
2346    */
2347   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2348       final int l) {
2349     byte [] newb = new byte[l + ROW_OFFSET];
2350     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2351     Bytes.putInt(newb, 0, l);
2352     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2353     return new KeyValue(newb);
2354   }
2355 
2356   /**
2357    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2358    * backing bytes copied from the steam.
2359    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2360    * of zero, we will return null which can be useful marking a stream as done.
2361    * @throws IOException
2362    */
2363   public static KeyValue create(final DataInput in) throws IOException {
2364     return create(in.readInt(), in);
2365   }
2366 
2367   /**
2368    * Create a KeyValue reading <code>length</code> from <code>in</code>
2369    * @param length
2370    * @param in
2371    * @return Created KeyValue OR if we find a length of zero, we will return null which
2372    * can be useful marking a stream as done.
2373    * @throws IOException
2374    */
2375   public static KeyValue create(int length, final DataInput in) throws IOException {
2376 
2377     if (length <= 0) {
2378       if (length == 0) return null;
2379       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2380     }
2381 
2382     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2383     byte [] bytes = new byte[length];
2384     in.readFully(bytes);
2385     return new KeyValue(bytes, 0, length);
2386   }
2387 
2388   /**
2389    * Create a new KeyValue by copying existing cell and adding new tags
2390    * @param c
2391    * @param newTags
2392    * @return a new KeyValue instance with new tags
2393    */
2394   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2395     List<Tag> existingTags = null;
2396     if(c.getTagsLength() > 0) {
2397       existingTags = CellUtil.getTags(c);
2398       existingTags.addAll(newTags);
2399     } else {
2400       existingTags = newTags;
2401     }
2402     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2403       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
2404       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
2405       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
2406       c.getValueLength(), existingTags);
2407   }
2408 
2409   /**
2410    * Create a KeyValue reading from the raw InputStream.
2411    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2412    * @param in
2413    * @return Created KeyValue or throws an exception
2414    * @throws IOException
2415    * {@link Deprecated} As of 1.2. Use {@link KeyValueUtil#iscreate(InputStream, boolean)} instead.
2416    */
2417   @Deprecated
2418   public static KeyValue iscreate(final InputStream in) throws IOException {
2419     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2420     int bytesRead = 0;
2421     while (bytesRead < intBytes.length) {
2422       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2423       if (n < 0) {
2424         if (bytesRead == 0) {
2425           throw new EOFException();
2426         }
2427         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2428       }
2429       bytesRead += n;
2430     }
2431     // TODO: perhaps some sanity check is needed here.
2432     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2433     IOUtils.readFully(in, bytes, 0, bytes.length);
2434     return new KeyValue(bytes, 0, bytes.length);
2435   }
2436 
2437   /**
2438    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2439    * @param kv
2440    * @param out
2441    * @return Length written on stream
2442    * @throws IOException
2443    * @see #create(DataInput) for the inverse function
2444    */
2445   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2446     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2447     // work for all implementations.
2448     int length = kv.getLength();
2449     out.writeInt(length);
2450     out.write(kv.getBuffer(), kv.getOffset(), length);
2451     return length + Bytes.SIZEOF_INT;
2452   }
2453 
2454   /**
2455    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2456    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2457    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2458    * @param kv
2459    * @param out
2460    * @param withTags
2461    * @return Length written on stream
2462    * @throws IOException
2463    * @see #create(DataInput) for the inverse function
2464    * @see #write(KeyValue, DataOutput)
2465    * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
2466    * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
2467    *             Instead use {@link #write(OutputStream, boolean)}
2468    */
2469   @Deprecated
2470   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2471       throws IOException {
2472     return kv.write(out, withTags);
2473   }
2474 
2475   @Override
2476   public int write(OutputStream out) throws IOException {
2477     return write(out, true);
2478   }
2479 
2480   @Override
2481   public int write(OutputStream out, boolean withTags) throws IOException {
2482     // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
2483     // check KeyValueUtil#oswrite also and do necessary changes.
2484     int length = this.length;
2485     if (!withTags) {
2486       length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2487     }
2488     ByteBufferUtils.putInt(out, length);
2489     out.write(this.bytes, this.offset, length);
2490     return length + Bytes.SIZEOF_INT;
2491   }
2492 
2493   /**
2494    * Comparator that compares row component only of a KeyValue.
2495    */
2496   public static class RowOnlyComparator implements Comparator<KeyValue> {
2497     final KVComparator comparator;
2498 
2499     public RowOnlyComparator(final KVComparator c) {
2500       this.comparator = c;
2501     }
2502 
2503     @Override
2504     public int compare(KeyValue left, KeyValue right) {
2505       return comparator.compareRows(left, right);
2506     }
2507   }
2508 
2509 
2510   /**
2511    * Avoids redundant comparisons for better performance.
2512    *
2513    * TODO get rid of this wart
2514    */
2515   public interface SamePrefixComparator<T> {
2516     /**
2517      * Compare two keys assuming that the first n bytes are the same.
2518      * @param commonPrefix How many bytes are the same.
2519      */
2520     int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
2521         byte[] right, int roffset, int rlength
2522     );
2523   }
2524 
2525   /**
2526    * @deprecated  Not to be used for any comparsions
2527    */
2528   @Deprecated
2529   public static class RawBytesComparator extends KVComparator {
2530     /**
2531      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2532      * instantiate the appropriate comparator.
2533      * TODO: With V3 consider removing this.
2534      * @return legacy class name for FileFileTrailer#comparatorClassName
2535      */
2536     @Override
2537     public String getLegacyKeyComparatorName() {
2538       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2539     }
2540 
2541     /**
2542      * @deprecated Since 0.99.2.
2543      */
2544     @Override
2545     @Deprecated
2546     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2547         int roffset, int rlength) {
2548       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2549     }
2550 
2551     @Override
2552     public int compare(Cell left, Cell right) {
2553       return compareOnlyKeyPortion(left, right);
2554     }
2555 
2556     @Override
2557     @VisibleForTesting
2558     public int compareOnlyKeyPortion(Cell left, Cell right) {
2559       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2560           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2561       if (c != 0) {
2562         return c;
2563       }
2564       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2565           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2566           right.getFamilyLength());
2567       if (c != 0) {
2568         return c;
2569       }
2570       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2571           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2572           right.getQualifierLength());
2573       if (c != 0) {
2574         return c;
2575       }
2576       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2577       if (c != 0) {
2578         return c;
2579       }
2580       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2581     }
2582 
2583     @Override
2584     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2585       return firstKeyInBlock;
2586     }
2587 
2588   }
2589 
2590   /**
2591    * HeapSize implementation
2592    *
2593    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2594    * MemStore.
2595    */
2596   @Override
2597   public long heapSize() {
2598     int sum = 0;
2599     sum += ClassSize.OBJECT;// the KeyValue object itself
2600     sum += ClassSize.REFERENCE;// pointer to "bytes"
2601     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2602     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2603     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2604     sum += Bytes.SIZEOF_LONG;// memstoreTS
2605     return ClassSize.align(sum);
2606   }
2607 
2608   /**
2609    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2610    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2611    * In places like block index keys, we need to compare the key byte[] with a cell.
2612    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2613    */
2614   public static class KeyOnlyKeyValue extends KeyValue {
2615     private short rowLen = -1;
2616     public KeyOnlyKeyValue() {
2617 
2618     }
2619     public KeyOnlyKeyValue(byte[] b) {
2620       this(b, 0, b.length);
2621     }
2622 
2623     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2624       this.bytes = b;
2625       this.length = length;
2626       this.offset = offset;
2627       this.rowLen = Bytes.toShort(this.bytes, this.offset);
2628     }
2629 
2630     @Override
2631     public int getKeyOffset() {
2632       return this.offset;
2633     }
2634 
2635     /**
2636      * A setter that helps to avoid object creation every time and whenever
2637      * there is a need to create new KeyOnlyKeyValue.
2638      * @param key
2639      * @param offset
2640      * @param length
2641      */
2642     public void setKey(byte[] key, int offset, int length) {
2643       this.bytes = key;
2644       this.offset = offset;
2645       this.length = length;
2646       this.rowLen = Bytes.toShort(this.bytes, this.offset);
2647     }
2648 
2649     @Override
2650     public byte[] getKey() {
2651       int keylength = getKeyLength();
2652       byte[] key = new byte[keylength];
2653       System.arraycopy(this.bytes, getKeyOffset(), key, 0, keylength);
2654       return key;
2655     }
2656 
2657     @Override
2658     public byte[] getRowArray() {
2659       return bytes;
2660     }
2661 
2662     @Override
2663     public int getRowOffset() {
2664       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2665     }
2666 
2667     @Override
2668     public byte[] getFamilyArray() {
2669       return bytes;
2670     }
2671 
2672     @Override
2673     public byte getFamilyLength() {
2674       return this.bytes[getFamilyOffset() - 1];
2675     }
2676 
2677     @Override
2678     public int getFamilyOffset() {
2679       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2680     }
2681 
2682     @Override
2683     public byte[] getQualifierArray() {
2684       return bytes;
2685     }
2686 
2687     @Override
2688     public int getQualifierLength() {
2689       return getQualifierLength(getRowLength(), getFamilyLength());
2690     }
2691 
2692     @Override
2693     public int getQualifierOffset() {
2694       return getFamilyOffset() + getFamilyLength();
2695     }
2696 
2697     @Override
2698     public int getKeyLength() {
2699       return length;
2700     }
2701 
2702     @Override
2703     public short getRowLength() {
2704       return rowLen;
2705     }
2706 
2707     @Override
2708     public byte getTypeByte() {
2709       return this.bytes[this.offset + getKeyLength() - 1];
2710     }
2711 
2712     private int getQualifierLength(int rlength, int flength) {
2713       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2714     }
2715 
2716     @Override
2717     public long getTimestamp() {
2718       int tsOffset = getTimestampOffset();
2719       return Bytes.toLong(this.bytes, tsOffset);
2720     }
2721 
2722     @Override
2723     public int getTimestampOffset() {
2724       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2725     }
2726 
2727     @Override
2728     public byte[] getTagsArray() {
2729       return HConstants.EMPTY_BYTE_ARRAY;
2730     }
2731 
2732     @Override
2733     public int getTagsOffset() {
2734       return 0;
2735     }
2736 
2737     @Override
2738     public byte[] getValueArray() {
2739       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2740     }
2741 
2742     @Override
2743     public int getValueOffset() {
2744       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2745     }
2746 
2747     @Override
2748     public int getValueLength() {
2749       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2750     }
2751 
2752     @Override
2753     public int getTagsLength() {
2754       return 0;
2755     }
2756 
2757     @Override
2758     public String toString() {
2759       if (this.bytes == null || this.bytes.length == 0) {
2760         return "empty";
2761       }
2762       return keyToString(this.bytes, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2763     }
2764 
2765     @Override
2766     public int hashCode() {
2767       return super.hashCode();
2768     }
2769 
2770     @Override
2771     public boolean equals(Object other) {
2772       return super.equals(other);
2773     }
2774 
2775     @Override
2776     public long heapSize() {
2777       return super.heapSize() + Bytes.SIZEOF_SHORT;
2778     }
2779   }
2780 }