001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Random;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.stream.Stream;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.ByteBufferKeyValue;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.ExtendedCell;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.KeyValue.Type;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.Tag;
049import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
050import org.apache.hadoop.hbase.io.compress.Compression;
051import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
052import org.apache.hadoop.hbase.io.hfile.HFileContext;
053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
054import org.apache.hadoop.hbase.nio.SingleByteBuff;
055import org.apache.hadoop.hbase.testclassification.IOTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.RedundantKVGenerator;
059import org.junit.jupiter.api.TestTemplate;
060import org.junit.jupiter.params.provider.Arguments;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
/**
 * Test all of the data block encoding algorithms for correctness. Most of the tests generate data
 * that exercises different branches in the code.
 */
068@org.junit.jupiter.api.Tag(IOTests.TAG)
069@org.junit.jupiter.api.Tag(LargeTests.TAG)
070@HBaseParameterizedTestTemplate(
071    name = "{index}: includesMemstoreTS={0}, includesTags={1}, useOffheapData={2}")
072public class TestDataBlockEncoders {
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);
075
076  private static int NUMBER_OF_KV = 10000;
077  private static int NUM_RANDOM_SEEKS = 1000;
078
079  private static int ENCODED_DATA_OFFSET =
080    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;
081  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
082
083  private final Configuration conf = HBaseConfiguration.create();
084  private final RedundantKVGenerator generator = new RedundantKVGenerator();
085  private final boolean includesMemstoreTS;
086  private final boolean includesTags;
087  private final boolean useOffheapData;
088
089  public static Stream<Arguments> parameters() {
090    return HBaseTestingUtil.memStoreTSTagsAndOffheapCombination().stream().map(Arguments::of);
091  }
092
093  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
094    boolean useOffheapData) {
095    this.includesMemstoreTS = includesMemstoreTS;
096    this.includesTags = includesTag;
097    this.useOffheapData = useOffheapData;
098  }
099
100  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
101    Compression.Algorithm algo, DataBlockEncoding encoding) {
102    DataBlockEncoder encoder = encoding.getEncoder();
103    HFileContext meta =
104      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
105        .withIncludesTags(includesTags).withCompression(algo).build();
106    if (encoder != null) {
107      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
108    } else {
109      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
110    }
111  }
112
113  /**
114   * Test data block encoding of empty KeyValue. On test failure.
115   */
116  @TestTemplate
117  public void testEmptyKeyValues() throws IOException {
118    List<KeyValue> kvList = new ArrayList<>();
119    byte[] row = new byte[0];
120    byte[] family = new byte[0];
121    byte[] qualifier = new byte[0];
122    byte[] value = new byte[0];
123    if (!includesTags) {
124      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
125      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
126    } else {
127      byte[] metaValue1 = Bytes.toBytes("metaValue1");
128      byte[] metaValue2 = Bytes.toBytes("metaValue2");
129      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
130        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
131      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
132        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
133    }
134    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
135  }
136
137  /**
138   * Test KeyValues with negative timestamp. On test failure.
139   */
140  @TestTemplate
141  public void testNegativeTimestamps() throws IOException {
142    List<KeyValue> kvList = new ArrayList<>();
143    byte[] row = new byte[0];
144    byte[] family = new byte[0];
145    byte[] qualifier = new byte[0];
146    byte[] value = new byte[0];
147    if (includesTags) {
148      byte[] metaValue1 = Bytes.toBytes("metaValue1");
149      byte[] metaValue2 = Bytes.toBytes("metaValue2");
150      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
151        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
152      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
153        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
154    } else {
155      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
156      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
157    }
158    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
159  }
160
161  /**
162   * Test whether compression -> decompression gives the consistent results on pseudorandom sample.
163   * @throws IOException On test failure.
164   */
165  @TestTemplate
166  public void testExecutionOnSample() throws IOException {
167    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
168    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
169  }
170
171  /**
172   * Test seeking while file is encoded.
173   */
174  @TestTemplate
175  public void testSeekingOnSample() throws IOException {
176    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
177
178    // create all seekers
179    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
180    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
181      LOG.info("Encoding: " + encoding);
182      DataBlockEncoder encoder = encoding.getEncoder();
183      if (encoder == null) {
184        continue;
185      }
186      LOG.info("Encoder: " + encoder);
187      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
188        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
189      HFileContext meta =
190        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
191          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
192      DataBlockEncoder.EncodedSeeker seeker =
193        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
194      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
195      encodedSeekers.add(seeker);
196    }
197    LOG.info("Testing it!");
198    // test it!
199    // try a few random seeks
200    Random rand = ThreadLocalRandom.current();
201    for (boolean seekBefore : new boolean[] { false, true }) {
202      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
203        int keyValueId;
204        if (!seekBefore) {
205          keyValueId = rand.nextInt(sampleKv.size());
206        } else {
207          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
208        }
209
210        KeyValue keyValue = sampleKv.get(keyValueId);
211        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
212      }
213    }
214
215    // check edge cases
216    LOG.info("Checking edge cases");
217    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
218    for (boolean seekBefore : new boolean[] { false, true }) {
219      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
220      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
221      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
222      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
223    }
224    LOG.info("Done");
225  }
226
227  @TestTemplate
228  public void testSeekingToOffHeapKeyValueInSample() throws IOException {
229    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
230
231    // create all seekers
232    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
233    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
234      LOG.info("Encoding: " + encoding);
235      DataBlockEncoder encoder = encoding.getEncoder();
236      if (encoder == null) {
237        continue;
238      }
239      LOG.info("Encoder: " + encoder);
240      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
241        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
242      HFileContext meta =
243        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
244          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
245      DataBlockEncoder.EncodedSeeker seeker =
246        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
247      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
248      encodedSeekers.add(seeker);
249    }
250    LOG.info("Testing it!");
251    // test it!
252    // try a few random seeks
253    Random rand = ThreadLocalRandom.current();
254    for (boolean seekBefore : new boolean[] { false, true }) {
255      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
256        int keyValueId;
257        if (!seekBefore) {
258          keyValueId = rand.nextInt(sampleKv.size());
259        } else {
260          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
261        }
262
263        KeyValue keyValue = sampleKv.get(keyValueId);
264        checkSeekingConsistency(encodedSeekers, seekBefore, buildOffHeapKeyValue(keyValue));
265      }
266    }
267
268    // check edge cases
269    LOG.info("Checking edge cases");
270    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
271    for (boolean seekBefore : new boolean[] { false, true }) {
272      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
273      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
274      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
275      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
276    }
277    LOG.info("Done");
278  }
279
280  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
281    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
282    DataBlockEncoder encoder = encoding.getEncoder();
283    ByteArrayOutputStream baos = new ByteArrayOutputStream();
284    baos.write(HFILEBLOCK_DUMMY_HEADER);
285    DataOutputStream dos = new DataOutputStream(baos);
286    encoder.startBlockEncoding(encodingContext, dos);
287    for (KeyValue kv : kvs) {
288      encoder.encode(kv, encodingContext, dos);
289    }
290    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
291    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
292    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
293    if (useOffheapData) {
294      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
295      bb.put(encodedData);
296      bb.rewind();
297      return bb;
298    }
299    return ByteBuffer.wrap(encodedData);
300  }
301
302  @TestTemplate
303  public void testNextOnSample() throws IOException {
304    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
305
306    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
307      if (encoding.getEncoder() == null) {
308        continue;
309      }
310      DataBlockEncoder encoder = encoding.getEncoder();
311      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
312        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
313      HFileContext meta =
314        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
315          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
316      DataBlockEncoder.EncodedSeeker seeker =
317        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
318      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
319      int i = 0;
320      do {
321        KeyValue expectedKeyValue = sampleKv.get(i);
322        ExtendedCell cell = seeker.getCell();
323        if (
324          PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue,
325            cell) != 0
326        ) {
327          int commonPrefix =
328            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
329          fail(String.format(
330            "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d"
331              + "\n expected %s\n actual      %s",
332            encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
333              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()),
334            CellUtil.toString(cell, false)));
335        }
336        i++;
337      } while (seeker.next());
338    }
339  }
340
341  /**
342   * Test whether the decompression of first key is implemented correctly.
343   */
344  @TestTemplate
345  public void testFirstKeyInBlockOnSample() throws IOException {
346    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
347
348    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
349      if (encoding.getEncoder() == null) {
350        continue;
351      }
352      DataBlockEncoder encoder = encoding.getEncoder();
353      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
354        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
355      ExtendedCell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
356      KeyValue firstKv = sampleKv.get(0);
357      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
358        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
359        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
360      }
361    }
362  }
363
364  @TestTemplate
365  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
366    List<KeyValue> kvList = new ArrayList<>();
367    byte[] row = new byte[0];
368    byte[] family = new byte[0];
369    byte[] qualifier = new byte[0];
370    byte[] value = new byte[0];
371    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
372    kvList.add(expectedKV);
373    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
374    DataBlockEncoder encoder = encoding.getEncoder();
375    ByteBuffer encodedBuffer =
376      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
377    HFileContext meta =
378      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
379        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
380    DataBlockEncoder.EncodedSeeker seeker =
381      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
382    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
383    Cell cell = seeker.getCell();
384    assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
385  }
386
387  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
388    boolean seekBefore, ExtendedCell keyValue) {
389    ExtendedCell expectedKeyValue = null;
390    ByteBuffer expectedKey = null;
391    ByteBuffer expectedValue = null;
392    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
393      seeker.seekToKeyInBlock(keyValue, seekBefore);
394      seeker.rewind();
395
396      ExtendedCell actualKeyValue = seeker.getCell();
397      ByteBuffer actualKey = null;
398      actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
399      ByteBuffer actualValue = seeker.getValueShallowCopy();
400
401      if (expectedKeyValue != null) {
402        assertTrue(PrivateCellUtil.equals(expectedKeyValue, actualKeyValue));
403      } else {
404        expectedKeyValue = actualKeyValue;
405      }
406
407      if (expectedKey != null) {
408        assertEquals(expectedKey, actualKey);
409      } else {
410        expectedKey = actualKey;
411      }
412
413      if (expectedValue != null) {
414        assertEquals(expectedValue, actualValue);
415      } else {
416        expectedValue = actualValue;
417      }
418    }
419  }
420
421  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
422    boolean includesTags) throws IOException {
423    ByteBuffer unencodedDataBuf =
424      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
425    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
426      .withIncludesTags(includesTags).build();
427    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
428      DataBlockEncoder encoder = encoding.getEncoder();
429      if (encoder == null) {
430        continue;
431      }
432      HFileBlockEncodingContext encodingContext =
433        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
434      ByteArrayOutputStream baos = new ByteArrayOutputStream();
435      baos.write(HFILEBLOCK_DUMMY_HEADER);
436      DataOutputStream dos = new DataOutputStream(baos);
437      encoder.startBlockEncoding(encodingContext, dos);
438      for (KeyValue kv : kvList) {
439        encoder.encode(kv, encodingContext, dos);
440      }
441      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
442      byte[] encodedData = baos.toByteArray();
443
444      testAlgorithm(encodedData, unencodedDataBuf, encoder);
445    }
446  }
447
448  @TestTemplate
449  public void testZeroByte() throws IOException {
450    List<KeyValue> kvList = new ArrayList<>();
451    byte[] row = Bytes.toBytes("abcd");
452    byte[] family = new byte[] { 'f' };
453    byte[] qualifier0 = new byte[] { 'b' };
454    byte[] qualifier1 = new byte[] { 'c' };
455    byte[] value0 = new byte[] { 'd' };
456    byte[] value1 = new byte[] { 0x00 };
457    if (includesTags) {
458      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
459        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
460      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
461        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
462    } else {
463      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
464      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
465    }
466    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
467  }
468
469  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
470    DataBlockEncoder encoder) throws IOException {
471    // decode
472    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
473      encodedData.length - ENCODED_DATA_OFFSET);
474    DataInputStream dis = new DataInputStream(bais);
475    ByteBuffer actualDataset;
476    HFileContext meta =
477      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
478        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
479    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
480    actualDataset.rewind();
481
482    // this is because in case of prefix tree the decoded stream will not have
483    // the
484    // mvcc in it.
485    assertEquals(Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset),
486      "Encoding -> decoding gives different results for " + encoder);
487  }
488
489  private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException {
490    ByteArrayOutputStream out = new ByteArrayOutputStream();
491    keyValue.write(out, false);
492    byte[] bytes = out.toByteArray();
493    ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length);
494    bb.put(bytes);
495    bb.flip();
496
497    return new ByteBufferKeyValue(bb, 0, bytes.length);
498  }
499}