001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.List;
030
031import org.apache.hadoop.hbase.KeyValue.Type;
032import org.apache.hadoop.hbase.io.util.StreamUtils;
033import org.apache.hadoop.hbase.util.ByteBufferUtils;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.io.IOUtils;
036import org.apache.hadoop.io.WritableUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Function;
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
042
043/**
044 * static convenience methods for dealing with KeyValues and collections of KeyValues
045 */
046@InterfaceAudience.Private
047public class KeyValueUtil {
048
049  /**************** length *********************/
050
051  /**
052   * Returns number of bytes this cell would have been used if serialized as in {@link KeyValue}
053   * @param cell
054   * @return the length
055   */
056  public static int length(final Cell cell) {
057    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
058        cell.getValueLength(), cell.getTagsLength(), true);
059  }
060
061  public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
062    if (withTags) {
063      return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen));
064    }
065    return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen));
066  }
067
068  /**
069   * Returns number of bytes this cell's key part would have been used if serialized as in
070   * {@link KeyValue}. Key includes rowkey, family, qualifier, timestamp and type.
071   * @param cell
072   * @return the key length
073   */
074  public static int keyLength(final Cell cell) {
075    return keyLength(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength());
076  }
077
078  private static int keyLength(short rlen, byte flen, int qlen) {
079    return (int) KeyValue.getKeyDataStructureSize(rlen, flen, qlen);
080  }
081
082  public static int lengthWithMvccVersion(final KeyValue kv, final boolean includeMvccVersion) {
083    int length = kv.getLength();
084    if (includeMvccVersion) {
085      length += WritableUtils.getVIntSize(kv.getSequenceId());
086    }
087    return length;
088  }
089
090  public static int totalLengthWithMvccVersion(final Iterable<? extends KeyValue> kvs,
091      final boolean includeMvccVersion) {
092    int length = 0;
093    for (KeyValue kv : IterableUtils.emptyIfNull(kvs)) {
094      length += lengthWithMvccVersion(kv, includeMvccVersion);
095    }
096    return length;
097  }
098
099
100  /**************** copy the cell to create a new keyvalue *********************/
101
102  public static KeyValue copyToNewKeyValue(final Cell cell) {
103    byte[] bytes = copyToNewByteArray(cell);
104    KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
105    kvCell.setSequenceId(cell.getSequenceId());
106    return kvCell;
107  }
108
109  /**
110   * The position will be set to the beginning of the new ByteBuffer
111   * @param cell
112   * @return the Bytebuffer containing the key part of the cell
113   */
114  public static ByteBuffer copyKeyToNewByteBuffer(final Cell cell) {
115    byte[] bytes = new byte[keyLength(cell)];
116    appendKeyTo(cell, bytes, 0);
117    ByteBuffer buffer = ByteBuffer.wrap(bytes);
118    return buffer;
119  }
120
121  /**
122   * Copies the key to a new KeyValue
123   * @param cell
124   * @return the KeyValue that consists only the key part of the incoming cell
125   */
126  public static KeyValue toNewKeyCell(final Cell cell) {
127    byte[] bytes = new byte[keyLength(cell)];
128    appendKeyTo(cell, bytes, 0);
129    KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
130    // Set the seq id. The new key cell could be used in comparisons so it
131    // is important that it uses the seqid also. If not the comparsion would fail
132    kv.setSequenceId(cell.getSequenceId());
133    return kv;
134  }
135
136  public static byte[] copyToNewByteArray(final Cell cell) {
137    int v1Length = length(cell);
138    byte[] backingBytes = new byte[v1Length];
139    appendToByteArray(cell, backingBytes, 0, true);
140    return backingBytes;
141  }
142
143  public static int appendKeyTo(final Cell cell, final byte[] output,
144      final int offset) {
145    int nextOffset = offset;
146    nextOffset = Bytes.putShort(output, nextOffset, cell.getRowLength());
147    nextOffset = CellUtil.copyRowTo(cell, output, nextOffset);
148    nextOffset = Bytes.putByte(output, nextOffset, cell.getFamilyLength());
149    nextOffset = CellUtil.copyFamilyTo(cell, output, nextOffset);
150    nextOffset = CellUtil.copyQualifierTo(cell, output, nextOffset);
151    nextOffset = Bytes.putLong(output, nextOffset, cell.getTimestamp());
152    nextOffset = Bytes.putByte(output, nextOffset, cell.getTypeByte());
153    return nextOffset;
154  }
155
156  /**************** copy key and value *********************/
157
158  public static int appendToByteArray(Cell cell, byte[] output, int offset, boolean withTags) {
159    int pos = offset;
160    pos = Bytes.putInt(output, pos, keyLength(cell));
161    pos = Bytes.putInt(output, pos, cell.getValueLength());
162    pos = appendKeyTo(cell, output, pos);
163    pos = CellUtil.copyValueTo(cell, output, pos);
164    if (withTags && (cell.getTagsLength() > 0)) {
165      pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
166      pos = PrivateCellUtil.copyTagsTo(cell, output, pos);
167    }
168    return pos;
169  }
170
171  /**
172   * Copy the Cell content into the passed buf in KeyValue serialization format.
173   */
174  public static int appendTo(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
175    offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
176    offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
177    offset = appendKeyTo(cell, buf, offset);
178    offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
179    int tagsLength = cell.getTagsLength();
180    if (withTags && (tagsLength > 0)) {
181      offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
182      offset = PrivateCellUtil.copyTagsTo(cell, buf, offset);// Tags bytes
183    }
184    return offset;
185  }
186
187  public static int appendKeyTo(Cell cell, ByteBuffer buf, int offset) {
188    offset = ByteBufferUtils.putShort(buf, offset, cell.getRowLength());// RK length
189    offset = CellUtil.copyRowTo(cell, buf, offset);// Row bytes
190    offset = ByteBufferUtils.putByte(buf, offset, cell.getFamilyLength());// CF length
191    offset = CellUtil.copyFamilyTo(cell, buf, offset);// CF bytes
192    offset = CellUtil.copyQualifierTo(cell, buf, offset);// Qualifier bytes
193    offset = ByteBufferUtils.putLong(buf, offset, cell.getTimestamp());// TS
194    offset = ByteBufferUtils.putByte(buf, offset, cell.getTypeByte());// Type
195    return offset;
196  }
197
198  public static void appendToByteBuffer(final ByteBuffer bb, final KeyValue kv,
199      final boolean includeMvccVersion) {
200    // keep pushing the limit out. assume enough capacity
201    bb.limit(bb.position() + kv.getLength());
202    bb.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
203    if (includeMvccVersion) {
204      int numMvccVersionBytes = WritableUtils.getVIntSize(kv.getSequenceId());
205      ByteBufferUtils.extendLimit(bb, numMvccVersionBytes);
206      ByteBufferUtils.writeVLong(bb, kv.getSequenceId());
207    }
208  }
209
210
211  /**************** iterating *******************************/
212
213  /**
214   * Creates a new KeyValue object positioned in the supplied ByteBuffer and sets the ByteBuffer's
215   * position to the start of the next KeyValue. Does not allocate a new array or copy data.
216   * @param bb
217   * @param includesMvccVersion
218   * @param includesTags
219   */
220  public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
221      boolean includesTags) {
222    if (bb.isDirect()) {
223      throw new IllegalArgumentException("only supports heap buffers");
224    }
225    if (bb.remaining() < 1) {
226      return null;
227    }
228    KeyValue keyValue = null;
229    int underlyingArrayOffset = bb.arrayOffset() + bb.position();
230    int keyLength = bb.getInt();
231    int valueLength = bb.getInt();
232    ByteBufferUtils.skip(bb, keyLength + valueLength);
233    int tagsLength = 0;
234    if (includesTags) {
235      // Read short as unsigned, high byte first
236      tagsLength = ((bb.get() & 0xff) << 8) ^ (bb.get() & 0xff);
237      ByteBufferUtils.skip(bb, tagsLength);
238    }
239    int kvLength = (int) KeyValue.getKeyValueDataStructureSize(keyLength, valueLength, tagsLength);
240    keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
241    if (includesMvccVersion) {
242      long mvccVersion = ByteBufferUtils.readVLong(bb);
243      keyValue.setSequenceId(mvccVersion);
244    }
245    return keyValue;
246  }
247
248
249  /*************** next/previous **********************************/
250
251  /**
252   * Decrement the timestamp.  For tests (currently wasteful)
253   *
254   * Remember timestamps are sorted reverse chronologically.
255   * @param in
256   * @return previous key
257   */
258  public static KeyValue previousKey(final KeyValue in) {
259    return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
260      CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
261  }
262
263
264  /**
265   * Create a KeyValue for the specified row, family and qualifier that would be
266   * larger than or equal to all other possible KeyValues that have the same
267   * row, family, qualifier. Used for reseeking. Should NEVER be returned to a client.
268   *
269   * @param row
270   *          row key
271   * @param roffset
272   *         row offset
273   * @param rlength
274   *         row length
275   * @param family
276   *         family name
277   * @param foffset
278   *         family offset
279   * @param flength
280   *         family length
281   * @param qualifier
282   *        column qualifier
283   * @param qoffset
284   *        qualifier offset
285   * @param qlength
286   *        qualifier length
287   * @return Last possible key on passed row, family, qualifier.
288   */
289  public static KeyValue createLastOnRow(final byte[] row, final int roffset, final int rlength,
290      final byte[] family, final int foffset, final int flength, final byte[] qualifier,
291      final int qoffset, final int qlength) {
292    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
293        qlength, HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
294  }
295
296  /**
297   * Create a KeyValue that is smaller than all other possible KeyValues
298   * for the given row. That is any (valid) KeyValue on 'row' would sort
299   * _after_ the result.
300   *
301   * @param row - row key (arbitrary byte array)
302   * @return First possible KeyValue on passed <code>row</code>
303   */
304  public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
305    return new KeyValue(row, roffset, rlength,
306        null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
307  }
308
309  /**
310   * Creates a KeyValue that is last on the specified row id. That is,
311   * every other possible KeyValue for the given row would compareTo()
312   * less than the result of this call.
313   * @param row row key
314   * @return Last possible KeyValue on passed <code>row</code>
315   */
316  public static KeyValue createLastOnRow(final byte[] row) {
317    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
318  }
319
320  /**
321   * Create a KeyValue that is smaller than all other possible KeyValues
322   * for the given row. That is any (valid) KeyValue on 'row' would sort
323   * _after_ the result.
324   *
325   * @param row - row key (arbitrary byte array)
326   * @return First possible KeyValue on passed <code>row</code>
327   */
328  public static KeyValue createFirstOnRow(final byte [] row) {
329    return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
330  }
331
332  /**
333   * Creates a KeyValue that is smaller than all other KeyValues that
334   * are older than the passed timestamp.
335   * @param row - row key (arbitrary byte array)
336   * @param ts - timestamp
337   * @return First possible key on passed <code>row</code> and timestamp.
338   */
339  public static KeyValue createFirstOnRow(final byte [] row,
340      final long ts) {
341    return new KeyValue(row, null, null, ts, Type.Maximum);
342  }
343
344  /**
345   * Create a KeyValue for the specified row, family and qualifier that would be
346   * smaller than all other possible KeyValues that have the same row,family,qualifier.
347   * Used for seeking.
348   * @param row - row key (arbitrary byte array)
349   * @param family - family name
350   * @param qualifier - column qualifier
351   * @return First possible key on passed <code>row</code>, and column.
352   */
353  public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
354      final byte [] qualifier) {
355    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
356  }
357
358  /**
359   * @param row - row key (arbitrary byte array)
360   * @param f - family name
361   * @param q - column qualifier
362   * @param ts - timestamp
363   * @return First possible key on passed <code>row</code>, column and timestamp
364   */
365  public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
366      final byte [] q, final long ts) {
367    return new KeyValue(row, f, q, ts, Type.Maximum);
368  }
369
370  /**
371   * Create a KeyValue for the specified row, family and qualifier that would be
372   * smaller than all other possible KeyValues that have the same row,
373   * family, qualifier.
374   * Used for seeking.
375   * @param row row key
376   * @param roffset row offset
377   * @param rlength row length
378   * @param family family name
379   * @param foffset family offset
380   * @param flength family length
381   * @param qualifier column qualifier
382   * @param qoffset qualifier offset
383   * @param qlength qualifier length
384   * @return First possible key on passed Row, Family, Qualifier.
385   */
386  public static KeyValue createFirstOnRow(final byte [] row,
387      final int roffset, final int rlength, final byte [] family,
388      final int foffset, final int flength, final byte [] qualifier,
389      final int qoffset, final int qlength) {
390    return new KeyValue(row, roffset, rlength, family,
391        foffset, flength, qualifier, qoffset, qlength,
392        HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
393  }
394
395  /**
396   * Create a KeyValue for the specified row, family and qualifier that would be
397   * smaller than all other possible KeyValues that have the same row,
398   * family, qualifier.
399   * Used for seeking.
400   *
401   * @param buffer the buffer to use for the new <code>KeyValue</code> object
402   * @param row the value key
403   * @param family family name
404   * @param qualifier column qualifier
405   *
406   * @return First possible key on passed Row, Family, Qualifier.
407   *
408   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
409   * than the provided buffer or than <code>Integer.MAX_VALUE</code>
410   */
411  public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
412      final byte [] family, final byte [] qualifier)
413          throws IllegalArgumentException {
414    return createFirstOnRow(buffer, 0, row, 0, row.length,
415        family, 0, family.length,
416        qualifier, 0, qualifier.length);
417  }
418
419  /**
420   * Create a KeyValue for the specified row, family and qualifier that would be
421   * smaller than all other possible KeyValues that have the same row,
422   * family, qualifier.
423   * Used for seeking.
424   *
425   * @param buffer the buffer to use for the new <code>KeyValue</code> object
426   * @param boffset buffer offset
427   * @param row the value key
428   * @param roffset row offset
429   * @param rlength row length
430   * @param family family name
431   * @param foffset family offset
432   * @param flength family length
433   * @param qualifier column qualifier
434   * @param qoffset qualifier offset
435   * @param qlength qualifier length
436   *
437   * @return First possible key on passed Row, Family, Qualifier.
438   *
439   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
440   * than the provided buffer or than <code>Integer.MAX_VALUE</code>
441   */
442  public static KeyValue createFirstOnRow(byte[] buffer, final int boffset, final byte[] row,
443      final int roffset, final int rlength, final byte[] family, final int foffset,
444      final int flength, final byte[] qualifier, final int qoffset, final int qlength)
445      throws IllegalArgumentException {
446
447    long lLength = KeyValue.getKeyValueDataStructureSize(rlength, flength, qlength, 0);
448
449    if (lLength > Integer.MAX_VALUE) {
450      throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
451    }
452    int iLength = (int) lLength;
453    if (buffer.length - boffset < iLength) {
454      throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < "
455          + iLength);
456    }
457
458    int len = KeyValue.writeByteArray(buffer, boffset, row, roffset, rlength, family, foffset,
459        flength, qualifier, qoffset, qlength, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
460        null, 0, 0, null);
461    return new KeyValue(buffer, boffset, len);
462  }
463
464  /*************** misc **********************************/
465  /**
466   * @param cell
467   * @return <code>cell</code> if it is an object of class {@link KeyValue} else we will return a
468   *         new {@link KeyValue} instance made from <code>cell</code> Note: Even if the cell is an
469   *         object of any of the subclass of {@link KeyValue}, we will create a new
470   *         {@link KeyValue} object wrapping same buffer. This API is used only with MR based tools
471   *         which expect the type to be exactly KeyValue. That is the reason for doing this way.
472   * @deprecated without any replacement.
473   */
474  @Deprecated
475  public static KeyValue ensureKeyValue(final Cell cell) {
476    if (cell == null) return null;
477    if (cell instanceof KeyValue) {
478      if (cell.getClass().getName().equals(KeyValue.class.getName())) {
479        return (KeyValue) cell;
480      }
481      // Cell is an Object of any of the sub classes of KeyValue. Make a new KeyValue wrapping the
482      // same byte[]
483      KeyValue kv = (KeyValue) cell;
484      KeyValue newKv = new KeyValue(kv.bytes, kv.offset, kv.length);
485      newKv.setSequenceId(kv.getSequenceId());
486      return newKv;
487    }
488    return copyToNewKeyValue(cell);
489  }
490
491  @Deprecated
492  public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
493    List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
494      @Override
495      public KeyValue apply(Cell arg0) {
496        return KeyValueUtil.ensureKeyValue(arg0);
497      }
498    });
499    return new ArrayList<>(lazyList);
500  }
501  /**
502   * Write out a KeyValue in the manner in which we used to when KeyValue was a
503   * Writable.
504   *
505   * @param kv
506   * @param out
507   * @return Length written on stream
508   * @throws IOException
509   * @see #create(DataInput) for the inverse function
510   */
511  public static long write(final KeyValue kv, final DataOutput out) throws IOException {
512    // This is how the old Writables write used to serialize KVs. Need to figure
513    // way to make it
514    // work for all implementations.
515    int length = kv.getLength();
516    out.writeInt(length);
517    out.write(kv.getBuffer(), kv.getOffset(), length);
518    return (long) length + Bytes.SIZEOF_INT;
519  }
520
521  static String bytesToHex(byte[] buf, int offset, int length) {
522    String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
523    return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset
524        + ", length=" + length;
525  }
526
527  static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
528    if (buf == null) {
529      throw new IllegalArgumentException("Invalid to have null " +
530          "byte array in KeyValue.");
531    }
532
533    int pos = offset, endOffset = offset + length;
534    // check the key
535    if (pos + Bytes.SIZEOF_INT > endOffset) {
536      throw new IllegalArgumentException(
537          "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length));
538    }
539    int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
540    pos += Bytes.SIZEOF_INT;
541    if (keyLen <= 0 || pos + keyLen > endOffset) {
542      throw new IllegalArgumentException(
543          "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length));
544    }
545    // check the value
546    if (pos + Bytes.SIZEOF_INT > endOffset) {
547      throw new IllegalArgumentException("Overflow when reading value length at position=" + pos
548          + bytesToHex(buf, offset, length));
549    }
550    int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
551    pos += Bytes.SIZEOF_INT;
552    if (valLen < 0 || pos + valLen > endOffset) {
553      throw new IllegalArgumentException("Invalid value length in KeyValue, valueLength=" + valLen
554          + bytesToHex(buf, offset, length));
555    }
556    // check the row
557    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
558      throw new IllegalArgumentException(
559          "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length));
560    }
561    short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
562    pos += Bytes.SIZEOF_SHORT;
563    if (rowLen < 0 || pos + rowLen > endOffset) {
564      throw new IllegalArgumentException(
565          "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length));
566    }
567    pos += rowLen;
568    // check the family
569    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
570      throw new IllegalArgumentException("Overflow when reading family length at position=" + pos
571          + bytesToHex(buf, offset, length));
572    }
573    int familyLen = buf[pos];
574    pos += Bytes.SIZEOF_BYTE;
575    if (familyLen < 0 || pos + familyLen > endOffset) {
576      throw new IllegalArgumentException("Invalid family length in KeyValue, familyLength="
577          + familyLen + bytesToHex(buf, offset, length));
578    }
579    pos += familyLen;
580    // check the qualifier
581    int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
582        - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
583    if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
584      throw new IllegalArgumentException("Invalid qualifier length in KeyValue, qualifierLen="
585          + qualifierLen + bytesToHex(buf, offset, length));
586    }
587    pos += qualifierLen;
588    // check the timestamp
589    if (pos + Bytes.SIZEOF_LONG > endOffset) {
590      throw new IllegalArgumentException(
591          "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length));
592    }
593    long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
594    if (timestamp < 0) {
595      throw new IllegalArgumentException(
596          "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length));
597    }
598    pos += Bytes.SIZEOF_LONG;
599    // check the type
600    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
601      throw new IllegalArgumentException(
602          "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length));
603    }
604    byte type = buf[pos];
605    if (!Type.isValidType(type)) {
606      throw new IllegalArgumentException(
607          "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length));
608    }
609    pos += Bytes.SIZEOF_BYTE;
610    // check the value
611    if (pos + valLen > endOffset) {
612      throw new IllegalArgumentException(
613          "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length));
614    }
615    pos += valLen;
616    // check the tags
617    if (withTags) {
618      if (pos == endOffset) {
619        // withTags is true but no tag in the cell.
620        return;
621      }
622      if (pos + Bytes.SIZEOF_SHORT > endOffset) {
623        throw new IllegalArgumentException("Overflow when reading tags length at position=" + pos
624            + bytesToHex(buf, offset, length));
625      }
626      short tagsLen = Bytes.toShort(buf, pos);
627      pos += Bytes.SIZEOF_SHORT;
628      if (tagsLen < 0 || pos + tagsLen > endOffset) {
629        throw new IllegalArgumentException("Invalid tags length in KeyValue at position="
630            + (pos - Bytes.SIZEOF_SHORT) + bytesToHex(buf, offset, length));
631      }
632      int tagsEndOffset = pos + tagsLen;
633      for (; pos < tagsEndOffset;) {
634        if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
635          throw new IllegalArgumentException("Overflow when reading tag length at position=" + pos
636              + bytesToHex(buf, offset, length));
637        }
638        short tagLen = Bytes.toShort(buf, pos);
639        pos += Tag.TAG_LENGTH_SIZE;
640        // tagLen contains one byte tag type, so must be not less than 1.
641        if (tagLen < 1 || pos + tagLen > endOffset) {
642          throw new IllegalArgumentException(
643              "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
644                  + tagLen + bytesToHex(buf, offset, length));
645        }
646        pos += tagLen;
647      }
648    }
649    if (pos != endOffset) {
650      throw new IllegalArgumentException("Some redundant bytes in KeyValue's buffer, startOffset="
651          + pos + ", endOffset=" + endOffset + bytesToHex(buf, offset, length));
652    }
653  }
654
655  /**
656   * Create a KeyValue reading from the raw InputStream. Named
657   * <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
658   * @param in inputStream to read.
659   * @param withTags whether the keyvalue should include tags are not
660   * @return Created KeyValue OR if we find a length of zero, we will return null which can be
661   *         useful marking a stream as done.
662   * @throws IOException
663   */
664  public static KeyValue createKeyValueFromInputStream(InputStream in, boolean withTags)
665      throws IOException {
666    byte[] intBytes = new byte[Bytes.SIZEOF_INT];
667    int bytesRead = 0;
668    while (bytesRead < intBytes.length) {
669      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
670      if (n < 0) {
671        if (bytesRead == 0) {
672          throw new EOFException();
673        }
674        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
675      }
676      bytesRead += n;
677    }
678    byte[] bytes = new byte[Bytes.toInt(intBytes)];
679    IOUtils.readFully(in, bytes, 0, bytes.length);
680    return withTags ? new KeyValue(bytes, 0, bytes.length)
681        : new NoTagsKeyValue(bytes, 0, bytes.length);
682  }
683
684  /**
685   * @param b
686   * @return A KeyValue made of a byte array that holds the key-only part.
687   *         Needed to convert hfile index members to KeyValues.
688   */
689  public static KeyValue createKeyValueFromKey(final byte[] b) {
690    return createKeyValueFromKey(b, 0, b.length);
691  }
692
693  /**
694   * @param bb
695   * @return A KeyValue made of a byte buffer that holds the key-only part.
696   *         Needed to convert hfile index members to KeyValues.
697   */
698  public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
699    return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
700  }
701
702  /**
703   * @param b
704   * @param o
705   * @param l
706   * @return A KeyValue made of a byte array that holds the key-only part.
707   *         Needed to convert hfile index members to KeyValues.
708   */
709  public static KeyValue createKeyValueFromKey(final byte[] b, final int o, final int l) {
710    byte[] newb = new byte[l + KeyValue.ROW_OFFSET];
711    System.arraycopy(b, o, newb, KeyValue.ROW_OFFSET, l);
712    Bytes.putInt(newb, 0, l);
713    Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
714    return new KeyValue(newb);
715  }
716
717  /**
718   * @param in
719   *          Where to read bytes from. Creates a byte array to hold the
720   *          KeyValue backing bytes copied from the steam.
721   * @return KeyValue created by deserializing from <code>in</code> OR if we
722   *         find a length of zero, we will return null which can be useful
723   *         marking a stream as done.
724   * @throws IOException
725   */
726  public static KeyValue create(final DataInput in) throws IOException {
727    return create(in.readInt(), in);
728  }
729
730  /**
731   * Create a KeyValue reading <code>length</code> from <code>in</code>
732   *
733   * @param length
734   * @param in
735   * @return Created KeyValue OR if we find a length of zero, we will return
736   *         null which can be useful marking a stream as done.
737   * @throws IOException
738   */
739  public static KeyValue create(int length, final DataInput in) throws IOException {
740
741    if (length <= 0) {
742      if (length == 0)
743        return null;
744      throw new IOException("Failed read " + length + " bytes, stream corrupt?");
745    }
746
747    // This is how the old Writables.readFrom used to deserialize. Didn't even
748    // vint.
749    byte[] bytes = new byte[length];
750    in.readFully(bytes);
751    return new KeyValue(bytes, 0, length);
752  }
753
754  public static int getSerializedSize(Cell cell, boolean withTags) {
755    if (cell instanceof ExtendedCell) {
756      return ((ExtendedCell) cell).getSerializedSize(withTags);
757    }
758    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
759        cell.getValueLength(), cell.getTagsLength(), withTags);
760  }
761
762  public static int oswrite(final Cell cell, final OutputStream out, final boolean withTags)
763      throws IOException {
764    if (cell instanceof ExtendedCell) {
765      return ((ExtendedCell)cell).write(out, withTags);
766    } else {
767      short rlen = cell.getRowLength();
768      byte flen = cell.getFamilyLength();
769      int qlen = cell.getQualifierLength();
770      int vlen = cell.getValueLength();
771      int tlen = cell.getTagsLength();
772      int size = 0;
773      // write key length
774      int klen = keyLength(rlen, flen, qlen);
775      ByteBufferUtils.putInt(out, klen);
776      // write value length
777      ByteBufferUtils.putInt(out, vlen);
778      // Write rowkey - 2 bytes rk length followed by rowkey bytes
779      StreamUtils.writeShort(out, rlen);
780      out.write(cell.getRowArray(), cell.getRowOffset(), rlen);
781      // Write cf - 1 byte of cf length followed by the family bytes
782      out.write(flen);
783      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flen);
784      // write qualifier
785      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlen);
786      // write timestamp
787      StreamUtils.writeLong(out, cell.getTimestamp());
788      // write the type
789      out.write(cell.getTypeByte());
790      // write value
791      out.write(cell.getValueArray(), cell.getValueOffset(), vlen);
792      size = klen + vlen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
793      // write tags if we have to
794      if (withTags && tlen > 0) {
795        // 2 bytes tags length followed by tags bytes
796        // tags length is serialized with 2 bytes only(short way) even if the
797        // type is int. As this
798        // is non -ve numbers, we save the sign bit. See HBASE-11437
799        out.write((byte) (0xff & (tlen >> 8)));
800        out.write((byte) (0xff & tlen));
801        out.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
802        size += tlen + KeyValue.TAGS_LENGTH_SIZE;
803      }
804      return size;
805    }
806  }
807}