/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RedundantKVGenerator;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test all of the data block encoding algorithms for correctness. Most of the tests generate data
 * which will exercise different branches in the code.
 * <p>
 * Every test runs once per parameter combination of {@code includesMemstoreTS},
 * {@code includesTags} and {@code useOffheapData}, and iterates over every
 * {@link DataBlockEncoding} that provides a non-null encoder.
 */
@org.junit.jupiter.api.Tag(IOTests.TAG)
@org.junit.jupiter.api.Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(
  name = "{index}: includesMemstoreTS={0}, includesTags={1}, useOffheapData={2}")
public class TestDataBlockEncoders {

  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);

  /** Number of KeyValues generated for the sample based tests. */
  private static final int NUMBER_OF_KV = 10000;

  /** Number of random seeks performed per {@code seekBefore} mode. */
  private static final int NUM_RANDOM_SEEKS = 1000;

  /** Offset of the encoded payload: the dummy block header followed by the encoding id. */
  private static final int ENCODED_DATA_OFFSET =
    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;

  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];

  private final Configuration conf = HBaseConfiguration.create();
  private final RedundantKVGenerator generator = new RedundantKVGenerator();
  private final boolean includesMemstoreTS;
  private final boolean includesTags;
  private final boolean useOffheapData;

  /**
   * Supplies the parameter combinations the test template is instantiated with.
   * @return one {@link Arguments} per combination returned by
   *         {@link HBaseTestingUtil#memStoreTSTagsAndOffheapCombination()}
   */
  public static Stream<Arguments> parameters() {
    return HBaseTestingUtil.memStoreTSTagsAndOffheapCombination().stream().map(Arguments::of);
  }

  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
    boolean useOffheapData) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTags = includesTag;
    this.useOffheapData = useOffheapData;
  }

  /**
   * Builds the encoding context for {@code encoding}. The encoder's own context is used when the
   * encoding has an encoder, otherwise a {@link HFileBlockDefaultEncodingContext} is created.
   */
  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
    Compression.Algorithm algo, DataBlockEncoding encoding) {
    DataBlockEncoder encoder = encoding.getEncoder();
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(algo).build();
    if (encoder != null) {
      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    } else {
      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    }
  }

  /**
   * Builds the uncompressed, checksum-less file context used on the decoding side. It mirrors the
   * MVCC and tag settings of the current parameter combination.
   */
  private HFileContext newDecodingMeta() {
    return new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
      .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
  }

  /**
   * Encodes {@code kvs} with {@code encoding} and returns a seeker whose current buffer is the
   * encoded data. The encoding must provide a non-null encoder.
   */
  private DataBlockEncoder.EncodedSeeker createSeeker(DataBlockEncoding encoding,
    List<KeyValue> kvs) throws IOException {
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteBuffer encodedBuffer = encodeKeyValues(encoding, kvs,
      getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
    DataBlockEncoder.EncodedSeeker seeker =
      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, newDecodingMeta()));
    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
    return seeker;
  }

  /**
   * Test data block encoding of empty KeyValue.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testEmptyKeyValues() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (!includesTags) {
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
    } else {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test KeyValues with negative timestamp.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testNegativeTimestamps() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (includesTags) {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      // Use the same negative timestamps as the untagged branch; this branch previously used 0L
      // and therefore never exercised negative timestamps when tags were included.
      kvList.add(new KeyValue(row, family, qualifier, -1L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, -2L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test whether compression -> decompression gives the consistent results on pseudorandom sample.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testExecutionOnSample() throws IOException {
    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test seeking while file is encoded.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testSeekingOnSample() throws IOException {
    runSeekingChecks(false);
  }

  /**
   * Test seeking while file is encoded, using off-heap copies of the sample KeyValues as the keys
   * of the random seeks.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testSeekingToOffHeapKeyValueInSample() throws IOException {
    runSeekingChecks(true);
  }

  /**
   * Shared body of the two seeking tests: builds one seeker per encoding over the same sample and
   * checks that all seekers agree for random seeks and for a few edge cases.
   * @param offHeapSeekKeys whether the keys of the random seeks are converted with
   *                        {@link #buildOffHeapKeyValue(KeyValue)} first; the edge case seeks
   *                        always use the sample cells directly
   */
  private void runSeekingChecks(boolean offHeapSeekKeys) throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      LOG.info("Encoding: {}", encoding);
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      LOG.info("Encoder: {}", encoder);
      encodedSeekers.add(createSeeker(encoding, sampleKv));
    }
    LOG.info("Testing it!");
    // test it!
    // try a few random seeks
    Random rand = ThreadLocalRandom.current();
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = rand.nextInt(sampleKv.size());
        } else {
          // index 0 is never picked when seeking before the key
          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        ExtendedCell seekKey;
        if (offHeapSeekKeys) {
          seekKey = buildOffHeapKeyValue(keyValue);
        } else {
          seekKey = keyValue;
        }
        checkSeekingConsistency(encodedSeekers, seekBefore, seekKey);
      }
    }

    // check edge cases
    LOG.info("Checking edge cases");
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
    LOG.info("Done");
  }

  /**
   * Encodes {@code kvs} with the encoder of {@code encoding} and returns the encoded payload
   * without the leading dummy block header and encoding id.
   * @param encoding        encoding to use; must provide a non-null encoder
   * @param kvs             cells to encode, in the order given
   * @param encodingContext context passed to the encoder
   * @param useOffheapData  whether the result is copied into a direct {@link ByteBuffer} instead
   *                        of wrapping a byte array
   * @return buffer positioned at the start of the encoded data
   */
  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(HFILEBLOCK_DUMMY_HEADER);
    DataOutputStream dos = new DataOutputStream(baos);
    encoder.startBlockEncoding(encodingContext, dos);
    for (KeyValue kv : kvs) {
      encoder.encode(kv, encodingContext, dos);
    }
    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
    if (useOffheapData) {
      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
      bb.put(encodedData);
      bb.rewind();
      return bb;
    }
    return ByteBuffer.wrap(encodedData);
  }

  /**
   * Test that iterating an encoded block with {@code next()} yields the sample cells in order and
   * that no cell is missing.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testNextOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      DataBlockEncoder.EncodedSeeker seeker = createSeeker(encoding, sampleKv);
      int i = 0;
      do {
        KeyValue expectedKeyValue = sampleKv.get(i);
        ExtendedCell cell = seeker.getCell();
        int keyComparison = PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR,
          expectedKeyValue, cell);
        if (keyComparison != 0) {
          int commonPrefix =
            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
          fail(String.format(
            "next() produces wrong results encoder: %s i: %d commonPrefix: %d"
              + "\n expected %s\n actual %s",
            encoder.toString(), i, commonPrefix,
            Bytes.toStringBinary(expectedKeyValue.getBuffer(), expectedKeyValue.getKeyOffset(),
              expectedKeyValue.getKeyLength()),
            CellUtil.toString(cell, false)));
        }
        i++;
      } while (seeker.next());
      // Without this check a seeker that stops early would still pass the loop above.
      assertEquals(sampleKv.size(), i,
        "next() did not return every encoded cell for encoder: " + encoder);
    }
  }

  /**
   * Test whether the decompression of first key is implemented correctly.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testFirstKeyInBlockOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      ExtendedCell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
      KeyValue firstKv = sampleKv.get(0);
      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
      }
    }
  }

  /**
   * Test that a single cell without tags, encoded with {@code ROW_INDEX_V1} into an on-heap
   * buffer, is read back with the same length, whatever the tag setting of the contexts is.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
    kvList.add(expectedKV);
    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteBuffer encodedBuffer =
      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
    DataBlockEncoder.EncodedSeeker seeker =
      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, newDecodingMeta()));
    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
    Cell cell = seeker.getCell();
    assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
  }

  /**
   * Seeks every seeker to {@code keyValue} and asserts that all of them expose the same cell, key
   * and value. The first seeker's results are the reference the others are compared against.
   */
  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
    boolean seekBefore, ExtendedCell keyValue) {
    ExtendedCell expectedKeyValue = null;
    ByteBuffer expectedKey = null;
    ByteBuffer expectedValue = null;
    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
      seeker.seekToKeyInBlock(keyValue, seekBefore);
      // NOTE(review): rewind() runs after the seek and before the seeker is read. If rewind()
      // repositions the seeker at the start of the block, the comparisons below do not depend on
      // the seek result - confirm whether the rewind was meant to precede the seek.
      seeker.rewind();

      ExtendedCell actualKeyValue = seeker.getCell();
      ByteBuffer actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
      ByteBuffer actualValue = seeker.getValueShallowCopy();

      if (expectedKeyValue != null) {
        assertTrue(PrivateCellUtil.equals(expectedKeyValue, actualKeyValue));
      } else {
        expectedKeyValue = actualKeyValue;
      }

      if (expectedKey != null) {
        assertEquals(expectedKey, actualKey);
      } else {
        expectedKey = actualKey;
      }

      if (expectedValue != null) {
        assertEquals(expectedValue, actualValue);
      } else {
        expectedValue = actualValue;
      }
    }
  }

  /**
   * Encodes {@code kvList} with every available encoder and verifies via
   * {@link #testAlgorithm(byte[], ByteBuffer, DataBlockEncoder)} that decoding reproduces the
   * unencoded data.
   */
  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
    boolean includesTags) throws IOException {
    ByteBuffer unencodedDataBuf =
      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
      .withIncludesTags(includesTags).build();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      HFileBlockEncodingContext encodingContext =
        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      baos.write(HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream dos = new DataOutputStream(baos);
      encoder.startBlockEncoding(encodingContext, dos);
      for (KeyValue kv : kvList) {
        encoder.encode(kv, encodingContext, dos);
      }
      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
      byte[] encodedData = baos.toByteArray();

      testAlgorithm(encodedData, unencodedDataBuf, encoder);
    }
  }

  /**
   * Test encoding of two cells in the same row where the second value is a single zero byte.
   * @throws IOException On test failure.
   */
  @TestTemplate
  public void testZeroByte() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = Bytes.toBytes("abcd");
    byte[] family = new byte[] { 'f' };
    byte[] qualifier0 = new byte[] { 'b' };
    byte[] qualifier1 = new byte[] { 'c' };
    byte[] value0 = new byte[] { 'd' };
    byte[] value1 = new byte[] { 0x00 };
    if (includesTags) {
      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Decodes {@code encodedData}, skipping the dummy header and encoding id, and asserts that the
   * result equals {@code unencodedDataBuf}.
   */
  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
    DataBlockEncoder encoder) throws IOException {
    // decode
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
      encodedData.length - ENCODED_DATA_OFFSET);
    DataInputStream dis = new DataInputStream(bais);
    ByteBuffer actualDataset =
      encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, newDecodingMeta()));
    actualDataset.rewind();

    // this is because in case of prefix tree the decoded stream will not have the mvcc in it.
    assertEquals(Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset),
      "Encoding -> decoding gives different results for " + encoder);
  }

  /**
   * Serializes {@code keyValue} via {@code keyValue.write(out, false)} and copies the bytes into a
   * direct buffer backing a {@link ByteBufferKeyValue}.
   */
  private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    keyValue.write(out, false);
    byte[] bytes = out.toByteArray();
    ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length);
    bb.put(bytes);
    bb.flip();

    return new ByteBufferKeyValue(bb, 0, bytes.length);
  }
}