001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.EOFException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.List;
030
031import org.apache.hadoop.hbase.KeyValue.Type;
032import org.apache.hadoop.hbase.io.util.StreamUtils;
033import org.apache.hadoop.hbase.util.ByteBufferUtils;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.io.IOUtils;
036import org.apache.hadoop.io.WritableUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.base.Function;
042import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
044
045/**
046 * static convenience methods for dealing with KeyValues and collections of KeyValues
047 */
048@InterfaceAudience.Private
049public class KeyValueUtil {
050
051  private static final Logger LOG = LoggerFactory.getLogger(KeyValueUtil.class);
052
053  /**************** length *********************/
054
055  /**
056   * Returns number of bytes this cell would have been used if serialized as in {@link KeyValue}
057   * @param cell
058   * @return the length
059   */
060  public static int length(final Cell cell) {
061    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
062        cell.getValueLength(), cell.getTagsLength(), true);
063  }
064
065  public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
066    if (withTags) {
067      return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen));
068    }
069    return (int) (KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen));
070  }
071
072  /**
073   * Returns number of bytes this cell's key part would have been used if serialized as in
074   * {@link KeyValue}. Key includes rowkey, family, qualifier, timestamp and type.
075   * @param cell
076   * @return the key length
077   */
078  public static int keyLength(final Cell cell) {
079    return keyLength(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength());
080  }
081
082  private static int keyLength(short rlen, byte flen, int qlen) {
083    return (int) KeyValue.getKeyDataStructureSize(rlen, flen, qlen);
084  }
085
086  public static int lengthWithMvccVersion(final KeyValue kv, final boolean includeMvccVersion) {
087    int length = kv.getLength();
088    if (includeMvccVersion) {
089      length += WritableUtils.getVIntSize(kv.getSequenceId());
090    }
091    return length;
092  }
093
094  public static int totalLengthWithMvccVersion(final Iterable<? extends KeyValue> kvs,
095      final boolean includeMvccVersion) {
096    int length = 0;
097    for (KeyValue kv : IterableUtils.emptyIfNull(kvs)) {
098      length += lengthWithMvccVersion(kv, includeMvccVersion);
099    }
100    return length;
101  }
102
103
104  /**************** copy the cell to create a new keyvalue *********************/
105
106  public static KeyValue copyToNewKeyValue(final Cell cell) {
107    byte[] bytes = copyToNewByteArray(cell);
108    KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
109    kvCell.setSequenceId(cell.getSequenceId());
110    return kvCell;
111  }
112
113  /**
114   * The position will be set to the beginning of the new ByteBuffer
115   * @param cell
116   * @return the Bytebuffer containing the key part of the cell
117   */
118  public static ByteBuffer copyKeyToNewByteBuffer(final Cell cell) {
119    byte[] bytes = new byte[keyLength(cell)];
120    appendKeyTo(cell, bytes, 0);
121    ByteBuffer buffer = ByteBuffer.wrap(bytes);
122    return buffer;
123  }
124
125  /**
126   * Copies the key to a new KeyValue
127   * @param cell
128   * @return the KeyValue that consists only the key part of the incoming cell
129   */
130  public static KeyValue toNewKeyCell(final Cell cell) {
131    byte[] bytes = new byte[keyLength(cell)];
132    appendKeyTo(cell, bytes, 0);
133    KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
134    // Set the seq id. The new key cell could be used in comparisons so it
135    // is important that it uses the seqid also. If not the comparsion would fail
136    kv.setSequenceId(cell.getSequenceId());
137    return kv;
138  }
139
140  public static byte[] copyToNewByteArray(final Cell cell) {
141    int v1Length = length(cell);
142    byte[] backingBytes = new byte[v1Length];
143    appendToByteArray(cell, backingBytes, 0, true);
144    return backingBytes;
145  }
146
147  public static int appendKeyTo(final Cell cell, final byte[] output,
148      final int offset) {
149    int nextOffset = offset;
150    nextOffset = Bytes.putShort(output, nextOffset, cell.getRowLength());
151    nextOffset = CellUtil.copyRowTo(cell, output, nextOffset);
152    nextOffset = Bytes.putByte(output, nextOffset, cell.getFamilyLength());
153    nextOffset = CellUtil.copyFamilyTo(cell, output, nextOffset);
154    nextOffset = CellUtil.copyQualifierTo(cell, output, nextOffset);
155    nextOffset = Bytes.putLong(output, nextOffset, cell.getTimestamp());
156    nextOffset = Bytes.putByte(output, nextOffset, cell.getTypeByte());
157    return nextOffset;
158  }
159
160  /**************** copy key and value *********************/
161
162  public static int appendToByteArray(Cell cell, byte[] output, int offset, boolean withTags) {
163    int pos = offset;
164    pos = Bytes.putInt(output, pos, keyLength(cell));
165    pos = Bytes.putInt(output, pos, cell.getValueLength());
166    pos = appendKeyTo(cell, output, pos);
167    pos = CellUtil.copyValueTo(cell, output, pos);
168    if (withTags && (cell.getTagsLength() > 0)) {
169      pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
170      pos = PrivateCellUtil.copyTagsTo(cell, output, pos);
171    }
172    return pos;
173  }
174
175  /**
176   * Copy the Cell content into the passed buf in KeyValue serialization format.
177   */
178  public static int appendTo(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
179    offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
180    offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
181    offset = appendKeyTo(cell, buf, offset);
182    offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
183    int tagsLength = cell.getTagsLength();
184    if (withTags && (tagsLength > 0)) {
185      offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
186      offset = PrivateCellUtil.copyTagsTo(cell, buf, offset);// Tags bytes
187    }
188    return offset;
189  }
190
191  public static int appendKeyTo(Cell cell, ByteBuffer buf, int offset) {
192    offset = ByteBufferUtils.putShort(buf, offset, cell.getRowLength());// RK length
193    offset = CellUtil.copyRowTo(cell, buf, offset);// Row bytes
194    offset = ByteBufferUtils.putByte(buf, offset, cell.getFamilyLength());// CF length
195    offset = CellUtil.copyFamilyTo(cell, buf, offset);// CF bytes
196    offset = CellUtil.copyQualifierTo(cell, buf, offset);// Qualifier bytes
197    offset = ByteBufferUtils.putLong(buf, offset, cell.getTimestamp());// TS
198    offset = ByteBufferUtils.putByte(buf, offset, cell.getTypeByte());// Type
199    return offset;
200  }
201
202  public static void appendToByteBuffer(final ByteBuffer bb, final KeyValue kv,
203      final boolean includeMvccVersion) {
204    // keep pushing the limit out. assume enough capacity
205    bb.limit(bb.position() + kv.getLength());
206    bb.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
207    if (includeMvccVersion) {
208      int numMvccVersionBytes = WritableUtils.getVIntSize(kv.getSequenceId());
209      ByteBufferUtils.extendLimit(bb, numMvccVersionBytes);
210      ByteBufferUtils.writeVLong(bb, kv.getSequenceId());
211    }
212  }
213
214
215  /**************** iterating *******************************/
216
217  /**
218   * Creates a new KeyValue object positioned in the supplied ByteBuffer and sets the ByteBuffer's
219   * position to the start of the next KeyValue. Does not allocate a new array or copy data.
220   * @param bb
221   * @param includesMvccVersion
222   * @param includesTags
223   */
224  public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
225      boolean includesTags) {
226    if (bb.isDirect()) {
227      throw new IllegalArgumentException("only supports heap buffers");
228    }
229    if (bb.remaining() < 1) {
230      return null;
231    }
232    KeyValue keyValue = null;
233    int underlyingArrayOffset = bb.arrayOffset() + bb.position();
234    int keyLength = bb.getInt();
235    int valueLength = bb.getInt();
236    ByteBufferUtils.skip(bb, keyLength + valueLength);
237    int tagsLength = 0;
238    if (includesTags) {
239      // Read short as unsigned, high byte first
240      tagsLength = ((bb.get() & 0xff) << 8) ^ (bb.get() & 0xff);
241      ByteBufferUtils.skip(bb, tagsLength);
242    }
243    int kvLength = (int) KeyValue.getKeyValueDataStructureSize(keyLength, valueLength, tagsLength);
244    keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
245    if (includesMvccVersion) {
246      long mvccVersion = ByteBufferUtils.readVLong(bb);
247      keyValue.setSequenceId(mvccVersion);
248    }
249    return keyValue;
250  }
251
252
253  /*************** next/previous **********************************/
254
255  /**
256   * Decrement the timestamp.  For tests (currently wasteful)
257   *
258   * Remember timestamps are sorted reverse chronologically.
259   * @param in
260   * @return previous key
261   */
262  public static KeyValue previousKey(final KeyValue in) {
263    return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
264      CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
265  }
266
267
268  /**
269   * Create a KeyValue for the specified row, family and qualifier that would be
270   * larger than or equal to all other possible KeyValues that have the same
271   * row, family, qualifier. Used for reseeking. Should NEVER be returned to a client.
272   *
273   * @param row
274   *          row key
275   * @param roffset
276   *         row offset
277   * @param rlength
278   *         row length
279   * @param family
280   *         family name
281   * @param foffset
282   *         family offset
283   * @param flength
284   *         family length
285   * @param qualifier
286   *        column qualifier
287   * @param qoffset
288   *        qualifier offset
289   * @param qlength
290   *        qualifier length
291   * @return Last possible key on passed row, family, qualifier.
292   */
293  public static KeyValue createLastOnRow(final byte[] row, final int roffset, final int rlength,
294      final byte[] family, final int foffset, final int flength, final byte[] qualifier,
295      final int qoffset, final int qlength) {
296    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
297        qlength, HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
298  }
299
300  /**
301   * Create a KeyValue that is smaller than all other possible KeyValues
302   * for the given row. That is any (valid) KeyValue on 'row' would sort
303   * _after_ the result.
304   *
305   * @param row - row key (arbitrary byte array)
306   * @return First possible KeyValue on passed <code>row</code>
307   */
308  public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
309    return new KeyValue(row, roffset, rlength,
310        null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
311  }
312
313  /**
314   * Creates a KeyValue that is last on the specified row id. That is,
315   * every other possible KeyValue for the given row would compareTo()
316   * less than the result of this call.
317   * @param row row key
318   * @return Last possible KeyValue on passed <code>row</code>
319   */
320  public static KeyValue createLastOnRow(final byte[] row) {
321    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
322  }
323
324  /**
325   * Create a KeyValue that is smaller than all other possible KeyValues
326   * for the given row. That is any (valid) KeyValue on 'row' would sort
327   * _after_ the result.
328   *
329   * @param row - row key (arbitrary byte array)
330   * @return First possible KeyValue on passed <code>row</code>
331   */
332  public static KeyValue createFirstOnRow(final byte [] row) {
333    return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
334  }
335
336  /**
337   * Creates a KeyValue that is smaller than all other KeyValues that
338   * are older than the passed timestamp.
339   * @param row - row key (arbitrary byte array)
340   * @param ts - timestamp
341   * @return First possible key on passed <code>row</code> and timestamp.
342   */
343  public static KeyValue createFirstOnRow(final byte [] row,
344      final long ts) {
345    return new KeyValue(row, null, null, ts, Type.Maximum);
346  }
347
348  /**
349   * Create a KeyValue for the specified row, family and qualifier that would be
350   * smaller than all other possible KeyValues that have the same row,family,qualifier.
351   * Used for seeking.
352   * @param row - row key (arbitrary byte array)
353   * @param family - family name
354   * @param qualifier - column qualifier
355   * @return First possible key on passed <code>row</code>, and column.
356   */
357  public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
358      final byte [] qualifier) {
359    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
360  }
361
362  /**
363   * @param row - row key (arbitrary byte array)
364   * @param f - family name
365   * @param q - column qualifier
366   * @param ts - timestamp
367   * @return First possible key on passed <code>row</code>, column and timestamp
368   */
369  public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
370      final byte [] q, final long ts) {
371    return new KeyValue(row, f, q, ts, Type.Maximum);
372  }
373
374  /**
375   * Create a KeyValue for the specified row, family and qualifier that would be
376   * smaller than all other possible KeyValues that have the same row,
377   * family, qualifier.
378   * Used for seeking.
379   * @param row row key
380   * @param roffset row offset
381   * @param rlength row length
382   * @param family family name
383   * @param foffset family offset
384   * @param flength family length
385   * @param qualifier column qualifier
386   * @param qoffset qualifier offset
387   * @param qlength qualifier length
388   * @return First possible key on passed Row, Family, Qualifier.
389   */
390  public static KeyValue createFirstOnRow(final byte [] row,
391      final int roffset, final int rlength, final byte [] family,
392      final int foffset, final int flength, final byte [] qualifier,
393      final int qoffset, final int qlength) {
394    return new KeyValue(row, roffset, rlength, family,
395        foffset, flength, qualifier, qoffset, qlength,
396        HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
397  }
398
399  /**
400   * Create a KeyValue for the specified row, family and qualifier that would be
401   * smaller than all other possible KeyValues that have the same row,
402   * family, qualifier.
403   * Used for seeking.
404   *
405   * @param buffer the buffer to use for the new <code>KeyValue</code> object
406   * @param row the value key
407   * @param family family name
408   * @param qualifier column qualifier
409   *
410   * @return First possible key on passed Row, Family, Qualifier.
411   *
412   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
413   * than the provided buffer or than <code>Integer.MAX_VALUE</code>
414   */
415  public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
416      final byte [] family, final byte [] qualifier)
417          throws IllegalArgumentException {
418    return createFirstOnRow(buffer, 0, row, 0, row.length,
419        family, 0, family.length,
420        qualifier, 0, qualifier.length);
421  }
422
423  /**
424   * Create a KeyValue for the specified row, family and qualifier that would be
425   * smaller than all other possible KeyValues that have the same row,
426   * family, qualifier.
427   * Used for seeking.
428   *
429   * @param buffer the buffer to use for the new <code>KeyValue</code> object
430   * @param boffset buffer offset
431   * @param row the value key
432   * @param roffset row offset
433   * @param rlength row length
434   * @param family family name
435   * @param foffset family offset
436   * @param flength family length
437   * @param qualifier column qualifier
438   * @param qoffset qualifier offset
439   * @param qlength qualifier length
440   *
441   * @return First possible key on passed Row, Family, Qualifier.
442   *
443   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
444   * than the provided buffer or than <code>Integer.MAX_VALUE</code>
445   */
446  public static KeyValue createFirstOnRow(byte[] buffer, final int boffset, final byte[] row,
447      final int roffset, final int rlength, final byte[] family, final int foffset,
448      final int flength, final byte[] qualifier, final int qoffset, final int qlength)
449      throws IllegalArgumentException {
450
451    long lLength = KeyValue.getKeyValueDataStructureSize(rlength, flength, qlength, 0);
452
453    if (lLength > Integer.MAX_VALUE) {
454      throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
455    }
456    int iLength = (int) lLength;
457    if (buffer.length - boffset < iLength) {
458      throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < "
459          + iLength);
460    }
461
462    int len = KeyValue.writeByteArray(buffer, boffset, row, roffset, rlength, family, foffset,
463        flength, qualifier, qoffset, qlength, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
464        null, 0, 0, null);
465    return new KeyValue(buffer, boffset, len);
466  }
467
468  /*************** misc **********************************/
469  /**
470   * @param cell
471   * @return <code>cell</code> if it is an object of class {@link KeyValue} else we will return a
472   *         new {@link KeyValue} instance made from <code>cell</code> Note: Even if the cell is an
473   *         object of any of the subclass of {@link KeyValue}, we will create a new
474   *         {@link KeyValue} object wrapping same buffer. This API is used only with MR based tools
475   *         which expect the type to be exactly KeyValue. That is the reason for doing this way.
476   * @deprecated without any replacement.
477   */
478  @Deprecated
479  public static KeyValue ensureKeyValue(final Cell cell) {
480    if (cell == null) return null;
481    if (cell instanceof KeyValue) {
482      if (cell.getClass().getName().equals(KeyValue.class.getName())) {
483        return (KeyValue) cell;
484      }
485      // Cell is an Object of any of the sub classes of KeyValue. Make a new KeyValue wrapping the
486      // same byte[]
487      KeyValue kv = (KeyValue) cell;
488      KeyValue newKv = new KeyValue(kv.bytes, kv.offset, kv.length);
489      newKv.setSequenceId(kv.getSequenceId());
490      return newKv;
491    }
492    return copyToNewKeyValue(cell);
493  }
494
495  @Deprecated
496  public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
497    List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
498      @Override
499      public KeyValue apply(Cell arg0) {
500        return KeyValueUtil.ensureKeyValue(arg0);
501      }
502    });
503    return new ArrayList<>(lazyList);
504  }
505  /**
506   * Write out a KeyValue in the manner in which we used to when KeyValue was a
507   * Writable.
508   *
509   * @param kv
510   * @param out
511   * @return Length written on stream
512   * @throws IOException
513   * @see #create(DataInput) for the inverse function
514   */
515  public static long write(final KeyValue kv, final DataOutput out) throws IOException {
516    // This is how the old Writables write used to serialize KVs. Need to figure
517    // way to make it
518    // work for all implementations.
519    int length = kv.getLength();
520    out.writeInt(length);
521    out.write(kv.getBuffer(), kv.getOffset(), length);
522    return (long) length + Bytes.SIZEOF_INT;
523  }
524
525  static String bytesToHex(byte[] buf, int offset, int length) {
526    return ", KeyValueBytesHex=" + Bytes.toStringBinary(buf, offset, length) + ", offset=" + offset
527        + ", length=" + length;
528  }
529
530  static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
531    if (buf == null) {
532      String msg = "Invalid to have null byte array in KeyValue.";
533      LOG.warn(msg);
534      throw new IllegalArgumentException(msg);
535    }
536
537    int pos = offset, endOffset = offset + length;
538    // check the key
539    if (pos + Bytes.SIZEOF_INT > endOffset) {
540      String msg =
541          "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length);
542      LOG.warn(msg);
543      throw new IllegalArgumentException(msg);
544    }
545    int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
546    pos += Bytes.SIZEOF_INT;
547    if (keyLen <= 0 || pos + keyLen > endOffset) {
548      String msg =
549          "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length);
550      LOG.warn(msg);
551      throw new IllegalArgumentException(msg);
552    }
553    // check the value
554    if (pos + Bytes.SIZEOF_INT > endOffset) {
555      String msg =
556          "Overflow when reading value length at position=" + pos + bytesToHex(buf, offset, length);
557      LOG.warn(msg);
558      throw new IllegalArgumentException(msg);
559    }
560    int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
561    pos += Bytes.SIZEOF_INT;
562    if (valLen < 0 || pos + valLen > endOffset) {
563      String msg = "Invalid value length in KeyValue, valueLength=" + valLen +
564          bytesToHex(buf, offset, length);
565      LOG.warn(msg);
566      throw new IllegalArgumentException(msg);
567    }
568    // check the row
569    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
570      String msg =
571          "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length);
572      LOG.warn(msg);
573      throw new IllegalArgumentException(msg);
574    }
575    short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
576    pos += Bytes.SIZEOF_SHORT;
577    if (rowLen < 0 || pos + rowLen > endOffset) {
578      String msg =
579          "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length);
580      LOG.warn(msg);
581      throw new IllegalArgumentException(msg);
582    }
583    pos += rowLen;
584    // check the family
585    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
586      String msg = "Overflow when reading family length at position=" + pos +
587          bytesToHex(buf, offset, length);
588      LOG.warn(msg);
589      throw new IllegalArgumentException(msg);
590    }
591    int familyLen = buf[pos];
592    pos += Bytes.SIZEOF_BYTE;
593    if (familyLen < 0 || pos + familyLen > endOffset) {
594      String msg = "Invalid family length in KeyValue, familyLength=" + familyLen +
595          bytesToHex(buf, offset, length);
596      LOG.warn(msg);
597      throw new IllegalArgumentException(msg);
598    }
599    pos += familyLen;
600    // check the qualifier
601    int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
602        - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
603    if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
604      String msg = "Invalid qualifier length in KeyValue, qualifierLen=" + qualifierLen +
605              bytesToHex(buf, offset, length);
606      LOG.warn(msg);
607      throw new IllegalArgumentException(msg);
608    }
609    pos += qualifierLen;
610    // check the timestamp
611    if (pos + Bytes.SIZEOF_LONG > endOffset) {
612      String msg =
613          "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length);
614      LOG.warn(msg);
615      throw new IllegalArgumentException(msg);
616    }
617    long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
618    if (timestamp < 0) {
619      String msg =
620          "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length);
621      LOG.warn(msg);
622      throw new IllegalArgumentException(msg);
623    }
624    pos += Bytes.SIZEOF_LONG;
625    // check the type
626    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
627      String msg =
628          "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length);
629      LOG.warn(msg);
630      throw new IllegalArgumentException(msg);
631    }
632    byte type = buf[pos];
633    if (!Type.isValidType(type)) {
634      String msg = "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length);
635      LOG.warn(msg);
636      throw new IllegalArgumentException(msg);
637    }
638    pos += Bytes.SIZEOF_BYTE;
639    // check the value
640    if (pos + valLen > endOffset) {
641      String msg =
642          "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length);
643      LOG.warn(msg);
644      throw new IllegalArgumentException(msg);
645    }
646    pos += valLen;
647    // check the tags
648    if (withTags) {
649      if (pos == endOffset) {
650        // withTags is true but no tag in the cell.
651        return;
652      }
653      pos = checkKeyValueTagBytes(buf, offset, length, pos, endOffset);
654    }
655    if (pos != endOffset) {
656      String msg = "Some redundant bytes in KeyValue's buffer, startOffset=" + pos + ", endOffset="
657          + endOffset + bytesToHex(buf, offset, length);
658      LOG.warn(msg);
659      throw new IllegalArgumentException(msg);
660    }
661  }
662
663  private static int checkKeyValueTagBytes(byte[] buf, int offset, int length, int pos,
664      int endOffset) {
665    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
666      String msg = "Overflow when reading tags length at position=" + pos +
667          bytesToHex(buf, offset, length);
668      LOG.warn(msg);
669      throw new IllegalArgumentException(msg);
670    }
671    short tagsLen = Bytes.toShort(buf, pos);
672    pos += Bytes.SIZEOF_SHORT;
673    if (tagsLen < 0 || pos + tagsLen > endOffset) {
674      String msg = "Invalid tags length in KeyValue at position=" + (pos - Bytes.SIZEOF_SHORT)
675          + bytesToHex(buf, offset, length);
676      LOG.warn(msg);
677      throw new IllegalArgumentException(msg);
678    }
679    int tagsEndOffset = pos + tagsLen;
680    for (; pos < tagsEndOffset;) {
681      if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
682        String msg = "Overflow when reading tag length at position=" + pos +
683            bytesToHex(buf, offset, length);
684        LOG.warn(msg);
685        throw new IllegalArgumentException(msg);
686      }
687      short tagLen = Bytes.toShort(buf, pos);
688      pos += Tag.TAG_LENGTH_SIZE;
689      // tagLen contains one byte tag type, so must be not less than 1.
690      if (tagLen < 1 || pos + tagLen > endOffset) {
691        String msg =
692            "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE) + ", tagLength="
693                + tagLen + bytesToHex(buf, offset, length);
694        LOG.warn(msg);
695        throw new IllegalArgumentException(msg);
696      }
697      pos += tagLen;
698    }
699    return pos;
700  }
701
702  /**
703   * Create a KeyValue reading from the raw InputStream. Named
704   * <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
705   * @param in inputStream to read.
706   * @param withTags whether the keyvalue should include tags are not
707   * @return Created KeyValue OR if we find a length of zero, we will return null which can be
708   *         useful marking a stream as done.
709   * @throws IOException
710   */
711  public static KeyValue createKeyValueFromInputStream(InputStream in, boolean withTags)
712      throws IOException {
713    byte[] intBytes = new byte[Bytes.SIZEOF_INT];
714    int bytesRead = 0;
715    while (bytesRead < intBytes.length) {
716      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
717      if (n < 0) {
718        if (bytesRead == 0) {
719          throw new EOFException();
720        }
721        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
722      }
723      bytesRead += n;
724    }
725    byte[] bytes = new byte[Bytes.toInt(intBytes)];
726    IOUtils.readFully(in, bytes, 0, bytes.length);
727    return withTags ? new KeyValue(bytes, 0, bytes.length)
728        : new NoTagsKeyValue(bytes, 0, bytes.length);
729  }
730
731  /**
732   * @param b
733   * @return A KeyValue made of a byte array that holds the key-only part.
734   *         Needed to convert hfile index members to KeyValues.
735   */
736  public static KeyValue createKeyValueFromKey(final byte[] b) {
737    return createKeyValueFromKey(b, 0, b.length);
738  }
739
740  /**
741   * @param bb
742   * @return A KeyValue made of a byte buffer that holds the key-only part.
743   *         Needed to convert hfile index members to KeyValues.
744   */
745  public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
746    return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
747  }
748
749  /**
750   * @param b
751   * @param o
752   * @param l
753   * @return A KeyValue made of a byte array that holds the key-only part.
754   *         Needed to convert hfile index members to KeyValues.
755   */
756  public static KeyValue createKeyValueFromKey(final byte[] b, final int o, final int l) {
757    byte[] newb = new byte[l + KeyValue.ROW_OFFSET];
758    System.arraycopy(b, o, newb, KeyValue.ROW_OFFSET, l);
759    Bytes.putInt(newb, 0, l);
760    Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
761    return new KeyValue(newb);
762  }
763
764  /**
765   * @param in
766   *          Where to read bytes from. Creates a byte array to hold the
767   *          KeyValue backing bytes copied from the steam.
768   * @return KeyValue created by deserializing from <code>in</code> OR if we
769   *         find a length of zero, we will return null which can be useful
770   *         marking a stream as done.
771   * @throws IOException
772   */
773  public static KeyValue create(final DataInput in) throws IOException {
774    return create(in.readInt(), in);
775  }
776
777  /**
778   * Create a KeyValue reading <code>length</code> from <code>in</code>
779   *
780   * @param length
781   * @param in
782   * @return Created KeyValue OR if we find a length of zero, we will return
783   *         null which can be useful marking a stream as done.
784   * @throws IOException
785   */
786  public static KeyValue create(int length, final DataInput in) throws IOException {
787
788    if (length <= 0) {
789      if (length == 0)
790        return null;
791      throw new IOException("Failed read " + length + " bytes, stream corrupt?");
792    }
793
794    // This is how the old Writables.readFrom used to deserialize. Didn't even
795    // vint.
796    byte[] bytes = new byte[length];
797    in.readFully(bytes);
798    return new KeyValue(bytes, 0, length);
799  }
800
801  public static int getSerializedSize(Cell cell, boolean withTags) {
802    if (cell instanceof ExtendedCell) {
803      return ((ExtendedCell) cell).getSerializedSize(withTags);
804    }
805    return length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
806        cell.getValueLength(), cell.getTagsLength(), withTags);
807  }
808
809  public static int oswrite(final Cell cell, final OutputStream out, final boolean withTags)
810      throws IOException {
811    if (cell instanceof ExtendedCell) {
812      return ((ExtendedCell)cell).write(out, withTags);
813    } else {
814      short rlen = cell.getRowLength();
815      byte flen = cell.getFamilyLength();
816      int qlen = cell.getQualifierLength();
817      int vlen = cell.getValueLength();
818      int tlen = cell.getTagsLength();
819      int size = 0;
820      // write key length
821      int klen = keyLength(rlen, flen, qlen);
822      ByteBufferUtils.putInt(out, klen);
823      // write value length
824      ByteBufferUtils.putInt(out, vlen);
825      // Write rowkey - 2 bytes rk length followed by rowkey bytes
826      StreamUtils.writeShort(out, rlen);
827      out.write(cell.getRowArray(), cell.getRowOffset(), rlen);
828      // Write cf - 1 byte of cf length followed by the family bytes
829      out.write(flen);
830      out.write(cell.getFamilyArray(), cell.getFamilyOffset(), flen);
831      // write qualifier
832      out.write(cell.getQualifierArray(), cell.getQualifierOffset(), qlen);
833      // write timestamp
834      StreamUtils.writeLong(out, cell.getTimestamp());
835      // write the type
836      out.write(cell.getTypeByte());
837      // write value
838      out.write(cell.getValueArray(), cell.getValueOffset(), vlen);
839      size = klen + vlen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
840      // write tags if we have to
841      if (withTags && tlen > 0) {
842        // 2 bytes tags length followed by tags bytes
843        // tags length is serialized with 2 bytes only(short way) even if the
844        // type is int. As this
845        // is non -ve numbers, we save the sign bit. See HBASE-11437
846        out.write((byte) (0xff & (tlen >> 8)));
847        out.write((byte) (0xff & tlen));
848        out.write(cell.getTagsArray(), cell.getTagsOffset(), tlen);
849        size += tlen + KeyValue.TAGS_LENGTH_SIZE;
850      }
851      return size;
852    }
853  }
854}