001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.List;
032import java.util.Random;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparatorImpl;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseConfiguration;
041import org.apache.hadoop.hbase.HBaseTestingUtility;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.KeyValue;
044import org.apache.hadoop.hbase.KeyValue.Type;
045import org.apache.hadoop.hbase.PrivateCellUtil;
046import org.apache.hadoop.hbase.Tag;
047import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
048import org.apache.hadoop.hbase.io.compress.Compression;
049import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
050import org.apache.hadoop.hbase.io.hfile.HFileContext;
051import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
052import org.apache.hadoop.hbase.nio.SingleByteBuff;
053import org.apache.hadoop.hbase.testclassification.IOTests;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.RedundantKVGenerator;
057import org.junit.Assert;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameters;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
067/**
 * Tests all of the data block encoding algorithms for correctness. Most of the tests in this class
 * generate data that exercises different branches in the code.
070 */
071@Category({ IOTests.class, LargeTests.class })
072@RunWith(Parameterized.class)
073public class TestDataBlockEncoders {
074
075  @ClassRule
076  public static final HBaseClassTestRule CLASS_RULE =
077    HBaseClassTestRule.forClass(TestDataBlockEncoders.class);
078
079  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);
080
081  private static int NUMBER_OF_KV = 10000;
082  private static int NUM_RANDOM_SEEKS = 1000;
083
084  private static int ENCODED_DATA_OFFSET =
085    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;
086  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
087
088  private final Configuration conf = HBaseConfiguration.create();
089  private final RedundantKVGenerator generator = new RedundantKVGenerator();
090  private final boolean includesMemstoreTS;
091  private final boolean includesTags;
092  private final boolean useOffheapData;
093
094  @Parameters
095  public static Collection<Object[]> parameters() {
096    return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination();
097  }
098
099  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
100    boolean useOffheapData) {
101    this.includesMemstoreTS = includesMemstoreTS;
102    this.includesTags = includesTag;
103    this.useOffheapData = useOffheapData;
104  }
105
106  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
107    Compression.Algorithm algo, DataBlockEncoding encoding) {
108    DataBlockEncoder encoder = encoding.getEncoder();
109    HFileContext meta =
110      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
111        .withIncludesTags(includesTags).withCompression(algo).build();
112    if (encoder != null) {
113      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
114    } else {
115      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
116    }
117  }
118
119  /**
120   * Test data block encoding of empty KeyValue. n * On test failure.
121   */
122  @Test
123  public void testEmptyKeyValues() throws IOException {
124    List<KeyValue> kvList = new ArrayList<>();
125    byte[] row = new byte[0];
126    byte[] family = new byte[0];
127    byte[] qualifier = new byte[0];
128    byte[] value = new byte[0];
129    if (!includesTags) {
130      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
131      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
132    } else {
133      byte[] metaValue1 = Bytes.toBytes("metaValue1");
134      byte[] metaValue2 = Bytes.toBytes("metaValue2");
135      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
136        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
137      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
138        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
139    }
140    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
141  }
142
143  /**
144   * Test KeyValues with negative timestamp. n * On test failure.
145   */
146  @Test
147  public void testNegativeTimestamps() throws IOException {
148    List<KeyValue> kvList = new ArrayList<>();
149    byte[] row = new byte[0];
150    byte[] family = new byte[0];
151    byte[] qualifier = new byte[0];
152    byte[] value = new byte[0];
153    if (includesTags) {
154      byte[] metaValue1 = Bytes.toBytes("metaValue1");
155      byte[] metaValue2 = Bytes.toBytes("metaValue2");
156      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
157        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
158      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
159        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
160    } else {
161      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
162      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
163    }
164    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
165  }
166
167  /**
168   * Test whether compression -> decompression gives the consistent results on pseudorandom sample.
169   * @throws IOException On test failure.
170   */
171  @Test
172  public void testExecutionOnSample() throws IOException {
173    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
174    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
175  }
176
177  /**
178   * Test seeking while file is encoded.
179   */
180  @Test
181  public void testSeekingOnSample() throws IOException {
182    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
183
184    // create all seekers
185    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
186    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
187      LOG.info("Encoding: " + encoding);
188      DataBlockEncoder encoder = encoding.getEncoder();
189      if (encoder == null) {
190        continue;
191      }
192      LOG.info("Encoder: " + encoder);
193      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
194        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
195      HFileContext meta =
196        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
197          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
198      DataBlockEncoder.EncodedSeeker seeker =
199        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
200      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
201      encodedSeekers.add(seeker);
202    }
203    LOG.info("Testing it!");
204    // test it!
205    // try a few random seeks
206    Random rand = ThreadLocalRandom.current();
207    for (boolean seekBefore : new boolean[] { false, true }) {
208      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
209        int keyValueId;
210        if (!seekBefore) {
211          keyValueId = rand.nextInt(sampleKv.size());
212        } else {
213          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
214        }
215
216        KeyValue keyValue = sampleKv.get(keyValueId);
217        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
218      }
219    }
220
221    // check edge cases
222    LOG.info("Checking edge cases");
223    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
224    for (boolean seekBefore : new boolean[] { false, true }) {
225      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
226      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
227      Cell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
228      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
229    }
230    LOG.info("Done");
231  }
232
233  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
234    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
235    DataBlockEncoder encoder = encoding.getEncoder();
236    ByteArrayOutputStream baos = new ByteArrayOutputStream();
237    baos.write(HFILEBLOCK_DUMMY_HEADER);
238    DataOutputStream dos = new DataOutputStream(baos);
239    encoder.startBlockEncoding(encodingContext, dos);
240    for (KeyValue kv : kvs) {
241      encoder.encode(kv, encodingContext, dos);
242    }
243    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
244    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
245    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
246    if (useOffheapData) {
247      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
248      bb.put(encodedData);
249      bb.rewind();
250      return bb;
251    }
252    return ByteBuffer.wrap(encodedData);
253  }
254
255  @Test
256  public void testNextOnSample() throws IOException {
257    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
258
259    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
260      if (encoding.getEncoder() == null) {
261        continue;
262      }
263      DataBlockEncoder encoder = encoding.getEncoder();
264      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
265        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
266      HFileContext meta =
267        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
268          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
269      DataBlockEncoder.EncodedSeeker seeker =
270        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
271      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
272      int i = 0;
273      do {
274        KeyValue expectedKeyValue = sampleKv.get(i);
275        Cell cell = seeker.getCell();
276        if (
277          PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue,
278            cell) != 0
279        ) {
280          int commonPrefix =
281            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
282          fail(String.format(
283            "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d"
284              + "\n expected %s\n actual      %s",
285            encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
286              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()),
287            CellUtil.toString(cell, false)));
288        }
289        i++;
290      } while (seeker.next());
291    }
292  }
293
294  /**
295   * Test whether the decompression of first key is implemented correctly. n
296   */
297  @Test
298  public void testFirstKeyInBlockOnSample() throws IOException {
299    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
300
301    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
302      if (encoding.getEncoder() == null) {
303        continue;
304      }
305      DataBlockEncoder encoder = encoding.getEncoder();
306      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
307        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
308      Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
309      KeyValue firstKv = sampleKv.get(0);
310      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
311        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
312        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
313      }
314    }
315  }
316
317  @Test
318  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
319    List<KeyValue> kvList = new ArrayList<>();
320    byte[] row = new byte[0];
321    byte[] family = new byte[0];
322    byte[] qualifier = new byte[0];
323    byte[] value = new byte[0];
324    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
325    kvList.add(expectedKV);
326    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
327    DataBlockEncoder encoder = encoding.getEncoder();
328    ByteBuffer encodedBuffer =
329      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
330    HFileContext meta =
331      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
332        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
333    DataBlockEncoder.EncodedSeeker seeker =
334      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
335    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
336    Cell cell = seeker.getCell();
337    Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
338  }
339
340  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
341    boolean seekBefore, Cell keyValue) {
342    Cell expectedKeyValue = null;
343    ByteBuffer expectedKey = null;
344    ByteBuffer expectedValue = null;
345    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
346      seeker.seekToKeyInBlock(keyValue, seekBefore);
347      seeker.rewind();
348
349      Cell actualKeyValue = seeker.getCell();
350      ByteBuffer actualKey = null;
351      actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
352      ByteBuffer actualValue = seeker.getValueShallowCopy();
353
354      if (expectedKeyValue != null) {
355        assertTrue(CellUtil.equals(expectedKeyValue, actualKeyValue));
356      } else {
357        expectedKeyValue = actualKeyValue;
358      }
359
360      if (expectedKey != null) {
361        assertEquals(expectedKey, actualKey);
362      } else {
363        expectedKey = actualKey;
364      }
365
366      if (expectedValue != null) {
367        assertEquals(expectedValue, actualValue);
368      } else {
369        expectedValue = actualValue;
370      }
371    }
372  }
373
374  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
375    boolean includesTags) throws IOException {
376    ByteBuffer unencodedDataBuf =
377      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
378    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
379      .withIncludesTags(includesTags).build();
380    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
381      DataBlockEncoder encoder = encoding.getEncoder();
382      if (encoder == null) {
383        continue;
384      }
385      HFileBlockEncodingContext encodingContext =
386        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
387      ByteArrayOutputStream baos = new ByteArrayOutputStream();
388      baos.write(HFILEBLOCK_DUMMY_HEADER);
389      DataOutputStream dos = new DataOutputStream(baos);
390      encoder.startBlockEncoding(encodingContext, dos);
391      for (KeyValue kv : kvList) {
392        encoder.encode(kv, encodingContext, dos);
393      }
394      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
395      byte[] encodedData = baos.toByteArray();
396
397      testAlgorithm(encodedData, unencodedDataBuf, encoder);
398    }
399  }
400
401  @Test
402  public void testZeroByte() throws IOException {
403    List<KeyValue> kvList = new ArrayList<>();
404    byte[] row = Bytes.toBytes("abcd");
405    byte[] family = new byte[] { 'f' };
406    byte[] qualifier0 = new byte[] { 'b' };
407    byte[] qualifier1 = new byte[] { 'c' };
408    byte[] value0 = new byte[] { 'd' };
409    byte[] value1 = new byte[] { 0x00 };
410    if (includesTags) {
411      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
412        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
413      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
414        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
415    } else {
416      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
417      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
418    }
419    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
420  }
421
422  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
423    DataBlockEncoder encoder) throws IOException {
424    // decode
425    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
426      encodedData.length - ENCODED_DATA_OFFSET);
427    DataInputStream dis = new DataInputStream(bais);
428    ByteBuffer actualDataset;
429    HFileContext meta =
430      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
431        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
432    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
433    actualDataset.rewind();
434
435    // this is because in case of prefix tree the decoded stream will not have
436    // the
437    // mvcc in it.
438    assertEquals("Encoding -> decoding gives different results for " + encoder,
439      Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
440  }
441}