001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.InputStream;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ArrayBackedTag;
031import org.apache.hadoop.hbase.ByteBufferKeyValue;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.Tag;
038import org.apache.hadoop.hbase.codec.Codec.Decoder;
039import org.apache.hadoop.hbase.codec.Codec.Encoder;
040import org.apache.hadoop.hbase.io.compress.Compression;
041import org.apache.hadoop.hbase.io.util.LRUDictionary;
042import org.apache.hadoop.hbase.testclassification.RegionServerTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.runner.RunWith;
049import org.junit.runners.Parameterized;
050import org.junit.runners.Parameterized.Parameters;
051
052@Category({ RegionServerTests.class, SmallTests.class })
053@RunWith(Parameterized.class)
054public class TestWALCellCodecWithCompression {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestWALCellCodecWithCompression.class);
059
060  private Compression.Algorithm compression;
061
062  public TestWALCellCodecWithCompression(Compression.Algorithm algo) {
063    this.compression = algo;
064  }
065
066  @Parameters
067  public static List<Object[]> params() {
068    return HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS_PARAMETERIZED;
069  }
070
071  @Test
072  public void testEncodeDecodeKVsWithTags() throws Exception {
073    doTest(false, false);
074  }
075
076  @Test
077  public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
078    doTest(true, false);
079  }
080
081  @Test
082  public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
083    doTest(true, false);
084  }
085
086  @Test
087  public void testValueCompressionEnabled() throws Exception {
088    doTest(false, true);
089  }
090
091  @Test
092  public void testValueCompression() throws Exception {
093    final byte[] row_1 = Bytes.toBytes("row_1");
094    final byte[] value_1 = new byte[20];
095    Bytes.zero(value_1);
096    final byte[] row_2 = Bytes.toBytes("row_2");
097    final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
098    Bytes.random(value_2);
099    final byte[] row_3 = Bytes.toBytes("row_3");
100    final byte[] value_3 = new byte[100];
101    Bytes.random(value_3);
102    final byte[] row_4 = Bytes.toBytes("row_4");
103    final byte[] value_4 = new byte[128];
104    fillBytes(value_4, Bytes.toBytes("DEADBEEF"));
105    final byte[] row_5 = Bytes.toBytes("row_5");
106    final byte[] value_5 = new byte[64];
107    fillBytes(value_5, Bytes.toBytes("CAFEBABE"));
108
109    Configuration conf = new Configuration(false);
110    WALCellCodec codec = new WALCellCodec(conf,
111      new CompressionContext(LRUDictionary.class, false, true, true, compression));
112    ByteArrayOutputStream bos = new ByteArrayOutputStream();
113    Encoder encoder = codec.getEncoder(bos);
114    encoder.write(createKV(row_1, value_1, 0));
115    encoder.write(createOffheapKV(row_2, value_2, 0));
116    encoder.write(createKV(row_3, value_3, 0));
117    encoder.write(createOffheapKV(row_4, value_4, 0));
118    encoder.write(createKV(row_5, value_5, 0));
119    encoder.flush();
120    try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
121      Decoder decoder = codec.getDecoder(is);
122      decoder.advance();
123      KeyValue kv = (KeyValue) decoder.current();
124      assertTrue(Bytes.equals(row_1, 0, row_1.length, kv.getRowArray(), kv.getRowOffset(),
125        kv.getRowLength()));
126      assertTrue(Bytes.equals(value_1, 0, value_1.length, kv.getValueArray(), kv.getValueOffset(),
127        kv.getValueLength()));
128      decoder.advance();
129      kv = (KeyValue) decoder.current();
130      assertTrue(Bytes.equals(row_2, 0, row_2.length, kv.getRowArray(), kv.getRowOffset(),
131        kv.getRowLength()));
132      assertTrue(Bytes.equals(value_2, 0, value_2.length, kv.getValueArray(), kv.getValueOffset(),
133        kv.getValueLength()));
134      decoder.advance();
135      kv = (KeyValue) decoder.current();
136      assertTrue(Bytes.equals(row_3, 0, row_3.length, kv.getRowArray(), kv.getRowOffset(),
137        kv.getRowLength()));
138      assertTrue(Bytes.equals(value_3, 0, value_3.length, kv.getValueArray(), kv.getValueOffset(),
139        kv.getValueLength()));
140      decoder.advance();
141      kv = (KeyValue) decoder.current();
142      assertTrue(Bytes.equals(row_4, 0, row_4.length, kv.getRowArray(), kv.getRowOffset(),
143        kv.getRowLength()));
144      assertTrue(Bytes.equals(value_4, 0, value_4.length, kv.getValueArray(), kv.getValueOffset(),
145        kv.getValueLength()));
146      decoder.advance();
147      kv = (KeyValue) decoder.current();
148      assertTrue(Bytes.equals(row_5, 0, row_5.length, kv.getRowArray(), kv.getRowOffset(),
149        kv.getRowLength()));
150      assertTrue(Bytes.equals(value_5, 0, value_5.length, kv.getValueArray(), kv.getValueOffset(),
151        kv.getValueLength()));
152    }
153  }
154
155  static void fillBytes(byte[] buffer, byte[] fill) {
156    int offset = 0;
157    int remaining = buffer.length;
158    while (remaining > 0) {
159      int len = remaining < fill.length ? remaining : fill.length;
160      System.arraycopy(fill, 0, buffer, offset, len);
161      offset += len;
162      remaining -= len;
163    }
164  }
165
166  private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
167    final byte[] key = Bytes.toBytes("myRow");
168    final byte[] value = Bytes.toBytes("myValue");
169    Configuration conf = new Configuration(false);
170    conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
171    WALCellCodec codec =
172      new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, compressTags));
173    ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
174    Encoder encoder = codec.getEncoder(bos);
175    if (offheapKV) {
176      encoder.write(createOffheapKV(key, value, 1));
177      encoder.write(createOffheapKV(key, value, 0));
178      encoder.write(createOffheapKV(key, value, 2));
179    } else {
180      encoder.write(createKV(key, value, 1));
181      encoder.write(createKV(key, value, 0));
182      encoder.write(createKV(key, value, 2));
183    }
184
185    InputStream is = new ByteArrayInputStream(bos.toByteArray());
186    Decoder decoder = codec.getDecoder(is);
187    decoder.advance();
188    KeyValue kv = (KeyValue) decoder.current();
189    List<Tag> tags = PrivateCellUtil.getTags(kv);
190    assertEquals(1, tags.size());
191    assertEquals("tagValue1", Bytes.toString(Tag.cloneValue(tags.get(0))));
192    decoder.advance();
193    kv = (KeyValue) decoder.current();
194    tags = PrivateCellUtil.getTags(kv);
195    assertEquals(0, tags.size());
196    decoder.advance();
197    kv = (KeyValue) decoder.current();
198    tags = PrivateCellUtil.getTags(kv);
199    assertEquals(2, tags.size());
200    assertEquals("tagValue1", Bytes.toString(Tag.cloneValue(tags.get(0))));
201    assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1))));
202  }
203
204  private KeyValue createKV(byte[] row, byte[] value, int noOfTags) {
205    byte[] cf = Bytes.toBytes("myCF");
206    byte[] q = Bytes.toBytes("myQualifier");
207    List<Tag> tags = new ArrayList<>(noOfTags);
208    for (int i = 1; i <= noOfTags; i++) {
209      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
210    }
211    return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
212  }
213
214  private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) {
215    byte[] cf = Bytes.toBytes("myCF");
216    byte[] q = Bytes.toBytes("myQualifier");
217    List<Tag> tags = new ArrayList<>(noOfTags);
218    for (int i = 1; i <= noOfTags; i++) {
219      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
220    }
221    KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
222    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
223    dbb.put(kv.getBuffer());
224    return new ByteBufferKeyValue(dbb, 0, kv.getBuffer().length);
225  }
226}