View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.EOFException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.OutputStream;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.util.StreamUtils;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.io.IOUtils;
46  import org.apache.hadoop.io.RawComparator;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
50  /**
51   * An HBase Key/Value. This is the fundamental HBase Type.
52   * <p>
53   * HBase applications and users should use the Cell interface and avoid directly using KeyValue
54   * and member functions not defined in Cell.
55   * <p>
56   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
57   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
58   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
59   * <p>
60   * Instances of this class are immutable. They do not implement Comparable but Comparators are
61   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
62   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
63   * Hfiles, and bloom filter keys.
64   * <p>
65   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
66   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
67   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
68   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier>
69   * &lt;timestamp> &lt;keytype></code>
70   * The <code>rowlength</code> maximum is <code>Short.MAX_SIZE</code>, column family length maximum
71   * is <code>Byte.MAX_SIZE</code>, and column qualifier + key length must be <
72   * <code>Integer.MAX_SIZE</code>. The column does not contain the family/qualifier delimiter,
73   * {@link #COLUMN_FAMILY_DELIMITER}<br>
74   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
75   * the value part. The format for this part is: <code>&lt;tagslength>&lt;tagsbytes></code>.
76   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
77   * contain one or more tags where as each tag is of the form
78   * <code>&lt;taglength>&lt;tagtype>&lt;tagbytes></code>.  <code>tagtype</code> is one byte and
79   * <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type length
80   * and actual tag bytes length.
81   */
82  @InterfaceAudience.Private
83  public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, SettableTimestamp {
84    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
85  
86    static final Log LOG = LogFactory.getLog(KeyValue.class);
87  
88    /**
89     * Colon character in UTF-8
90     */
91    public static final char COLUMN_FAMILY_DELIMITER = ':';
92  
93    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
94      new byte[]{COLUMN_FAMILY_DELIMITER};
95  
96    /**
97     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
98     * of KeyValue only.
99     */
100   public static final KVComparator COMPARATOR = new KVComparator();
101   /**
102    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
103    * {@link KeyValue}s.
104    */
105   public static final KVComparator META_COMPARATOR = new MetaComparator();
106 
107   /**
108    * Needed for Bloom Filters.
109    */
110   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
111 
112   /** Size of the key length field in bytes*/
113   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
114 
115   /** Size of the key type field in bytes */
116   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
117 
118   /** Size of the row length field in bytes */
119   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
120 
121   /** Size of the family length field in bytes */
122   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
123 
124   /** Size of the timestamp field in bytes */
125   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
126 
127   // Size of the timestamp and type byte on end of a key -- a long + a byte.
128   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
129 
130   // Size of the length shorts and bytes in key.
131   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
132       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
133 
134   // How far into the key the row starts at. First thing to read is the short
135   // that says how long the row is.
136   public static final int ROW_OFFSET =
137     Bytes.SIZEOF_INT /*keylength*/ +
138     Bytes.SIZEOF_INT /*valuelength*/;
139 
140   // Size of the length ints in a KeyValue datastructure.
141   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
142 
143   /** Size of the tags length field in bytes */
144   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
145 
146   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
147 
148   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
149 
150   /**
151    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
152    * characteristics would take up for its underlying data structure.
153    *
154    * @param rlength row length
155    * @param flength family length
156    * @param qlength qualifier length
157    * @param vlength value length
158    *
159    * @return the <code>KeyValue</code> data structure length
160    */
161   public static long getKeyValueDataStructureSize(int rlength,
162       int flength, int qlength, int vlength) {
163     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
164         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
165   }
166 
167   /**
168    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
169    * characteristics would take up for its underlying data structure.
170    *
171    * @param rlength row length
172    * @param flength family length
173    * @param qlength qualifier length
174    * @param vlength value length
175    * @param tagsLength total length of the tags
176    *
177    * @return the <code>KeyValue</code> data structure length
178    */
179   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
180       int vlength, int tagsLength) {
181     if (tagsLength == 0) {
182       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
183     }
184     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
185         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
186   }
187 
188   /**
189    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
190    * characteristics would take up for its underlying data structure.
191    *
192    * @param klength key length
193    * @param vlength value length
194    * @param tagsLength total length of the tags
195    *
196    * @return the <code>KeyValue</code> data structure length
197    */
198   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
199     if (tagsLength == 0) {
200       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
201     }
202     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
203   }
204 
205   /**
206    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
207    * characteristics would take up in its underlying data structure for the key.
208    *
209    * @param rlength row length
210    * @param flength family length
211    * @param qlength qualifier length
212    *
213    * @return the key data structure length
214    */
215   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
216     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
217   }
218 
219   /**
220    * Key type.
221    * Has space for other key types to be added later.  Cannot rely on
222    * enum ordinals . They change if item is removed or moved.  Do our own codes.
223    */
224   public static enum Type {
225     Minimum((byte)0),
226     Put((byte)4),
227 
228     Delete((byte)8),
229     DeleteFamilyVersion((byte)10),
230     DeleteColumn((byte)12),
231     DeleteFamily((byte)14),
232 
233     // Maximum is used when searching; you look from maximum on down.
234     Maximum((byte)255);
235 
236     private final byte code;
237 
238     Type(final byte c) {
239       this.code = c;
240     }
241 
242     public byte getCode() {
243       return this.code;
244     }
245 
246     /**
247      * Cannot rely on enum ordinals . They change if item is removed or moved.
248      * Do our own codes.
249      * @param b
250      * @return Type associated with passed code.
251      */
252     public static Type codeToType(final byte b) {
253       for (Type t : Type.values()) {
254         if (t.getCode() == b) {
255           return t;
256         }
257       }
258       throw new RuntimeException("Unknown code " + b);
259     }
260   }
261 
262   /**
263    * Lowest possible key.
264    * Makes a Key with highest possible Timestamp, empty row and column.  No
265    * key can be equal or lower than this one in memstore or in store file.
266    */
267   public static final KeyValue LOWESTKEY =
268     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
269 
270   ////
271   // KeyValue core instance fields.
272   protected byte [] bytes = null;  // an immutable byte array that contains the KV
273   protected int offset = 0;  // offset into bytes buffer KV starts at
274   protected int length = 0;  // length of the KV starting from offset.
275 
276   /**
277    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
278    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
279    * KeyValue type.
280    */
281   public static boolean isDelete(byte t) {
282     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
283   }
284 
285   /** Here be dragons **/
286 
287   // used to achieve atomic operations in the memstore.
288   @Override
289   public long getMvccVersion() {
290     return this.getSequenceId();
291   }
292 
293   /**
294    * used to achieve atomic operations in the memstore.
295    */
296   @Override
297   public long getSequenceId() {
298     return seqId;
299   }
300 
301   @Override
302   public void setSequenceId(long seqId) {
303     this.seqId = seqId;
304   }
305 
306   // multi-version concurrency control version.  default value is 0, aka do not care.
307   private long seqId = 0;
308 
309   /** Dragon time over, return to normal business */
310 
311 
312   /** Writable Constructor -- DO NOT USE */
313   public KeyValue() {}
314 
315   /**
316    * Creates a KeyValue from the start of the specified byte array.
317    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
318    * @param bytes byte array
319    */
320   public KeyValue(final byte [] bytes) {
321     this(bytes, 0);
322   }
323 
324   /**
325    * Creates a KeyValue from the specified byte array and offset.
326    * Presumes <code>bytes</code> content starting at <code>offset</code> is
327    * formatted as a KeyValue blob.
328    * @param bytes byte array
329    * @param offset offset to start of KeyValue
330    */
331   public KeyValue(final byte [] bytes, final int offset) {
332     this(bytes, offset, getLength(bytes, offset));
333   }
334 
335   /**
336    * Creates a KeyValue from the specified byte array, starting at offset, and
337    * for length <code>length</code>.
338    * @param bytes byte array
339    * @param offset offset to start of the KeyValue
340    * @param length length of the KeyValue
341    */
342   public KeyValue(final byte [] bytes, final int offset, final int length) {
343     this.bytes = bytes;
344     this.offset = offset;
345     this.length = length;
346   }
347 
348   /**
349    * Creates a KeyValue from the specified byte array, starting at offset, and
350    * for length <code>length</code>.
351    *
352    * @param bytes  byte array
353    * @param offset offset to start of the KeyValue
354    * @param length length of the KeyValue
355    * @param ts
356    */
357   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
358     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
359   }
360 
361   /** Constructors that build a new backing byte array from fields */
362 
363   /**
364    * Constructs KeyValue structure filled with null value.
365    * Sets type to {@link KeyValue.Type#Maximum}
366    * @param row - row key (arbitrary byte array)
367    * @param timestamp
368    */
369   public KeyValue(final byte [] row, final long timestamp) {
370     this(row, null, null, timestamp, Type.Maximum, null);
371   }
372 
373   /**
374    * Constructs KeyValue structure filled with null value.
375    * @param row - row key (arbitrary byte array)
376    * @param timestamp
377    */
378   public KeyValue(final byte [] row, final long timestamp, Type type) {
379     this(row, null, null, timestamp, type, null);
380   }
381 
382   /**
383    * Constructs KeyValue structure filled with null value.
384    * Sets type to {@link KeyValue.Type#Maximum}
385    * @param row - row key (arbitrary byte array)
386    * @param family family name
387    * @param qualifier column qualifier
388    */
389   public KeyValue(final byte [] row, final byte [] family,
390       final byte [] qualifier) {
391     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
392   }
393 
394   /**
395    * Constructs KeyValue structure as a put filled with specified values and
396    * LATEST_TIMESTAMP.
397    * @param row - row key (arbitrary byte array)
398    * @param family family name
399    * @param qualifier column qualifier
400    */
401   public KeyValue(final byte [] row, final byte [] family,
402       final byte [] qualifier, final byte [] value) {
403     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
404   }
405 
406   /**
407    * Constructs KeyValue structure filled with specified values.
408    * @param row row key
409    * @param family family name
410    * @param qualifier column qualifier
411    * @param timestamp version timestamp
412    * @param type key type
413    * @throws IllegalArgumentException
414    */
415   public KeyValue(final byte[] row, final byte[] family,
416       final byte[] qualifier, final long timestamp, Type type) {
417     this(row, family, qualifier, timestamp, type, null);
418   }
419 
420   /**
421    * Constructs KeyValue structure filled with specified values.
422    * @param row row key
423    * @param family family name
424    * @param qualifier column qualifier
425    * @param timestamp version timestamp
426    * @param value column value
427    * @throws IllegalArgumentException
428    */
429   public KeyValue(final byte[] row, final byte[] family,
430       final byte[] qualifier, final long timestamp, final byte[] value) {
431     this(row, family, qualifier, timestamp, Type.Put, value);
432   }
433 
434   /**
435    * Constructs KeyValue structure filled with specified values.
436    * @param row row key
437    * @param family family name
438    * @param qualifier column qualifier
439    * @param timestamp version timestamp
440    * @param value column value
441    * @param tags tags
442    * @throws IllegalArgumentException
443    */
444   public KeyValue(final byte[] row, final byte[] family,
445       final byte[] qualifier, final long timestamp, final byte[] value,
446       final Tag[] tags) {
447     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
448   }
449 
450   /**
451    * Constructs KeyValue structure filled with specified values.
452    * @param row row key
453    * @param family family name
454    * @param qualifier column qualifier
455    * @param timestamp version timestamp
456    * @param value column value
457    * @param tags tags non-empty list of tags or null
458    * @throws IllegalArgumentException
459    */
460   public KeyValue(final byte[] row, final byte[] family,
461       final byte[] qualifier, final long timestamp, final byte[] value,
462       final List<Tag> tags) {
463     this(row, 0, row==null ? 0 : row.length,
464       family, 0, family==null ? 0 : family.length,
465       qualifier, 0, qualifier==null ? 0 : qualifier.length,
466       timestamp, Type.Put,
467       value, 0, value==null ? 0 : value.length, tags);
468   }
469 
470   /**
471    * Constructs KeyValue structure filled with specified values.
472    * @param row row key
473    * @param family family name
474    * @param qualifier column qualifier
475    * @param timestamp version timestamp
476    * @param type key type
477    * @param value column value
478    * @throws IllegalArgumentException
479    */
480   public KeyValue(final byte[] row, final byte[] family,
481       final byte[] qualifier, final long timestamp, Type type,
482       final byte[] value) {
483     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
484         timestamp, type,   value, 0, len(value));
485   }
486 
487   /**
488    * Constructs KeyValue structure filled with specified values.
489    * <p>
490    * Column is split into two fields, family and qualifier.
491    * @param row row key
492    * @param family family name
493    * @param qualifier column qualifier
494    * @param timestamp version timestamp
495    * @param type key type
496    * @param value column value
497    * @throws IllegalArgumentException
498    */
499   public KeyValue(final byte[] row, final byte[] family,
500       final byte[] qualifier, final long timestamp, Type type,
501       final byte[] value, final List<Tag> tags) {
502     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
503         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
504   }
505 
506   /**
507    * Constructs KeyValue structure filled with specified values.
508    * @param row row key
509    * @param family family name
510    * @param qualifier column qualifier
511    * @param timestamp version timestamp
512    * @param type key type
513    * @param value column value
514    * @throws IllegalArgumentException
515    */
516   public KeyValue(final byte[] row, final byte[] family,
517       final byte[] qualifier, final long timestamp, Type type,
518       final byte[] value, final byte[] tags) {
519     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
520         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
521   }
522 
523   /**
524    * Constructs KeyValue structure filled with specified values.
525    * @param row row key
526    * @param family family name
527    * @param qualifier column qualifier
528    * @param qoffset qualifier offset
529    * @param qlength qualifier length
530    * @param timestamp version timestamp
531    * @param type key type
532    * @param value column value
533    * @param voffset value offset
534    * @param vlength value length
535    * @throws IllegalArgumentException
536    */
537   public KeyValue(byte [] row, byte [] family,
538       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
539       byte [] value, int voffset, int vlength, List<Tag> tags) {
540     this(row, 0, row==null ? 0 : row.length,
541         family, 0, family==null ? 0 : family.length,
542         qualifier, qoffset, qlength, timestamp, type,
543         value, voffset, vlength, tags);
544   }
545 
546   /**
547    * @param row
548    * @param family
549    * @param qualifier
550    * @param qoffset
551    * @param qlength
552    * @param timestamp
553    * @param type
554    * @param value
555    * @param voffset
556    * @param vlength
557    * @param tags
558    */
559   public KeyValue(byte [] row, byte [] family,
560       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
561       byte [] value, int voffset, int vlength, byte[] tags) {
562     this(row, 0, row==null ? 0 : row.length,
563         family, 0, family==null ? 0 : family.length,
564         qualifier, qoffset, qlength, timestamp, type,
565         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
566   }
567 
568   /**
569    * Constructs KeyValue structure filled with specified values.
570    * <p>
571    * Column is split into two fields, family and qualifier.
572    * @param row row key
573    * @throws IllegalArgumentException
574    */
575   public KeyValue(final byte [] row, final int roffset, final int rlength,
576       final byte [] family, final int foffset, final int flength,
577       final byte [] qualifier, final int qoffset, final int qlength,
578       final long timestamp, final Type type,
579       final byte [] value, final int voffset, final int vlength) {
580     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
581       qlength, timestamp, type, value, voffset, vlength, null);
582   }
583 
584   /**
585    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
586    * data buffer.
587    * <p>
588    * Column is split into two fields, family and qualifier.
589    *
590    * @param buffer the bytes buffer to use
591    * @param boffset buffer offset
592    * @param row row key
593    * @param roffset row offset
594    * @param rlength row length
595    * @param family family name
596    * @param foffset family offset
597    * @param flength family length
598    * @param qualifier column qualifier
599    * @param qoffset qualifier offset
600    * @param qlength qualifier length
601    * @param timestamp version timestamp
602    * @param type key type
603    * @param value column value
604    * @param voffset value offset
605    * @param vlength value length
606    * @param tags non-empty list of tags or null
607    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
608    * remaining in the buffer
609    */
610   public KeyValue(byte [] buffer, final int boffset,
611       final byte [] row, final int roffset, final int rlength,
612       final byte [] family, final int foffset, final int flength,
613       final byte [] qualifier, final int qoffset, final int qlength,
614       final long timestamp, final Type type,
615       final byte [] value, final int voffset, final int vlength,
616       final Tag[] tags) {
617      this.bytes  = buffer;
618      this.length = writeByteArray(buffer, boffset,
619          row, roffset, rlength,
620          family, foffset, flength, qualifier, qoffset, qlength,
621         timestamp, type, value, voffset, vlength, tags);
622      this.offset = boffset;
623    }
624 
625   /**
626    * Constructs KeyValue structure filled with specified values.
627    * <p>
628    * Column is split into two fields, family and qualifier.
629    * @param row row key
630    * @param roffset row offset
631    * @param rlength row length
632    * @param family family name
633    * @param foffset family offset
634    * @param flength family length
635    * @param qualifier column qualifier
636    * @param qoffset qualifier offset
637    * @param qlength qualifier length
638    * @param timestamp version timestamp
639    * @param type key type
640    * @param value column value
641    * @param voffset value offset
642    * @param vlength value length
643    * @param tags tags
644    * @throws IllegalArgumentException
645    */
646   public KeyValue(final byte [] row, final int roffset, final int rlength,
647       final byte [] family, final int foffset, final int flength,
648       final byte [] qualifier, final int qoffset, final int qlength,
649       final long timestamp, final Type type,
650       final byte [] value, final int voffset, final int vlength,
651       final List<Tag> tags) {
652     this.bytes = createByteArray(row, roffset, rlength,
653         family, foffset, flength, qualifier, qoffset, qlength,
654         timestamp, type, value, voffset, vlength, tags);
655     this.length = bytes.length;
656     this.offset = 0;
657   }
658 
659   /**
660    * @param row
661    * @param roffset
662    * @param rlength
663    * @param family
664    * @param foffset
665    * @param flength
666    * @param qualifier
667    * @param qoffset
668    * @param qlength
669    * @param timestamp
670    * @param type
671    * @param value
672    * @param voffset
673    * @param vlength
674    * @param tags
675    */
676   public KeyValue(final byte [] row, final int roffset, final int rlength,
677       final byte [] family, final int foffset, final int flength,
678       final byte [] qualifier, final int qoffset, final int qlength,
679       final long timestamp, final Type type,
680       final byte [] value, final int voffset, final int vlength,
681       final byte[] tags, final int tagsOffset, final int tagsLength) {
682     this.bytes = createByteArray(row, roffset, rlength,
683         family, foffset, flength, qualifier, qoffset, qlength,
684         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
685     this.length = bytes.length;
686     this.offset = 0;
687   }
688 
689   /**
690    * Constructs an empty KeyValue structure, with specified sizes.
691    * This can be used to partially fill up KeyValues.
692    * <p>
693    * Column is split into two fields, family and qualifier.
694    * @param rlength row length
695    * @param flength family length
696    * @param qlength qualifier length
697    * @param timestamp version timestamp
698    * @param type key type
699    * @param vlength value length
700    * @throws IllegalArgumentException
701    */
702   public KeyValue(final int rlength,
703       final int flength,
704       final int qlength,
705       final long timestamp, final Type type,
706       final int vlength) {
707     this(rlength, flength, qlength, timestamp, type, vlength, 0);
708   }
709 
710   /**
711    * Constructs an empty KeyValue structure, with specified sizes.
712    * This can be used to partially fill up KeyValues.
713    * <p>
714    * Column is split into two fields, family and qualifier.
715    * @param rlength row length
716    * @param flength family length
717    * @param qlength qualifier length
718    * @param timestamp version timestamp
719    * @param type key type
720    * @param vlength value length
721    * @param tagsLength
722    * @throws IllegalArgumentException
723    */
724   public KeyValue(final int rlength,
725       final int flength,
726       final int qlength,
727       final long timestamp, final Type type,
728       final int vlength, final int tagsLength) {
729     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
730         tagsLength);
731     this.length = bytes.length;
732     this.offset = 0;
733   }
734 
735 
736   public KeyValue(byte[] row, int roffset, int rlength,
737                   byte[] family, int foffset, int flength,
738                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
739     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
740         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
741         value, 0, value == null ? 0 : value.remaining(), tags);
742     this.length = bytes.length;
743     this.offset = 0;
744   }
745 
746   public KeyValue(Cell c) {
747     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
748         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
749         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
750         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
751         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
752     this.seqId = c.getSequenceId();
753   }
754 
755   /**
756    * Create a KeyValue that is smaller than all other possible KeyValues
757    * for the given row. That is any (valid) KeyValue on 'row' would sort
758    * _after_ the result.
759    *
760    * @param row - row key (arbitrary byte array)
761    * @return First possible KeyValue on passed <code>row</code>
762    * @deprecated Since 0.99.2. Use {@link KeyValueUtil#createFirstOnRow(byte [])} instead
763    */
764   @Deprecated
765   public static KeyValue createFirstOnRow(final byte [] row) {
766     return KeyValueUtil.createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
767   }
768 
769   /**
770    * Create a KeyValue for the specified row, family and qualifier that would be
771    * smaller than all other possible KeyValues that have the same row,family,qualifier.
772    * Used for seeking.
773    * @param row - row key (arbitrary byte array)
774    * @param family - family name
775    * @param qualifier - column qualifier
776    * @return First possible key on passed <code>row</code>, and column.
777    * @deprecated Since 0.99.2. Use {@link KeyValueUtil#createFirstOnRow(byte[], byte[], byte[])}
778    * instead
779    */
780   @Deprecated
781   public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
782       final byte [] qualifier) {
783     return KeyValueUtil.createFirstOnRow(row, family, qualifier);
784   }
785 
786   /**
787    * Create a KeyValue for the specified row, family and qualifier that would be
788    * smaller than all other possible KeyValues that have the same row,
789    * family, qualifier.
790    * Used for seeking.
791    * @param row row key
792    * @param roffset row offset
793    * @param rlength row length
794    * @param family family name
795    * @param foffset family offset
796    * @param flength family length
797    * @param qualifier column qualifier
798    * @param qoffset qualifier offset
799    * @param qlength qualifier length
800    * @return First possible key on passed Row, Family, Qualifier.
801    * @deprecated Since 0.99.2. Use {@link KeyValueUtil#createFirstOnRow(byte[], int, int,
802    * byte[], int, int, byte[], int, int)} instead
803    */
804   @Deprecated
805   public static KeyValue createFirstOnRow(final byte [] row,
806       final int roffset, final int rlength, final byte [] family,
807       final int foffset, final int flength, final byte [] qualifier,
808       final int qoffset, final int qlength) {
809     return new KeyValue(row, roffset, rlength, family,
810         foffset, flength, qualifier, qoffset, qlength,
811         HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
812   }
813 
814   /**
815    * Create an empty byte[] representing a KeyValue
816    * All lengths are preset and can be filled in later.
817    * @param rlength
818    * @param flength
819    * @param qlength
820    * @param timestamp
821    * @param type
822    * @param vlength
823    * @return The newly created byte array.
824    */
825   private static byte[] createEmptyByteArray(final int rlength, int flength,
826       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
827     if (rlength > Short.MAX_VALUE) {
828       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
829     }
830     if (flength > Byte.MAX_VALUE) {
831       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
832     }
833     // Qualifier length
834     if (qlength > Integer.MAX_VALUE - rlength - flength) {
835       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
836     }
837     checkForTagsLength(tagsLength);
838     // Key length
839     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
840     if (longkeylength > Integer.MAX_VALUE) {
841       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
842         Integer.MAX_VALUE);
843     }
844     int keylength = (int)longkeylength;
845     // Value length
846     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
847       throw new IllegalArgumentException("Valuer > " +
848           HConstants.MAXIMUM_VALUE_LENGTH);
849     }
850 
851     // Allocate right-sized byte array.
852     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
853         tagsLength)];
854     // Write the correct size markers
855     int pos = 0;
856     pos = Bytes.putInt(bytes, pos, keylength);
857     pos = Bytes.putInt(bytes, pos, vlength);
858     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
859     pos += rlength;
860     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
861     pos += flength + qlength;
862     pos = Bytes.putLong(bytes, pos, timestamp);
863     pos = Bytes.putByte(bytes, pos, type.getCode());
864     pos += vlength;
865     if (tagsLength > 0) {
866       pos = Bytes.putAsShort(bytes, pos, tagsLength);
867     }
868     return bytes;
869   }
870 
871   /**
872    * Checks the parameters passed to a constructor.
873    *
874    * @param row row key
875    * @param rlength row length
876    * @param family family name
877    * @param flength family length
878    * @param qlength qualifier length
879    * @param vlength value length
880    *
881    * @throws IllegalArgumentException an illegal value was passed
882    */
883   private static void checkParameters(final byte [] row, final int rlength,
884       final byte [] family, int flength, int qlength, int vlength)
885           throws IllegalArgumentException {
886     if (rlength > Short.MAX_VALUE) {
887       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
888     }
889     if (row == null) {
890       throw new IllegalArgumentException("Row is null");
891     }
892     // Family length
893     flength = family == null ? 0 : flength;
894     if (flength > Byte.MAX_VALUE) {
895       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
896     }
897     // Qualifier length
898     if (qlength > Integer.MAX_VALUE - rlength - flength) {
899       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
900     }
901     // Key length
902     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
903     if (longKeyLength > Integer.MAX_VALUE) {
904       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
905           Integer.MAX_VALUE);
906     }
907     // Value length
908     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
909       throw new IllegalArgumentException("Value length " + vlength + " > " +
910           HConstants.MAXIMUM_VALUE_LENGTH);
911     }
912   }
913 
914   /**
915    * Write KeyValue format into the provided byte array.
916    *
917    * @param buffer the bytes buffer to use
918    * @param boffset buffer offset
919    * @param row row key
920    * @param roffset row offset
921    * @param rlength row length
922    * @param family family name
923    * @param foffset family offset
924    * @param flength family length
925    * @param qualifier column qualifier
926    * @param qoffset qualifier offset
927    * @param qlength qualifier length
928    * @param timestamp version timestamp
929    * @param type key type
930    * @param value column value
931    * @param voffset value offset
932    * @param vlength value length
933    *
934    * @return The number of useful bytes in the buffer.
935    *
936    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
937    * remaining in the buffer
938    */
939   public static int writeByteArray(byte [] buffer, final int boffset,
940       final byte [] row, final int roffset, final int rlength,
941       final byte [] family, final int foffset, int flength,
942       final byte [] qualifier, final int qoffset, int qlength,
943       final long timestamp, final Type type,
944       final byte [] value, final int voffset, int vlength, Tag[] tags) {
945 
946     checkParameters(row, rlength, family, flength, qlength, vlength);
947 
948     // Calculate length of tags area
949     int tagsLength = 0;
950     if (tags != null && tags.length > 0) {
951       for (Tag t: tags) {
952         tagsLength += t.getLength();
953       }
954     }
955     checkForTagsLength(tagsLength);
956     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
957     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
958         tagsLength);
959     if (keyValueLength > buffer.length - boffset) {
960       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
961           keyValueLength);
962     }
963 
964     // Write key, value and key row length.
965     int pos = boffset;
966     pos = Bytes.putInt(buffer, pos, keyLength);
967     pos = Bytes.putInt(buffer, pos, vlength);
968     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
969     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
970     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
971     if (flength != 0) {
972       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
973     }
974     if (qlength != 0) {
975       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
976     }
977     pos = Bytes.putLong(buffer, pos, timestamp);
978     pos = Bytes.putByte(buffer, pos, type.getCode());
979     if (value != null && value.length > 0) {
980       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
981     }
982     // Write the number of tags. If it is 0 then it means there are no tags.
983     if (tagsLength > 0) {
984       pos = Bytes.putAsShort(buffer, pos, tagsLength);
985       for (Tag t : tags) {
986         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
987       }
988     }
989     return keyValueLength;
990   }
991 
992   private static void checkForTagsLength(int tagsLength) {
993     if (tagsLength > MAX_TAGS_LENGTH) {
994       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
995     }
996   }
997 
998   /**
999    * Write KeyValue format into a byte array.
1000    * @param row row key
1001    * @param roffset row offset
1002    * @param rlength row length
1003    * @param family family name
1004    * @param foffset family offset
1005    * @param flength family length
1006    * @param qualifier column qualifier
1007    * @param qoffset qualifier offset
1008    * @param qlength qualifier length
1009    * @param timestamp version timestamp
1010    * @param type key type
1011    * @param value column value
1012    * @param voffset value offset
1013    * @param vlength value length
1014    * @return The newly created byte array.
1015    */
1016   private static byte [] createByteArray(final byte [] row, final int roffset,
1017       final int rlength, final byte [] family, final int foffset, int flength,
1018       final byte [] qualifier, final int qoffset, int qlength,
1019       final long timestamp, final Type type,
1020       final byte [] value, final int voffset,
1021       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
1022 
1023     checkParameters(row, rlength, family, flength, qlength, vlength);
1024     checkForTagsLength(tagsLength);
1025     // Allocate right-sized byte array.
1026     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1027     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1028       tagsLength)];
1029     // Write key, value and key row length.
1030     int pos = 0;
1031     pos = Bytes.putInt(bytes, pos, keyLength);
1032     pos = Bytes.putInt(bytes, pos, vlength);
1033     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1034     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1035     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1036     if(flength != 0) {
1037       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1038     }
1039     if(qlength != 0) {
1040       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
1041     }
1042     pos = Bytes.putLong(bytes, pos, timestamp);
1043     pos = Bytes.putByte(bytes, pos, type.getCode());
1044     if (value != null && value.length > 0) {
1045       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
1046     }
1047     // Add the tags after the value part
1048     if (tagsLength > 0) {
1049       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1050       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
1051     }
1052     return bytes;
1053   }
1054 
1055   /**
1056    * @param qualifier can be a ByteBuffer or a byte[], or null.
1057    * @param value can be a ByteBuffer or a byte[], or null.
1058    */
1059   private static byte [] createByteArray(final byte [] row, final int roffset,
1060       final int rlength, final byte [] family, final int foffset, int flength,
1061       final Object qualifier, final int qoffset, int qlength,
1062       final long timestamp, final Type type,
1063       final Object value, final int voffset, int vlength, List<Tag> tags) {
1064 
1065     checkParameters(row, rlength, family, flength, qlength, vlength);
1066 
1067     // Calculate length of tags area
1068     int tagsLength = 0;
1069     if (tags != null && !tags.isEmpty()) {
1070       for (Tag t : tags) {
1071         tagsLength += t.getLength();
1072       }
1073     }
1074     checkForTagsLength(tagsLength);
1075     // Allocate right-sized byte array.
1076     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1077     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1078         tagsLength)];
1079 
1080     // Write key, value and key row length.
1081     int pos = 0;
1082     pos = Bytes.putInt(bytes, pos, keyLength);
1083 
1084     pos = Bytes.putInt(bytes, pos, vlength);
1085     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1086     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1087     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1088     if(flength != 0) {
1089       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1090     }
1091     if (qlength > 0) {
1092       if (qualifier instanceof ByteBuffer) {
1093         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1094       } else {
1095         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1096       }
1097     }
1098     pos = Bytes.putLong(bytes, pos, timestamp);
1099     pos = Bytes.putByte(bytes, pos, type.getCode());
1100     if (vlength > 0) {
1101       if (value instanceof ByteBuffer) {
1102         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1103       } else {
1104         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1105       }
1106     }
1107     // Add the tags after the value part
1108     if (tagsLength > 0) {
1109       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1110       for (Tag t : tags) {
1111         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1112       }
1113     }
1114     return bytes;
1115   }
1116 
1117   /**
1118    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1119    */
1120   @Override
1121   public boolean equals(Object other) {
1122     if (!(other instanceof Cell)) {
1123       return false;
1124     }
1125     return CellComparator.equals(this, (Cell)other);
1126   }
1127 
1128   /**
1129    * In line with {@link #equals(Object)}, only uses the key portion, not the value.
1130    */
1131   @Override
1132   public int hashCode() {
1133     return CellComparator.hashCodeIgnoreMvcc(this);
1134   }
1135 
1136   //---------------------------------------------------------------------------
1137   //
1138   //  KeyValue cloning
1139   //
1140   //---------------------------------------------------------------------------
1141 
1142   /**
1143    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1144    * @return Fully copied clone of this KeyValue
1145    * @throws CloneNotSupportedException
1146    */
1147   @Override
1148   public KeyValue clone() throws CloneNotSupportedException {
1149     super.clone();
1150     byte [] b = new byte[this.length];
1151     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1152     KeyValue ret = new KeyValue(b, 0, b.length);
1153     // Important to clone the memstoreTS as well - otherwise memstore's
1154     // update-in-place methods (eg increment) will end up creating
1155     // new entries
1156     ret.setSequenceId(seqId);
1157     return ret;
1158   }
1159 
1160   /**
1161    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1162    * http://en.wikipedia.org/wiki/Object_copy
1163    * @return Shallow copy of this KeyValue
1164    */
1165   public KeyValue shallowCopy() {
1166     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1167     shallowCopy.setSequenceId(this.seqId);
1168     return shallowCopy;
1169   }
1170 
1171   //---------------------------------------------------------------------------
1172   //
1173   //  String representation
1174   //
1175   //---------------------------------------------------------------------------
1176 
1177   @Override
1178   public String toString() {
1179     if (this.bytes == null || this.bytes.length == 0) {
1180       return "empty";
1181     }
1182     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
1183       + getValueLength() + "/seqid=" + seqId;
1184   }
1185 
1186   /**
1187    * @param k Key portion of a KeyValue.
1188    * @return Key as a String, empty string if k is null.
1189    */
1190   public static String keyToString(final byte [] k) {
1191     if (k == null) {
1192       return "";
1193     }
1194     return keyToString(k, 0, k.length);
1195   }
1196 
1197   /**
1198    * Produces a string map for this key/value pair. Useful for programmatic use
1199    * and manipulation of the data stored in an WALKey, for example, printing
1200    * as JSON. Values are left out due to their tendency to be large. If needed,
1201    * they can be added manually.
1202    *
1203    * @return the Map<String,?> containing data from this key
1204    */
1205   public Map<String, Object> toStringMap() {
1206     Map<String, Object> stringMap = new HashMap<String, Object>();
1207     stringMap.put("row", Bytes.toStringBinary(getRow()));
1208     stringMap.put("family", Bytes.toStringBinary(getFamily()));
1209     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
1210     stringMap.put("timestamp", getTimestamp());
1211     stringMap.put("vlen", getValueLength());
1212     List<Tag> tags = getTags();
1213     if (tags != null) {
1214       List<String> tagsString = new ArrayList<String>();
1215       for (Tag t : tags) {
1216         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1217       }
1218       stringMap.put("tag", tagsString);
1219     }
1220     return stringMap;
1221   }
1222 
1223   /**
1224    * Use for logging.
1225    * @param b Key portion of a KeyValue.
1226    * @param o Offset to start of key
1227    * @param l Length of key.
1228    * @return Key as a String.
1229    */
1230   public static String keyToString(final byte [] b, final int o, final int l) {
1231     if (b == null) return "";
1232     int rowlength = Bytes.toShort(b, o);
1233     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1234     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1235     int familylength = b[columnoffset - 1];
1236     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1237     String family = familylength == 0? "":
1238       Bytes.toStringBinary(b, columnoffset, familylength);
1239     String qualifier = columnlength == 0? "":
1240       Bytes.toStringBinary(b, columnoffset + familylength,
1241       columnlength - familylength);
1242     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1243     String timestampStr = humanReadableTimestamp(timestamp);
1244     byte type = b[o + l - 1];
1245     return row + "/" + family +
1246       (family != null && family.length() > 0? ":" :"") +
1247       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1248   }
1249 
1250   public static String humanReadableTimestamp(final long timestamp) {
1251     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1252       return "LATEST_TIMESTAMP";
1253     }
1254     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1255       return "OLDEST_TIMESTAMP";
1256     }
1257     return String.valueOf(timestamp);
1258   }
1259 
1260   //---------------------------------------------------------------------------
1261   //
1262   //  Public Member Accessors
1263   //
1264   //---------------------------------------------------------------------------
1265 
1266   /**
1267    * @return The byte array backing this KeyValue.
1268    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1269    */
1270   @Deprecated
1271   public byte [] getBuffer() {
1272     return this.bytes;
1273   }
1274 
1275   /**
1276    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1277    */
1278   public int getOffset() {
1279     return this.offset;
1280   }
1281 
1282   /**
1283    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1284    */
1285   public int getLength() {
1286     return length;
1287   }
1288 
1289   //---------------------------------------------------------------------------
1290   //
1291   //  Length and Offset Calculators
1292   //
1293   //---------------------------------------------------------------------------
1294 
1295   /**
1296    * Determines the total length of the KeyValue stored in the specified
1297    * byte array and offset.  Includes all headers.
1298    * @param bytes byte array
1299    * @param offset offset to start of the KeyValue
1300    * @return length of entire KeyValue, in bytes
1301    */
1302   private static int getLength(byte [] bytes, int offset) {
1303     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1304     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1305     return klength + vlength;
1306   }
1307 
1308   /**
1309    * @return Key offset in backing buffer..
1310    */
1311   public int getKeyOffset() {
1312     return this.offset + ROW_OFFSET;
1313   }
1314 
1315   public String getKeyString() {
1316     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1317   }
1318 
1319   /**
1320    * @return Length of key portion.
1321    */
1322   public int getKeyLength() {
1323     return Bytes.toInt(this.bytes, this.offset);
1324   }
1325 
1326   /**
1327    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1328    */
1329   @Override
1330   public byte[] getValueArray() {
1331     return bytes;
1332   }
1333 
1334   /**
1335    * @return the value offset
1336    */
1337   @Override
1338   public int getValueOffset() {
1339     int voffset = getKeyOffset() + getKeyLength();
1340     return voffset;
1341   }
1342 
1343   /**
1344    * @return Value length
1345    */
1346   @Override
1347   public int getValueLength() {
1348     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1349     return vlength;
1350   }
1351 
1352   /**
1353    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1354    */
1355   @Override
1356   public byte[] getRowArray() {
1357     return bytes;
1358   }
1359 
1360   /**
1361    * @return Row offset
1362    */
1363   @Override
1364   public int getRowOffset() {
1365     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1366   }
1367 
1368   /**
1369    * @return Row length
1370    */
1371   @Override
1372   public short getRowLength() {
1373     return Bytes.toShort(this.bytes, getKeyOffset());
1374   }
1375 
1376   /**
1377    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1378    */
1379   @Override
1380   public byte[] getFamilyArray() {
1381     return bytes;
1382   }
1383 
1384   /**
1385    * @return Family offset
1386    */
1387   @Override
1388   public int getFamilyOffset() {
1389     return getFamilyOffset(getRowLength());
1390   }
1391 
1392   /**
1393    * @return Family offset
1394    */
1395   private int getFamilyOffset(int rlength) {
1396     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1397   }
1398 
1399   /**
1400    * @return Family length
1401    */
1402   @Override
1403   public byte getFamilyLength() {
1404     return getFamilyLength(getFamilyOffset());
1405   }
1406 
1407   /**
1408    * @return Family length
1409    */
1410   public byte getFamilyLength(int foffset) {
1411     return this.bytes[foffset-1];
1412   }
1413 
1414   /**
1415    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1416    */
1417   @Override
1418   public byte[] getQualifierArray() {
1419     return bytes;
1420   }
1421 
1422   /**
1423    * @return Qualifier offset
1424    */
1425   @Override
1426   public int getQualifierOffset() {
1427     return getQualifierOffset(getFamilyOffset());
1428   }
1429 
1430   /**
1431    * @return Qualifier offset
1432    */
1433   private int getQualifierOffset(int foffset) {
1434     return foffset + getFamilyLength(foffset);
1435   }
1436 
1437   /**
1438    * @return Qualifier length
1439    */
1440   @Override
1441   public int getQualifierLength() {
1442     return getQualifierLength(getRowLength(),getFamilyLength());
1443   }
1444 
1445   /**
1446    * @return Qualifier length
1447    */
1448   private int getQualifierLength(int rlength, int flength) {
1449     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1450   }
1451 
1452   /**
1453    * @return Timestamp offset
1454    */
1455   public int getTimestampOffset() {
1456     return getTimestampOffset(getKeyLength());
1457   }
1458 
1459   /**
1460    * @param keylength Pass if you have it to save on a int creation.
1461    * @return Timestamp offset
1462    */
1463   private int getTimestampOffset(final int keylength) {
1464     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1465   }
1466 
1467   /**
1468    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1469    */
1470   public boolean isLatestTimestamp() {
1471     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1472       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1473   }
1474 
1475   /**
1476    * @param now Time to set into <code>this</code> IFF timestamp ==
1477    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1478    * @return True is we modified this.
1479    */
1480   public boolean updateLatestStamp(final byte [] now) {
1481     if (this.isLatestTimestamp()) {
1482       int tsOffset = getTimestampOffset();
1483       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1484       // clear cache or else getTimestamp() possibly returns an old value
1485       return true;
1486     }
1487     return false;
1488   }
1489 
1490   @Override
1491   public void setTimestamp(long ts) {
1492     Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG);
1493   }
1494 
1495   @Override
1496   public void setTimestamp(byte[] ts, int tsOffset) {
1497     Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG);
1498   }
1499 
1500   //---------------------------------------------------------------------------
1501   //
1502   //  Methods that return copies of fields
1503   //
1504   //---------------------------------------------------------------------------
1505 
1506   /**
1507    * Do not use unless you have to.  Used internally for compacting and testing.
1508    *
1509    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1510    * {@link #getValue()} if accessing a KeyValue client-side.
1511    * @return Copy of the key portion only.
1512    */
1513   public byte [] getKey() {
1514     int keylength = getKeyLength();
1515     byte [] key = new byte[keylength];
1516     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1517     return key;
1518   }
1519 
1520   /**
1521    * Returns value in a new byte array.
1522    * Primarily for use client-side. If server-side, use
1523    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1524    * save on allocations.
1525    * @return Value in a new byte array.
1526    */
1527   @Override
1528   @Deprecated // use CellUtil.getValueArray()
1529   public byte [] getValue() {
1530     return CellUtil.cloneValue(this);
1531   }
1532 
1533   /**
1534    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1535    * byte array.<p>
1536    *
1537    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1538    * lengths instead.
1539    * @return Row in a new byte array.
1540    */
1541   @Override
1542   @Deprecated // use CellUtil.getRowArray()
1543   public byte [] getRow() {
1544     return CellUtil.cloneRow(this);
1545   }
1546 
1547   /**
1548    *
1549    * @return Timestamp
1550    */
1551   @Override
1552   public long getTimestamp() {
1553     return getTimestamp(getKeyLength());
1554   }
1555 
1556   /**
1557    * @param keylength Pass if you have it to save on a int creation.
1558    * @return Timestamp
1559    */
1560   long getTimestamp(final int keylength) {
1561     int tsOffset = getTimestampOffset(keylength);
1562     return Bytes.toLong(this.bytes, tsOffset);
1563   }
1564 
1565   /**
1566    * @return Type of this KeyValue.
1567    */
1568   @Deprecated
1569   public byte getType() {
1570     return getTypeByte();
1571   }
1572 
1573   /**
1574    * @return KeyValue.TYPE byte representation
1575    */
1576   @Override
1577   public byte getTypeByte() {
1578     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1579   }
1580 
1581   /**
1582    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1583    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1584    * KeyValue type.
1585    */
1586   @Deprecated // use CellUtil#isDelete
1587   public boolean isDelete() {
1588     return KeyValue.isDelete(getType());
1589   }
1590 
1591   /**
1592    * Primarily for use client-side.  Returns the family of this KeyValue in a
1593    * new byte array.<p>
1594    *
1595    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1596    * lengths instead.
1597    * @return Returns family. Makes a copy.
1598    */
1599   @Override
1600   @Deprecated // use CellUtil.getFamilyArray
1601   public byte [] getFamily() {
1602     return CellUtil.cloneFamily(this);
1603   }
1604 
1605   /**
1606    * Primarily for use client-side.  Returns the column qualifier of this
1607    * KeyValue in a new byte array.<p>
1608    *
1609    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1610    * lengths instead.
1611    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1612    * @return Returns qualifier. Makes a copy.
1613    */
1614   @Override
1615   @Deprecated // use CellUtil.getQualifierArray
1616   public byte [] getQualifier() {
1617     return CellUtil.cloneQualifier(this);
1618   }
1619 
1620   /**
1621    * This returns the offset where the tag actually starts.
1622    */
1623   @Override
1624   public int getTagsOffset() {
1625     int tagsLen = getTagsLength();
1626     if (tagsLen == 0) {
1627       return this.offset + this.length;
1628     }
1629     return this.offset + this.length - tagsLen;
1630   }
1631 
1632   /**
1633    * This returns the total length of the tag bytes
1634    */
1635   @Override
1636   public int getTagsLength() {
1637     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1638     if (tagsLen > 0) {
1639       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1640       // length
1641       tagsLen -= TAGS_LENGTH_SIZE;
1642     }
1643     return tagsLen;
1644   }
1645 
1646   /**
1647    * Returns any tags embedded in the KeyValue.  Used in testcases.
1648    * @return The tags
1649    */
1650   public List<Tag> getTags() {
1651     int tagsLength = getTagsLength();
1652     if (tagsLength == 0) {
1653       return EMPTY_ARRAY_LIST;
1654     }
1655     return Tag.asList(getTagsArray(), getTagsOffset(), tagsLength);
1656   }
1657 
1658   /**
1659    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1660    */
1661   @Override
1662   public byte[] getTagsArray() {
1663     return bytes;
1664   }
1665 
1666   /**
1667    * Creates a new KeyValue that only contains the key portion (the value is
1668    * set to be null).
1669    *
1670    * TODO only used by KeyOnlyFilter -- move there.
1671    * @param lenAsVal replace value with the actual value length (false=empty)
1672    */
1673   public KeyValue createKeyOnly(boolean lenAsVal) {
1674     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1675     // Rebuild as: <keylen:4><0:4><key:keylen>
1676     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1677     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1678     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1679         Math.min(newBuffer.length,this.length));
1680     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1681     if (lenAsVal) {
1682       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1683     }
1684     return new KeyValue(newBuffer);
1685   }
1686 
1687   /**
1688    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1689    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1690    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1691    * <p>
1692    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1693    * </p>
1694    * <p>
1695    * Not recommend to be used as this is old-style API.
1696    * </p>
1697    * @param c The column.
1698    * @return The parsed column.
1699    */
1700   public static byte [][] parseColumn(byte [] c) {
1701     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1702     if (index == -1) {
1703       // If no delimiter, return array of size 1
1704       return new byte [][] { c };
1705     } else if(index == c.length - 1) {
1706       // family with empty qualifier, return array size 2
1707       byte [] family = new byte[c.length-1];
1708       System.arraycopy(c, 0, family, 0, family.length);
1709       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1710     }
1711     // Family and column, return array size 2
1712     final byte [][] result = new byte [2][];
1713     result[0] = new byte [index];
1714     System.arraycopy(c, 0, result[0], 0, index);
1715     final int len = c.length - (index + 1);
1716     result[1] = new byte[len];
1717     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1718     return result;
1719   }
1720 
1721   /**
1722    * Makes a column in family:qualifier form from separate byte arrays.
1723    * <p>
1724    * Not recommended for usage as this is old-style API.
1725    * @param family
1726    * @param qualifier
1727    * @return family:qualifier
1728    */
1729   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1730     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1731   }
1732 
1733   /**
1734    * @param b
1735    * @param delimiter
1736    * @return Index of delimiter having started from start of <code>b</code>
1737    * moving rightward.
1738    */
1739   public static int getDelimiter(final byte [] b, int offset, final int length,
1740       final int delimiter) {
1741     if (b == null) {
1742       throw new IllegalArgumentException("Passed buffer is null");
1743     }
1744     int result = -1;
1745     for (int i = offset; i < length + offset; i++) {
1746       if (b[i] == delimiter) {
1747         result = i;
1748         break;
1749       }
1750     }
1751     return result;
1752   }
1753 
1754   /**
1755    * Find index of passed delimiter walking from end of buffer backwards.
1756    * @param b
1757    * @param delimiter
1758    * @return Index of delimiter
1759    */
1760   public static int getDelimiterInReverse(final byte [] b, final int offset,
1761       final int length, final int delimiter) {
1762     if (b == null) {
1763       throw new IllegalArgumentException("Passed buffer is null");
1764     }
1765     int result = -1;
1766     for (int i = (offset + length) - 1; i >= offset; i--) {
1767       if (b[i] == delimiter) {
1768         result = i;
1769         break;
1770       }
1771     }
1772     return result;
1773   }
1774 
1775   /**
1776    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1777    * {@link KeyValue}s.
1778    */
1779   public static class MetaComparator extends KVComparator {
1780     /**
1781      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1782      * table.
1783      */
1784     @Override
1785     public int compare(final Cell left, final Cell right) {
1786       int c = compareRowKey(left, right);
1787       if (c != 0) {
1788         return c;
1789       }
1790       return CellComparator.compareWithoutRow(left, right);
1791     }
1792 
1793     @Override
1794     public int compareOnlyKeyPortion(Cell left, Cell right) {
1795       return compare(left, right);
1796     }
1797 
1798     @Override
1799     public int compareRows(byte [] left, int loffset, int llength,
1800         byte [] right, int roffset, int rlength) {
1801       int leftDelimiter = getDelimiter(left, loffset, llength,
1802           HConstants.DELIMITER);
1803       int rightDelimiter = getDelimiter(right, roffset, rlength,
1804           HConstants.DELIMITER);
1805       // Compare up to the delimiter
1806       int lpart = (leftDelimiter < 0 ? llength :leftDelimiter - loffset);
1807       int rpart = (rightDelimiter < 0 ? rlength :rightDelimiter - roffset);
1808       int result = Bytes.compareTo(left, loffset, lpart, right, roffset, rpart);
1809       if (result != 0) {
1810         return result;
1811       } else {
1812         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1813           return -1;
1814         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1815           return 1;
1816         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1817           return 0;
1818         }
1819       }
1820       // Compare middle bit of the row.
1821       // Move past delimiter
1822       leftDelimiter++;
1823       rightDelimiter++;
1824       int leftFarDelimiter = getDelimiterInReverse(left, leftDelimiter,
1825           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1826       int rightFarDelimiter = getDelimiterInReverse(right,
1827           rightDelimiter, rlength - (rightDelimiter - roffset),
1828           HConstants.DELIMITER);
1829       // Now compare middlesection of row.
1830       lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter;
1831       rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter;
1832       result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart);
1833       if (result != 0) {
1834         return result;
1835       }  else {
1836         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1837           return -1;
1838         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1839           return 1;
1840         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1841           return 0;
1842         }
1843       }
1844       // Compare last part of row, the rowid.
1845       leftFarDelimiter++;
1846       rightFarDelimiter++;
1847       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1848           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1849       return result;
1850     }
1851 
1852     /**
1853      * Don't do any fancy Block Index splitting tricks.
1854      */
1855     @Override
1856     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1857       return Arrays.copyOf(rightKey, rightKey.length);
1858     }
1859 
1860     /**
1861      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1862      * instantiate the appropriate comparator.
1863      * TODO: With V3 consider removing this.
1864      * @return legacy class name for FileFileTrailer#comparatorClassName
1865      */
1866     @Override
1867     public String getLegacyKeyComparatorName() {
1868       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1869     }
1870 
1871     @Override
1872     protected Object clone() throws CloneNotSupportedException {
1873       return new MetaComparator();
1874     }
1875 
1876     /**
1877      * Override the row key comparison to parse and compare the meta row key parts.
1878      */
1879     @Override
1880     protected int compareRowKey(final Cell l, final Cell r) {
1881       byte[] left = l.getRowArray();
1882       int loffset = l.getRowOffset();
1883       int llength = l.getRowLength();
1884       byte[] right = r.getRowArray();
1885       int roffset = r.getRowOffset();
1886       int rlength = r.getRowLength();
1887       return compareRows(left, loffset, llength, right, roffset, rlength);
1888     }
1889   }
1890 
1891   /**
1892    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1893    * portion.  This means two KeyValues with same Key but different Values are
1894    * considered the same as far as this Comparator is concerned.
1895    */
1896   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1897 
1898     /**
1899      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1900      * instantiate the appropriate comparator.
1901      * TODO: With V3 consider removing this.
1902      * @return legacy class name for FileFileTrailer#comparatorClassName
1903      */
1904     public String getLegacyKeyComparatorName() {
1905       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1906     }
1907 
1908     @Override // RawComparator
1909     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1910       return compareFlatKey(l,loff,llen, r,roff,rlen);
1911     }
1912 
1913 
1914     /**
1915      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1916      * @param left
1917      * @param right
1918      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1919      */
1920     protected int compareRowKey(final Cell left, final Cell right) {
1921       return CellComparator.compareRows(left, right);
1922     }
1923 
1924     /**
1925      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1926      * full KVs laid out in a flat byte[]s.
1927      * @param left
1928      * @param loffset
1929      * @param llength
1930      * @param right
1931      * @param roffset
1932      * @param rlength
1933      * @return  0 if equal, <0 if left smaller, >0 if right smaller
1934      */
1935     public int compareFlatKey(byte[] left, int loffset, int llength,
1936         byte[] right, int roffset, int rlength) {
1937       // Compare row
1938       short lrowlength = Bytes.toShort(left, loffset);
1939       short rrowlength = Bytes.toShort(right, roffset);
1940       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1941           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1942       if (compare != 0) {
1943         return compare;
1944       }
1945 
1946       // Compare the rest of the two KVs without making any assumptions about
1947       // the common prefix. This function will not compare rows anyway, so we
1948       // don't need to tell it that the common prefix includes the row.
1949       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1950           rlength, rrowlength);
1951     }
1952 
1953     public int compareFlatKey(byte[] left, byte[] right) {
1954       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1955     }
1956 
1957     // compare a key against row/fam/qual/ts/type
1958     public int compareKey(Cell cell,
1959         byte[] row, int roff, int rlen,
1960         byte[] fam, int foff, int flen,
1961         byte[] col, int coff, int clen,
1962         long ts, byte type) {
1963 
1964       int compare = compareRows(
1965         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
1966         row, roff, rlen);
1967       if (compare != 0) {
1968         return compare;
1969       }
1970       // If the column is not specified, the "minimum" key type appears the
1971       // latest in the sorted order, regardless of the timestamp. This is used
1972       // for specifying the last key/value in a given row, because there is no
1973       // "lexicographically last column" (it would be infinitely long). The
1974       // "maximum" key type does not need this behavior.
1975       if (cell.getFamilyLength() + cell.getQualifierLength() == 0
1976           && cell.getTypeByte() == Type.Minimum.getCode()) {
1977         // left is "bigger", i.e. it appears later in the sorted order
1978         return 1;
1979       }
1980       if (flen+clen == 0 && type == Type.Minimum.getCode()) {
1981         return -1;
1982       }
1983 
1984       compare = compareFamilies(
1985         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
1986         fam, foff, flen);
1987       if (compare != 0) {
1988         return compare;
1989       }
1990       compare = compareColumns(
1991         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
1992         col, coff, clen);
1993       if (compare != 0) {
1994         return compare;
1995       }
1996       // Next compare timestamps.
1997       compare = compareTimestamps(cell.getTimestamp(), ts);
1998       if (compare != 0) {
1999         return compare;
2000       }
2001 
2002       // Compare types. Let the delete types sort ahead of puts; i.e. types
2003       // of higher numbers sort before those of lesser numbers. Maximum (255)
2004       // appears ahead of everything, and minimum (0) appears after
2005       // everything.
2006       return (0xff & type) - (0xff & cell.getTypeByte());
2007     }
2008 
2009     public int compareOnlyKeyPortion(Cell left, Cell right) {
2010       return CellComparator.compare(left, right, true);
2011     }
2012 
2013     /**
2014      * Compares the Key of a cell -- with fields being more significant in this order:
2015      * rowkey, colfam/qual, timestamp, type, mvcc
2016      */
2017     @Override
2018     public int compare(final Cell left, final Cell right) {
2019       int compare = CellComparator.compare(left, right, false);
2020       return compare;
2021     }
2022 
2023     public int compareTimestamps(final Cell left, final Cell right) {
2024       return CellComparator.compareTimestamps(left, right);
2025     }
2026 
2027     /**
2028      * @param left
2029      * @param right
2030      * @return Result comparing rows.
2031      */
2032     public int compareRows(final Cell left, final Cell right) {
2033       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
2034       right.getRowArray(), right.getRowOffset(), right.getRowLength());
2035     }
2036 
2037     /**
2038      * Get the b[],o,l for left and right rowkey portions and compare.
2039      * @param left
2040      * @param loffset
2041      * @param llength
2042      * @param right
2043      * @param roffset
2044      * @param rlength
2045      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2046      */
2047     public int compareRows(byte [] left, int loffset, int llength,
2048         byte [] right, int roffset, int rlength) {
2049       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2050     }
2051 
2052     int compareColumns(final Cell left, final short lrowlength, final Cell right,
2053         final short rrowlength) {
2054       return CellComparator.compareColumns(left, right);
2055     }
2056 
2057     protected int compareColumns(
2058         byte [] left, int loffset, int llength, final int lfamilylength,
2059         byte [] right, int roffset, int rlength, final int rfamilylength) {
2060       // Compare family portion first.
2061       int diff = Bytes.compareTo(left, loffset, lfamilylength,
2062         right, roffset, rfamilylength);
2063       if (diff != 0) {
2064         return diff;
2065       }
2066       // Compare qualifier portion
2067       return Bytes.compareTo(left, loffset + lfamilylength,
2068         llength - lfamilylength,
2069         right, roffset + rfamilylength, rlength - rfamilylength);
2070       }
2071 
2072     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
2073       // The below older timestamps sorting ahead of newer timestamps looks
2074       // wrong but it is intentional. This way, newer timestamps are first
2075       // found when we iterate over a memstore and newer versions are the
2076       // first we trip over when reading from a store file.
2077       if (ltimestamp < rtimestamp) {
2078         return 1;
2079       } else if (ltimestamp > rtimestamp) {
2080         return -1;
2081       }
2082       return 0;
2083     }
2084 
2085     /**
2086      * Overridden
2087      * @param commonPrefix
2088      * @param left
2089      * @param loffset
2090      * @param llength
2091      * @param right
2092      * @param roffset
2093      * @param rlength
2094      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2095      */
2096     @Override // SamePrefixComparator
2097     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2098         int loffset, int llength, byte[] right, int roffset, int rlength) {
2099       // Compare row
2100       short lrowlength = Bytes.toShort(left, loffset);
2101       short rrowlength;
2102 
2103       int comparisonResult = 0;
2104       if (commonPrefix < ROW_LENGTH_SIZE) {
2105         // almost nothing in common
2106         rrowlength = Bytes.toShort(right, roffset);
2107         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2108             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2109       } else { // the row length is the same
2110         rrowlength = lrowlength;
2111         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2112           // The rows are not the same. Exclude the common prefix and compare
2113           // the rest of the two rows.
2114           int common = commonPrefix - ROW_LENGTH_SIZE;
2115           comparisonResult = compareRows(
2116               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2117               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2118         }
2119       }
2120       if (comparisonResult != 0) {
2121         return comparisonResult;
2122       }
2123 
2124       assert lrowlength == rrowlength;
2125       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2126           roffset, rlength, lrowlength);
2127     }
2128 
2129     /**
2130      * Compare columnFamily, qualifier, timestamp, and key type (everything
2131      * except the row). This method is used both in the normal comparator and
2132      * the "same-prefix" comparator. Note that we are assuming that row portions
2133      * of both KVs have already been parsed and found identical, and we don't
2134      * validate that assumption here.
2135      * @param commonPrefix
2136      *          the length of the common prefix of the two key-values being
2137      *          compared, including row length and row
2138      */
2139     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2140         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2141       /***
2142        * KeyValue Format and commonLength:
2143        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2144        * ------------------|-------commonLength--------|--------------
2145        */
2146       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2147 
2148       // commonLength + TIMESTAMP_TYPE_SIZE
2149       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2150       // ColumnFamily + Qualifier length.
2151       int lcolumnlength = llength - commonLengthWithTSAndType;
2152       int rcolumnlength = rlength - commonLengthWithTSAndType;
2153 
2154       byte ltype = left[loffset + (llength - 1)];
2155       byte rtype = right[roffset + (rlength - 1)];
2156 
2157       // If the column is not specified, the "minimum" key type appears the
2158       // latest in the sorted order, regardless of the timestamp. This is used
2159       // for specifying the last key/value in a given row, because there is no
2160       // "lexicographically last column" (it would be infinitely long). The
2161       // "maximum" key type does not need this behavior.
2162       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2163         // left is "bigger", i.e. it appears later in the sorted order
2164         return 1;
2165       }
2166       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2167         return -1;
2168       }
2169 
2170       int lfamilyoffset = commonLength + loffset;
2171       int rfamilyoffset = commonLength + roffset;
2172 
2173       // Column family length.
2174       int lfamilylength = left[lfamilyoffset - 1];
2175       int rfamilylength = right[rfamilyoffset - 1];
2176       // If left family size is not equal to right family size, we need not
2177       // compare the qualifiers.
2178       boolean sameFamilySize = (lfamilylength == rfamilylength);
2179       int common = 0;
2180       if (commonPrefix > 0) {
2181         common = Math.max(0, commonPrefix - commonLength);
2182         if (!sameFamilySize) {
2183           // Common should not be larger than Math.min(lfamilylength,
2184           // rfamilylength).
2185           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2186         } else {
2187           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2188         }
2189       }
2190       if (!sameFamilySize) {
2191         // comparing column family is enough.
2192         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2193             - common, right, rfamilyoffset + common, rfamilylength - common);
2194       }
2195       // Compare family & qualifier together.
2196       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2197           lcolumnlength - common, right, rfamilyoffset + common,
2198           rcolumnlength - common);
2199       if (comparison != 0) {
2200         return comparison;
2201       }
2202 
2203       ////
2204       // Next compare timestamps.
2205       long ltimestamp = Bytes.toLong(left,
2206           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2207       long rtimestamp = Bytes.toLong(right,
2208           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2209       int compare = compareTimestamps(ltimestamp, rtimestamp);
2210       if (compare != 0) {
2211         return compare;
2212       }
2213 
2214       // Compare types. Let the delete types sort ahead of puts; i.e. types
2215       // of higher numbers sort before those of lesser numbers. Maximum (255)
2216       // appears ahead of everything, and minimum (0) appears after
2217       // everything.
2218       return (0xff & rtype) - (0xff & ltype);
2219     }
2220 
2221     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2222         final byte[] right, final int roffset, final int rfamilylength) {
2223       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2224       return diff;
2225     }
2226 
2227     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2228         final byte[] right, final int roffset, final int rquallength) {
2229       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2230       return diff;
2231     }
2232     /**
2233      * Compares the row and column of two keyvalues for equality
2234      * @param left
2235      * @param right
2236      * @return True if same row and column.
2237      */
2238     public boolean matchingRowColumn(final Cell left,
2239         final Cell right) {
2240       short lrowlength = left.getRowLength();
2241       short rrowlength = right.getRowLength();
2242 
2243       // TsOffset = end of column data. just comparing Row+CF length of each
2244       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2245           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2246         return false;
2247       }
2248 
2249       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2250         return false;
2251       }
2252 
2253       int lfoffset = left.getFamilyOffset();
2254       int rfoffset = right.getFamilyOffset();
2255       int lclength = left.getQualifierLength();
2256       int rclength = right.getQualifierLength();
2257       int lfamilylength = left.getFamilyLength();
2258       int rfamilylength = right.getFamilyLength();
2259       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2260           right.getFamilyArray(), rfoffset, rfamilylength);
2261       if (diff != 0) {
2262         return false;
2263       } else {
2264         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2265             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2266         return diff == 0;
2267       }
2268     }
2269 
2270     /**
2271      * Compares the row of two keyvalues for equality
2272      * @param left
2273      * @param right
2274      * @return True if rows match.
2275      */
2276     public boolean matchingRows(final Cell left, final Cell right) {
2277       short lrowlength = left.getRowLength();
2278       short rrowlength = right.getRowLength();
2279       return matchingRows(left, lrowlength, right, rrowlength);
2280     }
2281 
2282     /**
2283      * @param left
2284      * @param lrowlength
2285      * @param right
2286      * @param rrowlength
2287      * @return True if rows match.
2288      */
2289     private boolean matchingRows(final Cell left, final short lrowlength,
2290         final Cell right, final short rrowlength) {
2291       return lrowlength == rrowlength &&
2292           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2293               right.getRowArray(), right.getRowOffset(), rrowlength);
2294     }
2295 
2296     /**
2297      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2298      * @param left Left row array.
2299      * @param loffset Left row offset.
2300      * @param llength Left row length.
2301      * @param right Right row array.
2302      * @param roffset Right row offset.
2303      * @param rlength Right row length.
2304      * @return Whether rows are the same row.
2305      */
2306     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2307         final byte [] right, final int roffset, final int rlength) {
2308       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2309     }
2310 
2311     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2312       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2313       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2314         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2315             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2316             + Bytes.toStringBinary(firstKeyInBlock));
2317         return firstKeyInBlock;
2318       }
2319       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2320         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2321             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2322             Bytes.toStringBinary(fakeKey));
2323         return firstKeyInBlock;
2324       }
2325       return fakeKey;
2326     }
2327 
2328     /**
2329      * This is a HFile block index key optimization.
2330      * @param leftKey
2331      * @param rightKey
2332      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2333      * @deprecated Since 0.99.2; Use
2334      *             {@link CellComparator#getMidpoint(KeyValue.KVComparator, Cell, Cell) instead}
2335      */
2336     @Deprecated
2337     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2338       if (rightKey == null) {
2339         throw new IllegalArgumentException("rightKey can not be null");
2340       }
2341       if (leftKey == null) {
2342         return Arrays.copyOf(rightKey, rightKey.length);
2343       }
2344       if (compareFlatKey(leftKey, rightKey) >= 0) {
2345         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2346           + ", rightKey:" + Bytes.toString(rightKey));
2347       }
2348 
2349       short leftRowLength = Bytes.toShort(leftKey, 0);
2350       short rightRowLength = Bytes.toShort(rightKey, 0);
2351       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2352       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2353       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2354       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2355       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2356       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2357       // rows are equal
2358       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2359         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2360         // Compare family & qualifier together.
2361         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2362           rightCommonLength, rightColumnLength);
2363         // same with "row + family + qualifier", return rightKey directly
2364         if (comparison == 0) {
2365           return Arrays.copyOf(rightKey, rightKey.length);
2366         }
2367         // "family + qualifier" are different, generate a faked key per rightKey
2368         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2369         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2370         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2371         return newKey;
2372       }
2373       // rows are different
2374       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2375       short diffIdx = 0;
2376       while (diffIdx < minLength
2377           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2378         diffIdx++;
2379       }
2380       byte[] newRowKey = null;
2381       if (diffIdx >= minLength) {
2382         // leftKey's row is prefix of rightKey's.
2383         newRowKey = new byte[diffIdx + 1];
2384         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2385       } else {
2386         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2387         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2388             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2389           newRowKey = new byte[diffIdx + 1];
2390           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2391           newRowKey[diffIdx] = (byte) (diffByte + 1);
2392         } else {
2393           newRowKey = new byte[diffIdx + 1];
2394           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2395         }
2396       }
2397       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2398         Type.Maximum).getKey();
2399     }
2400 
2401     @Override
2402     protected Object clone() throws CloneNotSupportedException {
2403       super.clone();
2404       return new KVComparator();
2405     }
2406 
2407   }
2408 
2409   /**
2410    * @param b
2411    * @return A KeyValue made of a byte array that holds the key-only part.
2412    * Needed to convert hfile index members to KeyValues.
2413    */
2414   public static KeyValue createKeyValueFromKey(final byte [] b) {
2415     return createKeyValueFromKey(b, 0, b.length);
2416   }
2417 
2418   /**
2419    * @param bb
2420    * @return A KeyValue made of a byte buffer that holds the key-only part.
2421    * Needed to convert hfile index members to KeyValues.
2422    */
2423   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2424     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2425   }
2426 
2427   /**
2428    * @param b
2429    * @param o
2430    * @param l
2431    * @return A KeyValue made of a byte array that holds the key-only part.
2432    * Needed to convert hfile index members to KeyValues.
2433    */
2434   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2435       final int l) {
2436     byte [] newb = new byte[l + ROW_OFFSET];
2437     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2438     Bytes.putInt(newb, 0, l);
2439     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2440     return new KeyValue(newb);
2441   }
2442 
2443   /**
2444    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2445    * backing bytes copied from the steam.
2446    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2447    * of zero, we will return null which can be useful marking a stream as done.
2448    * @throws IOException
2449    */
2450   public static KeyValue create(final DataInput in) throws IOException {
2451     return create(in.readInt(), in);
2452   }
2453 
2454   /**
2455    * Create a KeyValue reading <code>length</code> from <code>in</code>
2456    * @param length
2457    * @param in
2458    * @return Created KeyValue OR if we find a length of zero, we will return null which
2459    * can be useful marking a stream as done.
2460    * @throws IOException
2461    */
2462   public static KeyValue create(int length, final DataInput in) throws IOException {
2463 
2464     if (length <= 0) {
2465       if (length == 0) return null;
2466       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2467     }
2468 
2469     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2470     byte [] bytes = new byte[length];
2471     in.readFully(bytes);
2472     return new KeyValue(bytes, 0, length);
2473   }
2474 
2475   /**
2476    * Create a new KeyValue by copying existing cell and adding new tags
2477    * @param c
2478    * @param newTags
2479    * @return a new KeyValue instance with new tags
2480    */
2481   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2482     List<Tag> existingTags = null;
2483     if(c.getTagsLength() > 0) {
2484       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
2485       existingTags.addAll(newTags);
2486     } else {
2487       existingTags = newTags;
2488     }
2489     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2490       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
2491       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
2492       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
2493       c.getValueLength(), existingTags);
2494   }
2495 
2496   /**
2497    * Create a KeyValue reading from the raw InputStream.
2498    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2499    * @param in
2500    * @return Created KeyValue or throws an exception
2501    * @throws IOException
2502    */
2503   public static KeyValue iscreate(final InputStream in) throws IOException {
2504     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2505     int bytesRead = 0;
2506     while (bytesRead < intBytes.length) {
2507       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2508       if (n < 0) {
2509         if (bytesRead == 0) {
2510           throw new EOFException();
2511         }
2512         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2513       }
2514       bytesRead += n;
2515     }
2516     // TODO: perhaps some sanity check is needed here.
2517     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2518     IOUtils.readFully(in, bytes, 0, bytes.length);
2519     return new KeyValue(bytes, 0, bytes.length);
2520   }
2521 
2522   /**
2523    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2524    * @param kv
2525    * @param out
2526    * @return Length written on stream
2527    * @throws IOException
2528    * @see #create(DataInput) for the inverse function
2529    */
2530   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2531     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2532     // work for all implementations.
2533     int length = kv.getLength();
2534     out.writeInt(length);
2535     out.write(kv.getBuffer(), kv.getOffset(), length);
2536     return length + Bytes.SIZEOF_INT;
2537   }
2538 
2539   /**
2540    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2541    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2542    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2543    * @param kv
2544    * @param out
2545    * @return Length written on stream
2546    * @throws IOException
2547    * @see #create(DataInput) for the inverse function
2548    * @see #write(KeyValue, DataOutput)
2549    * @deprecated use {@link #oswrite(KeyValue, OutputStream, boolean)} instead
2550    */
2551   @Deprecated
2552   public static long oswrite(final KeyValue kv, final OutputStream out)
2553       throws IOException {
2554     int length = kv.getLength();
2555     // This does same as DataOuput#writeInt (big-endian, etc.)
2556     out.write(Bytes.toBytes(length));
2557     out.write(kv.getBuffer(), kv.getOffset(), length);
2558     return length + Bytes.SIZEOF_INT;
2559   }
2560 
2561   /**
2562    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2563    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2564    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2565    * @param kv
2566    * @param out
2567    * @param withTags
2568    * @return Length written on stream
2569    * @throws IOException
2570    * @see #create(DataInput) for the inverse function
2571    * @see #write(KeyValue, DataOutput)
2572    * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
2573    */
2574   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2575       throws IOException {
2576     // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
2577     // check KeyValueUtil#oswrite also and do necessary changes.
2578     int length = kv.getLength();
2579     if (!withTags) {
2580       length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2581     }
2582     // This does same as DataOuput#writeInt (big-endian, etc.)
2583     StreamUtils.writeInt(out, length);
2584     out.write(kv.getBuffer(), kv.getOffset(), length);
2585     return length + Bytes.SIZEOF_INT;
2586   }
2587 
2588   /**
2589    * Comparator that compares row component only of a KeyValue.
2590    */
2591   public static class RowOnlyComparator implements Comparator<KeyValue> {
2592     final KVComparator comparator;
2593 
2594     public RowOnlyComparator(final KVComparator c) {
2595       this.comparator = c;
2596     }
2597 
2598     @Override
2599     public int compare(KeyValue left, KeyValue right) {
2600       return comparator.compareRows(left, right);
2601     }
2602   }
2603 
2604 
2605   /**
2606    * Avoids redundant comparisons for better performance.
2607    *
2608    * TODO get rid of this wart
2609    */
2610   public interface SamePrefixComparator<T> {
2611     /**
2612      * Compare two keys assuming that the first n bytes are the same.
2613      * @param commonPrefix How many bytes are the same.
2614      */
2615     int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
2616         byte[] right, int roffset, int rlength
2617     );
2618   }
2619 
2620   /**
2621    * This is a TEST only Comparator used in TestSeekTo and TestReseekTo.
2622    */
2623   public static class RawBytesComparator extends KVComparator {
2624     /**
2625      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2626      * instantiate the appropriate comparator.
2627      * TODO: With V3 consider removing this.
2628      * @return legacy class name for FileFileTrailer#comparatorClassName
2629      */
2630     @Override
2631     public String getLegacyKeyComparatorName() {
2632       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2633     }
2634 
2635     /**
2636      * @deprecated Since 0.99.2.
2637      */
2638     @Override
2639     @Deprecated
2640     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2641         int roffset, int rlength) {
2642       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2643     }
2644 
2645     @Override
2646     public int compare(Cell left, Cell right) {
2647       return compareOnlyKeyPortion(left, right);
2648     }
2649 
2650     @Override
2651     @VisibleForTesting
2652     public int compareOnlyKeyPortion(Cell left, Cell right) {
2653       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2654           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2655       if (c != 0) {
2656         return c;
2657       }
2658       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2659           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2660           right.getFamilyLength());
2661       if (c != 0) {
2662         return c;
2663       }
2664       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2665           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2666           right.getQualifierLength());
2667       if (c != 0) {
2668         return c;
2669       }
2670       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2671       if (c != 0) {
2672         return c;
2673       }
2674       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2675     }
2676 
2677     @Override
2678     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2679       return firstKeyInBlock;
2680     }
2681 
2682   }
2683 
2684   /**
2685    * HeapSize implementation
2686    *
2687    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2688    * MemStore.
2689    */
2690   @Override
2691   public long heapSize() {
2692     int sum = 0;
2693     sum += ClassSize.OBJECT;// the KeyValue object itself
2694     sum += ClassSize.REFERENCE;// pointer to "bytes"
2695     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2696     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2697     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2698     sum += Bytes.SIZEOF_LONG;// memstoreTS
2699     return ClassSize.align(sum);
2700   }
2701 
2702   /**
2703    * This is a hack that should be removed once we don't care about matching
2704    * up client- and server-side estimations of cell size. It needed to be
2705    * backwards compatible with estimations done by older clients. We need to
2706    * pretend that tags never exist and KeyValues aren't serialized with tag
2707    * length included. See HBASE-13262 and HBASE-13303
2708    */
2709   @Deprecated
2710   public long heapSizeWithoutTags() {
2711     int sum = 0;
2712     sum += ClassSize.OBJECT;// the KeyValue object itself
2713     sum += ClassSize.REFERENCE;// pointer to "bytes"
2714     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2715     sum += KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
2716     sum += getKeyLength();
2717     sum += getValueLength();
2718     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2719     sum += Bytes.SIZEOF_LONG;// memstoreTS
2720     return ClassSize.align(sum);
2721   }
2722 
2723   /**
2724    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2725    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2726    * In places like block index keys, we need to compare the key byte[] with a cell.
2727    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2728    */
2729   public static class KeyOnlyKeyValue extends KeyValue {
2730     public KeyOnlyKeyValue() {
2731 
2732     }
2733     public KeyOnlyKeyValue(byte[] b) {
2734       this(b, 0, b.length);
2735     }
2736 
2737     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2738       this.bytes = b;
2739       this.length = length;
2740       this.offset = offset;
2741     }
2742 
2743     @Override
2744     public int getKeyOffset() {
2745       return this.offset;
2746     }
2747 
2748     /**
2749      * A setter that helps to avoid object creation every time and whenever
2750      * there is a need to create new KeyOnlyKeyValue.
2751      * @param key
2752      * @param offset
2753      * @param length
2754      */
2755     public void setKey(byte[] key, int offset, int length) {
2756       this.bytes = key;
2757       this.offset = offset;
2758       this.length = length;
2759     }
2760 
2761     @Override
2762     public byte[] getKey() {
2763       int keylength = getKeyLength();
2764       byte[] key = new byte[keylength];
2765       System.arraycopy(this.bytes, getKeyOffset(), key, 0, keylength);
2766       return key;
2767     }
2768 
2769     @Override
2770     public byte[] getRowArray() {
2771       return bytes;
2772     }
2773 
2774     @Override
2775     public int getRowOffset() {
2776       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2777     }
2778 
2779     @Override
2780     public byte[] getFamilyArray() {
2781       return bytes;
2782     }
2783 
2784     @Override
2785     public byte getFamilyLength() {
2786       return this.bytes[getFamilyOffset() - 1];
2787     }
2788 
2789     @Override
2790     public int getFamilyOffset() {
2791       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2792     }
2793 
2794     @Override
2795     public byte[] getQualifierArray() {
2796       return bytes;
2797     }
2798 
2799     @Override
2800     public int getQualifierLength() {
2801       return getQualifierLength(getRowLength(), getFamilyLength());
2802     }
2803 
2804     @Override
2805     public int getQualifierOffset() {
2806       return getFamilyOffset() + getFamilyLength();
2807     }
2808 
2809     @Override
2810     public int getKeyLength() {
2811       return length;
2812     }
2813 
2814     @Override
2815     public short getRowLength() {
2816       return Bytes.toShort(this.bytes, getKeyOffset());
2817     }
2818 
2819     @Override
2820     public byte getTypeByte() {
2821       return this.bytes[this.offset + getKeyLength() - 1];
2822     }
2823 
2824     private int getQualifierLength(int rlength, int flength) {
2825       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2826     }
2827 
2828     @Override
2829     public long getTimestamp() {
2830       int tsOffset = getTimestampOffset();
2831       return Bytes.toLong(this.bytes, tsOffset);
2832     }
2833 
2834     @Override
2835     public int getTimestampOffset() {
2836       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2837     }
2838 
2839     @Override
2840     public byte[] getTagsArray() {
2841       return HConstants.EMPTY_BYTE_ARRAY;
2842     }
2843 
2844     @Override
2845     public int getTagsOffset() {
2846       return 0;
2847     }
2848 
2849     @Override
2850     public byte[] getValueArray() {
2851       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2852     }
2853 
2854     @Override
2855     public int getValueOffset() {
2856       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2857     }
2858 
2859     @Override
2860     public int getValueLength() {
2861       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2862     }
2863 
2864     @Override
2865     public int getTagsLength() {
2866       return 0;
2867     }
2868 
2869     @Override
2870     public String toString() {
2871       if (this.bytes == null || this.bytes.length == 0) {
2872         return "empty";
2873       }
2874       return keyToString(this.bytes, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2875     }
2876 
2877     @Override
2878     public int hashCode() {
2879       return super.hashCode();
2880     }
2881 
2882     @Override
2883     public boolean equals(Object other) {
2884       return super.equals(other);
2885     }
2886   }
2887 }