001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.EOFException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.ByteBuffer;
026import java.util.ArrayList;
027import java.util.List;
028import org.apache.hadoop.hbase.util.ByteBufferUtils;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.hadoop.io.IOUtils;
031import org.apache.hadoop.io.WritableUtils;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Function;
037import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
038import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
039
040/**
041 * static convenience methods for dealing with KeyValues and collections of KeyValues
042 */
043@InterfaceAudience.Private
044public class KeyValueUtil {
045
046  private static final Logger LOG = LoggerFactory.getLogger(KeyValueUtil.class);
047
048  /**************** length *********************/
049
050  public static int length(short rlen, byte flen, int qlen, int vlen, int tlen, boolean withTags) {
051    if (withTags) {
052      return (int) KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen, tlen);
053    }
054    return (int) KeyValue.getKeyValueDataStructureSize(rlen, flen, qlen, vlen);
055  }
056
057  /**
058   * Returns number of bytes this cell's key part would have been used if serialized as in
059   * {@link KeyValue}. Key includes rowkey, family, qualifier, timestamp and type.
060   * @return the key length
061   */
062  public static int keyLength(final Cell cell) {
063    return keyLength(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength());
064  }
065
066  private static int keyLength(short rlen, byte flen, int qlen) {
067    return (int) KeyValue.getKeyDataStructureSize(rlen, flen, qlen);
068  }
069
070  public static int lengthWithMvccVersion(final KeyValue kv, final boolean includeMvccVersion) {
071    int length = kv.getLength();
072    if (includeMvccVersion) {
073      length += WritableUtils.getVIntSize(kv.getSequenceId());
074    }
075    return length;
076  }
077
078  public static int totalLengthWithMvccVersion(final Iterable<? extends KeyValue> kvs,
079    final boolean includeMvccVersion) {
080    int length = 0;
081    for (KeyValue kv : IterableUtils.emptyIfNull(kvs)) {
082      length += lengthWithMvccVersion(kv, includeMvccVersion);
083    }
084    return length;
085  }
086
087  /**************** copy the cell to create a new keyvalue *********************/
088
089  public static KeyValue copyToNewKeyValue(final ExtendedCell cell) {
090    byte[] bytes = copyToNewByteArray(cell);
091    KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
092    kvCell.setSequenceId(cell.getSequenceId());
093    return kvCell;
094  }
095
096  /**
097   * The position will be set to the beginning of the new ByteBuffer
098   * @return the Bytebuffer containing the key part of the cell
099   */
100  public static ByteBuffer copyKeyToNewByteBuffer(final ExtendedCell cell) {
101    byte[] bytes = new byte[keyLength(cell)];
102    appendKeyTo(cell, bytes, 0);
103    ByteBuffer buffer = ByteBuffer.wrap(bytes);
104    return buffer;
105  }
106
107  /**
108   * Copies the key to a new KeyValue
109   * @return the KeyValue that consists only the key part of the incoming cell
110   */
111  public static KeyValue toNewKeyCell(final ExtendedCell cell) {
112    byte[] bytes = new byte[keyLength(cell)];
113    appendKeyTo(cell, bytes, 0);
114    KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
115    // Set the seq id. The new key cell could be used in comparisons so it
116    // is important that it uses the seqid also. If not the comparsion would fail
117    kv.setSequenceId(cell.getSequenceId());
118    return kv;
119  }
120
121  public static byte[] copyToNewByteArray(final ExtendedCell cell) {
122    // Cell#getSerializedSize returns the serialized size of the Source cell, which may
123    // not serialize all fields. We are constructing a KeyValue backing array here,
124    // which does include all fields, and must allocate accordingly.
125    // TODO we could probably use Cell#getSerializedSize safely, the errors were
126    // caused by cells corrupted by use-after-free bugs
127    int v1Length = length(cell.getRowLength(), cell.getFamilyLength(), cell.getQualifierLength(),
128      cell.getValueLength(), cell.getTagsLength(), true);
129    byte[] backingBytes = new byte[v1Length];
130    appendToByteArray(cell, backingBytes, 0, true);
131    return backingBytes;
132  }
133
134  public static int appendKeyTo(final ExtendedCell cell, final byte[] output, final int offset) {
135    int nextOffset = offset;
136    nextOffset = Bytes.putShort(output, nextOffset, cell.getRowLength());
137    nextOffset = CellUtil.copyRowTo(cell, output, nextOffset);
138    nextOffset = Bytes.putByte(output, nextOffset, cell.getFamilyLength());
139    nextOffset = CellUtil.copyFamilyTo(cell, output, nextOffset);
140    nextOffset = CellUtil.copyQualifierTo(cell, output, nextOffset);
141    nextOffset = Bytes.putLong(output, nextOffset, cell.getTimestamp());
142    nextOffset = Bytes.putByte(output, nextOffset, cell.getTypeByte());
143    return nextOffset;
144  }
145
146  /**************** copy key and value *********************/
147
148  public static int appendToByteArray(ExtendedCell cell, byte[] output, int offset,
149    boolean withTags) {
150    int pos = offset;
151    pos = Bytes.putInt(output, pos, keyLength(cell));
152    pos = Bytes.putInt(output, pos, cell.getValueLength());
153    pos = appendKeyTo(cell, output, pos);
154    pos = CellUtil.copyValueTo(cell, output, pos);
155    if (withTags && (cell.getTagsLength() > 0)) {
156      pos = Bytes.putAsShort(output, pos, cell.getTagsLength());
157      pos = PrivateCellUtil.copyTagsTo(cell, output, pos);
158    }
159    return pos;
160  }
161
162  /**
163   * Copy the Cell content into the passed buf in KeyValue serialization format.
164   */
165  public static int appendTo(ExtendedCell cell, ByteBuffer buf, int offset, boolean withTags) {
166    offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
167    offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
168    offset = appendKeyTo(cell, buf, offset);
169    offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
170    int tagsLength = cell.getTagsLength();
171    if (withTags && (tagsLength > 0)) {
172      offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
173      offset = PrivateCellUtil.copyTagsTo(cell, buf, offset);// Tags bytes
174    }
175    return offset;
176  }
177
178  public static int appendKeyTo(ExtendedCell cell, ByteBuffer buf, int offset) {
179    offset = ByteBufferUtils.putShort(buf, offset, cell.getRowLength());// RK length
180    offset = CellUtil.copyRowTo(cell, buf, offset);// Row bytes
181    offset = ByteBufferUtils.putByte(buf, offset, cell.getFamilyLength());// CF length
182    offset = CellUtil.copyFamilyTo(cell, buf, offset);// CF bytes
183    offset = CellUtil.copyQualifierTo(cell, buf, offset);// Qualifier bytes
184    offset = ByteBufferUtils.putLong(buf, offset, cell.getTimestamp());// TS
185    offset = ByteBufferUtils.putByte(buf, offset, cell.getTypeByte());// Type
186    return offset;
187  }
188
189  public static void appendToByteBuffer(final ByteBuffer bb, final KeyValue kv,
190    final boolean includeMvccVersion) {
191    // keep pushing the limit out. assume enough capacity
192    bb.limit(bb.position() + kv.getLength());
193    bb.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
194    if (includeMvccVersion) {
195      int numMvccVersionBytes = WritableUtils.getVIntSize(kv.getSequenceId());
196      ByteBufferUtils.extendLimit(bb, numMvccVersionBytes);
197      ByteBufferUtils.writeVLong(bb, kv.getSequenceId());
198    }
199  }
200
201  /**************** iterating *******************************/
202
203  /**
204   * Creates a new KeyValue object positioned in the supplied ByteBuffer and sets the ByteBuffer's
205   * position to the start of the next KeyValue. Does not allocate a new array or copy data.
206   */
207  public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
208    boolean includesTags) {
209    if (bb.isDirect()) {
210      throw new IllegalArgumentException("only supports heap buffers");
211    }
212    if (bb.remaining() < 1) {
213      return null;
214    }
215    int underlyingArrayOffset = bb.arrayOffset() + bb.position();
216    int keyLength = bb.getInt();
217    int valueLength = bb.getInt();
218    ByteBufferUtils.skip(bb, keyLength + valueLength);
219    int tagsLength = 0;
220    if (includesTags) {
221      // Read short as unsigned, high byte first
222      tagsLength = ((bb.get() & 0xff) << 8) ^ (bb.get() & 0xff);
223      ByteBufferUtils.skip(bb, tagsLength);
224    }
225    int kvLength = (int) KeyValue.getKeyValueDataStructureSize(keyLength, valueLength, tagsLength);
226    KeyValue keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
227    if (includesMvccVersion) {
228      long mvccVersion = ByteBufferUtils.readVLong(bb);
229      keyValue.setSequenceId(mvccVersion);
230    }
231    return keyValue;
232  }
233
234  /*************** next/previous **********************************/
235
236  /**
237   * Decrement the timestamp. For tests (currently wasteful) Remember timestamps are sorted reverse
238   * chronologically.
239   * @return previous key
240   */
241  public static KeyValue previousKey(final KeyValue in) {
242    return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
243      CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
244  }
245
246  /**
247   * Create a KeyValue for the specified row, family and qualifier that would be larger than or
248   * equal to all other possible KeyValues that have the same row, family, qualifier. Used for
249   * reseeking. Should NEVER be returned to a client. row key row offset row length family name
250   * family offset family length column qualifier qualifier offset qualifier length
251   * @return Last possible key on passed row, family, qualifier.
252   */
253  public static KeyValue createLastOnRow(final byte[] row, final int roffset, final int rlength,
254    final byte[] family, final int foffset, final int flength, final byte[] qualifier,
255    final int qoffset, final int qlength) {
256    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
257      qlength, PrivateConstants.OLDEST_TIMESTAMP, KeyValue.Type.Minimum, null, 0, 0);
258  }
259
260  /**
261   * Create a KeyValue that is smaller than all other possible KeyValues for the given row. That is
262   * any (valid) KeyValue on 'row' would sort _after_ the result.
263   * @param row - row key (arbitrary byte array)
264   * @return First possible KeyValue on passed <code>row</code>
265   */
266  public static KeyValue createFirstOnRow(final byte[] row, int roffset, short rlength) {
267    return new KeyValue(row, roffset, rlength, null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP,
268      KeyValue.Type.Maximum, null, 0, 0);
269  }
270
271  /**
272   * Creates a KeyValue that is last on the specified row id. That is, every other possible KeyValue
273   * for the given row would compareTo() less than the result of this call.
274   * @param row row key
275   * @return Last possible KeyValue on passed <code>row</code>
276   */
277  public static KeyValue createLastOnRow(final byte[] row) {
278    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Minimum);
279  }
280
281  /**
282   * Create a KeyValue that is smaller than all other possible KeyValues for the given row. That is
283   * any (valid) KeyValue on 'row' would sort _after_ the result.
284   * @param row - row key (arbitrary byte array)
285   * @return First possible KeyValue on passed <code>row</code>
286   */
287  public static KeyValue createFirstOnRow(final byte[] row) {
288    return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
289  }
290
291  /**
292   * Creates a KeyValue that is smaller than all other KeyValues that are older than the passed
293   * timestamp.
294   * @param row - row key (arbitrary byte array)
295   * @param ts  - timestamp
296   * @return First possible key on passed <code>row</code> and timestamp.
297   */
298  public static KeyValue createFirstOnRow(final byte[] row, final long ts) {
299    return new KeyValue(row, null, null, ts, KeyValue.Type.Maximum);
300  }
301
302  /**
303   * Create a KeyValue for the specified row, family and qualifier that would be smaller than all
304   * other possible KeyValues that have the same row,family,qualifier. Used for seeking.
305   * @param row       - row key (arbitrary byte array)
306   * @param family    - family name
307   * @param qualifier - column qualifier
308   * @return First possible key on passed <code>row</code>, and column.
309   */
310  public static KeyValue createFirstOnRow(final byte[] row, final byte[] family,
311    final byte[] qualifier) {
312    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum);
313  }
314
315  /**
316   * Create a KeyValue for the specified row, family and qualifier that would be smaller than all
317   * other possible KeyValues that have the same row, family, qualifier. Used for seeking.
318   * @param row - row key (arbitrary byte array)
319   * @param f   - family name
320   * @param q   - column qualifier
321   * @param ts  - timestamp
322   * @return First possible key on passed <code>row</code>, column and timestamp
323   */
324  public static KeyValue createFirstOnRow(final byte[] row, final byte[] f, final byte[] q,
325    final long ts) {
326    return new KeyValue(row, f, q, ts, KeyValue.Type.Maximum);
327  }
328
329  /**
330   * Create a KeyValue for the specified row, family and qualifier that would be smaller than all
331   * other possible KeyValues that have the same row, family, qualifier. Used for seeking.
332   * @param row       row key
333   * @param roffset   row offset
334   * @param rlength   row length
335   * @param family    family name
336   * @param foffset   family offset
337   * @param flength   family length
338   * @param qualifier column qualifier
339   * @param qoffset   qualifier offset
340   * @param qlength   qualifier length
341   * @return First possible key on passed Row, Family, Qualifier.
342   */
343  public static KeyValue createFirstOnRow(final byte[] row, final int roffset, final int rlength,
344    final byte[] family, final int foffset, final int flength, final byte[] qualifier,
345    final int qoffset, final int qlength) {
346    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
347      qlength, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum, null, 0, 0);
348  }
349
350  /**
351   * Create a KeyValue for the specified row, family and qualifier that would be smaller than all
352   * other possible KeyValues that have the same row, family, qualifier. Used for seeking.
353   * @param buffer    the buffer to use for the new <code>KeyValue</code> object
354   * @param row       the value key
355   * @param family    family name
356   * @param qualifier column qualifier
357   * @return First possible key on passed Row, Family, Qualifier.
358   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
359   *                                  than the provided buffer or than
360   *                                  <code>Integer.MAX_VALUE</code>
361   */
362  public static KeyValue createFirstOnRow(byte[] buffer, final byte[] row, final byte[] family,
363    final byte[] qualifier) throws IllegalArgumentException {
364    return createFirstOnRow(buffer, 0, row, 0, row.length, family, 0, family.length, qualifier, 0,
365      qualifier.length);
366  }
367
368  /**
369   * Create a KeyValue for the specified row, family and qualifier that would be smaller than all
370   * other possible KeyValues that have the same row, family, qualifier. Used for seeking.
371   * @param buffer    the buffer to use for the new <code>KeyValue</code> object
372   * @param boffset   buffer offset
373   * @param row       the value key
374   * @param roffset   row offset
375   * @param rlength   row length
376   * @param family    family name
377   * @param foffset   family offset
378   * @param flength   family length
379   * @param qualifier column qualifier
380   * @param qoffset   qualifier offset
381   * @param qlength   qualifier length
382   * @return First possible key on passed Row, Family, Qualifier.
383   * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
384   *                                  than the provided buffer or than
385   *                                  <code>Integer.MAX_VALUE</code>
386   */
387  public static KeyValue createFirstOnRow(byte[] buffer, final int boffset, final byte[] row,
388    final int roffset, final int rlength, final byte[] family, final int foffset, final int flength,
389    final byte[] qualifier, final int qoffset, final int qlength) throws IllegalArgumentException {
390
391    long lLength = KeyValue.getKeyValueDataStructureSize(rlength, flength, qlength, 0);
392
393    if (lLength > Integer.MAX_VALUE) {
394      throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
395    }
396    int iLength = (int) lLength;
397    if (buffer.length - boffset < iLength) {
398      throw new IllegalArgumentException(
399        "Buffer size " + (buffer.length - boffset) + " < " + iLength);
400    }
401
402    int len = KeyValue.writeByteArray(buffer, boffset, row, roffset, rlength, family, foffset,
403      flength, qualifier, qoffset, qlength, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
404      null, 0, 0, null);
405    return new KeyValue(buffer, boffset, len);
406  }
407
408  /*************** misc **********************************/
409  /**
410   * @return <code>cell</code> if it is an object of class {@link KeyValue} else we will return a
411   *         new {@link KeyValue} instance made from <code>cell</code> Note: Even if the cell is an
412   *         object of any of the subclass of {@link KeyValue}, we will create a new
413   *         {@link KeyValue} object wrapping same buffer. This API is used only with MR based tools
414   *         which expect the type to be exactly KeyValue. That is the reason for doing this way.
415   * @deprecated without any replacement.
416   */
417  @Deprecated
418  public static KeyValue ensureKeyValue(final ExtendedCell cell) {
419    if (cell == null) return null;
420    if (cell instanceof KeyValue) {
421      if (cell.getClass().getName().equals(KeyValue.class.getName())) {
422        return (KeyValue) cell;
423      }
424      // Cell is an Object of any of the sub classes of KeyValue. Make a new KeyValue wrapping the
425      // same byte[]
426      KeyValue kv = (KeyValue) cell;
427      KeyValue newKv = new KeyValue(kv.bytes, kv.offset, kv.length);
428      newKv.setSequenceId(kv.getSequenceId());
429      return newKv;
430    }
431    return copyToNewKeyValue(cell);
432  }
433
434  @Deprecated
435  public static List<KeyValue> ensureKeyValues(List<ExtendedCell> cells) {
436    List<KeyValue> lazyList = Lists.transform(cells, new Function<ExtendedCell, KeyValue>() {
437      @Override
438      public KeyValue apply(ExtendedCell arg0) {
439        return KeyValueUtil.ensureKeyValue(arg0);
440      }
441    });
442    return new ArrayList<>(lazyList);
443  }
444
445  /**
446   * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
447   * @return Length written on stream
448   * @see #create(DataInput) for the inverse function
449   */
450  public static long write(final KeyValue kv, final DataOutput out) throws IOException {
451    // This is how the old Writables write used to serialize KVs. Need to figure
452    // way to make it
453    // work for all implementations.
454    int length = kv.getLength();
455    out.writeInt(length);
456    out.write(kv.getBuffer(), kv.getOffset(), length);
457    return (long) length + Bytes.SIZEOF_INT;
458  }
459
460  static String bytesToHex(byte[] buf, int offset, int length) {
461    String bufferContents = buf != null ? Bytes.toStringBinary(buf, offset, length) : "<null>";
462    return ", KeyValueBytesHex=" + bufferContents + ", offset=" + offset + ", length=" + length;
463  }
464
465  static void checkKeyValueBytes(byte[] buf, int offset, int length, boolean withTags) {
466    if (buf == null) {
467      String msg = "Invalid to have null byte array in KeyValue.";
468      LOG.warn(msg);
469      throw new IllegalArgumentException(msg);
470    }
471
472    int pos = offset, endOffset = offset + length;
473    // check the key
474    if (pos + Bytes.SIZEOF_INT > endOffset) {
475      String msg =
476        "Overflow when reading key length at position=" + pos + bytesToHex(buf, offset, length);
477      LOG.warn(msg);
478      throw new IllegalArgumentException(msg);
479    }
480    int keyLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
481    pos += Bytes.SIZEOF_INT;
482    if (keyLen <= 0 || pos + keyLen > endOffset) {
483      String msg =
484        "Invalid key length in KeyValue. keyLength=" + keyLen + bytesToHex(buf, offset, length);
485      LOG.warn(msg);
486      throw new IllegalArgumentException(msg);
487    }
488    // check the value
489    if (pos + Bytes.SIZEOF_INT > endOffset) {
490      String msg =
491        "Overflow when reading value length at position=" + pos + bytesToHex(buf, offset, length);
492      LOG.warn(msg);
493      throw new IllegalArgumentException(msg);
494    }
495    int valLen = Bytes.toInt(buf, pos, Bytes.SIZEOF_INT);
496    pos += Bytes.SIZEOF_INT;
497    if (valLen < 0 || pos + valLen > endOffset) {
498      String msg =
499        "Invalid value length in KeyValue, valueLength=" + valLen + bytesToHex(buf, offset, length);
500      LOG.warn(msg);
501      throw new IllegalArgumentException(msg);
502    }
503    // check the row
504    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
505      String msg =
506        "Overflow when reading row length at position=" + pos + bytesToHex(buf, offset, length);
507      LOG.warn(msg);
508      throw new IllegalArgumentException(msg);
509    }
510    short rowLen = Bytes.toShort(buf, pos, Bytes.SIZEOF_SHORT);
511    pos += Bytes.SIZEOF_SHORT;
512    if (rowLen < 0 || pos + rowLen > endOffset) {
513      String msg =
514        "Invalid row length in KeyValue, rowLength=" + rowLen + bytesToHex(buf, offset, length);
515      LOG.warn(msg);
516      throw new IllegalArgumentException(msg);
517    }
518    pos += rowLen;
519    // check the family
520    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
521      String msg =
522        "Overflow when reading family length at position=" + pos + bytesToHex(buf, offset, length);
523      LOG.warn(msg);
524      throw new IllegalArgumentException(msg);
525    }
526    int familyLen = buf[pos];
527    pos += Bytes.SIZEOF_BYTE;
528    if (familyLen < 0 || pos + familyLen > endOffset) {
529      String msg = "Invalid family length in KeyValue, familyLength=" + familyLen
530        + bytesToHex(buf, offset, length);
531      LOG.warn(msg);
532      throw new IllegalArgumentException(msg);
533    }
534    pos += familyLen;
535    // check the qualifier
536    int qualifierLen = keyLen - Bytes.SIZEOF_SHORT - rowLen - Bytes.SIZEOF_BYTE - familyLen
537      - Bytes.SIZEOF_LONG - Bytes.SIZEOF_BYTE;
538    if (qualifierLen < 0 || pos + qualifierLen > endOffset) {
539      String msg = "Invalid qualifier length in KeyValue, qualifierLen=" + qualifierLen
540        + bytesToHex(buf, offset, length);
541      LOG.warn(msg);
542      throw new IllegalArgumentException(msg);
543    }
544    pos += qualifierLen;
545    // check the timestamp
546    if (pos + Bytes.SIZEOF_LONG > endOffset) {
547      String msg =
548        "Overflow when reading timestamp at position=" + pos + bytesToHex(buf, offset, length);
549      LOG.warn(msg);
550      throw new IllegalArgumentException(msg);
551    }
552    long timestamp = Bytes.toLong(buf, pos, Bytes.SIZEOF_LONG);
553    if (timestamp < 0) {
554      String msg =
555        "Timestamp cannot be negative, ts=" + timestamp + bytesToHex(buf, offset, length);
556      LOG.warn(msg);
557      throw new IllegalArgumentException(msg);
558    }
559    pos += Bytes.SIZEOF_LONG;
560    // check the type
561    if (pos + Bytes.SIZEOF_BYTE > endOffset) {
562      String msg =
563        "Overflow when reading type at position=" + pos + bytesToHex(buf, offset, length);
564      LOG.warn(msg);
565      throw new IllegalArgumentException(msg);
566    }
567    byte type = buf[pos];
568    if (!KeyValue.Type.isValidType(type)) {
569      String msg = "Invalid type in KeyValue, type=" + type + bytesToHex(buf, offset, length);
570      LOG.warn(msg);
571      throw new IllegalArgumentException(msg);
572    }
573    pos += Bytes.SIZEOF_BYTE;
574    // check the value
575    if (pos + valLen > endOffset) {
576      String msg =
577        "Overflow when reading value part at position=" + pos + bytesToHex(buf, offset, length);
578      LOG.warn(msg);
579      throw new IllegalArgumentException(msg);
580    }
581    pos += valLen;
582    // check the tags
583    if (withTags) {
584      if (pos == endOffset) {
585        // withTags is true but no tag in the cell.
586        return;
587      }
588      pos = checkKeyValueTagBytes(buf, offset, length, pos, endOffset);
589    }
590    if (pos != endOffset) {
591      String msg = "Some redundant bytes in KeyValue's buffer, startOffset=" + pos + ", endOffset="
592        + endOffset + bytesToHex(buf, offset, length);
593      LOG.warn(msg);
594      throw new IllegalArgumentException(msg);
595    }
596  }
597
598  private static int checkKeyValueTagBytes(byte[] buf, int offset, int length, int pos,
599    int endOffset) {
600    if (pos + Bytes.SIZEOF_SHORT > endOffset) {
601      String msg =
602        "Overflow when reading tags length at position=" + pos + bytesToHex(buf, offset, length);
603      LOG.warn(msg);
604      throw new IllegalArgumentException(msg);
605    }
606    short tagsLen = Bytes.toShort(buf, pos);
607    pos += Bytes.SIZEOF_SHORT;
608    if (tagsLen < 0 || pos + tagsLen > endOffset) {
609      String msg = "Invalid tags length in KeyValue at position=" + (pos - Bytes.SIZEOF_SHORT)
610        + bytesToHex(buf, offset, length);
611      LOG.warn(msg);
612      throw new IllegalArgumentException(msg);
613    }
614    int tagsEndOffset = pos + tagsLen;
615    for (; pos < tagsEndOffset;) {
616      if (pos + Tag.TAG_LENGTH_SIZE > endOffset) {
617        String msg =
618          "Overflow when reading tag length at position=" + pos + bytesToHex(buf, offset, length);
619        LOG.warn(msg);
620        throw new IllegalArgumentException(msg);
621      }
622      short tagLen = Bytes.toShort(buf, pos);
623      pos += Tag.TAG_LENGTH_SIZE;
624      // tagLen contains one byte tag type, so must be not less than 1.
625      if (tagLen < 1 || pos + tagLen > endOffset) {
626        String msg = "Invalid tag length at position=" + (pos - Tag.TAG_LENGTH_SIZE)
627          + ", tagLength=" + tagLen + bytesToHex(buf, offset, length);
628        LOG.warn(msg);
629        throw new IllegalArgumentException(msg);
630      }
631      pos += tagLen;
632    }
633    return pos;
634  }
635
636  /**
637   * Create a KeyValue reading from the raw InputStream. Named
638   * <code>createKeyValueFromInputStream</code> so doesn't clash with {@link #create(DataInput)}
639   * @param in       inputStream to read.
640   * @param withTags whether the keyvalue should include tags are not
641   * @return Created KeyValue OR if we find a length of zero, we will return null which can be
642   *         useful marking a stream as done.
643   */
644  public static KeyValue createKeyValueFromInputStream(InputStream in, boolean withTags)
645    throws IOException {
646    byte[] intBytes = new byte[Bytes.SIZEOF_INT];
647    int bytesRead = 0;
648    while (bytesRead < intBytes.length) {
649      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
650      if (n < 0) {
651        if (bytesRead == 0) {
652          throw new EOFException();
653        }
654        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
655      }
656      bytesRead += n;
657    }
658    byte[] bytes = new byte[Bytes.toInt(intBytes)];
659    IOUtils.readFully(in, bytes, 0, bytes.length);
660    return withTags
661      ? new KeyValue(bytes, 0, bytes.length)
662      : new NoTagsKeyValue(bytes, 0, bytes.length);
663  }
664
665  /**
666   * Returns a KeyValue made of a byte array that holds the key-only part. Needed to convert hfile
667   * index members to KeyValues.
668   */
669  public static KeyValue createKeyValueFromKey(final byte[] b) {
670    return createKeyValueFromKey(b, 0, b.length);
671  }
672
673  /**
674   * Return a KeyValue made of a byte buffer that holds the key-only part. Needed to convert hfile
675   * index members to KeyValues.
676   */
677  public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
678    return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
679  }
680
681  /**
682   * Return a KeyValue made of a byte array that holds the key-only part. Needed to convert hfile
683   * index members to KeyValues.
684   */
685  public static KeyValue createKeyValueFromKey(final byte[] b, final int o, final int l) {
686    byte[] newb = new byte[l + KeyValue.ROW_OFFSET];
687    System.arraycopy(b, o, newb, KeyValue.ROW_OFFSET, l);
688    Bytes.putInt(newb, 0, l);
689    Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
690    return new KeyValue(newb);
691  }
692
693  /**
694   * Where to read bytes from. Creates a byte array to hold the KeyValue backing bytes copied from
695   * the steam.
696   * @return KeyValue created by deserializing from <code>in</code> OR if we find a length of zero,
697   *         we will return null which can be useful marking a stream as done.
698   */
699  public static KeyValue create(final DataInput in) throws IOException {
700    return create(in.readInt(), in);
701  }
702
703  /**
704   * Create a KeyValue reading <code>length</code> from <code>in</code>
705   * @return Created KeyValue OR if we find a length of zero, we will return null which can be
706   *         useful marking a stream as done.
707   */
708  public static KeyValue create(int length, final DataInput in) throws IOException {
709    if (length <= 0) {
710      if (length == 0) return null;
711      throw new IOException("Failed read " + length + " bytes, stream corrupt?");
712    }
713
714    // This is how the old Writables.readFrom used to deserialize. Didn't even
715    // vint.
716    byte[] bytes = new byte[length];
717    in.readFully(bytes);
718    return new KeyValue(bytes, 0, length);
719  }
720}