001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.InputStream;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.stream.Stream;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ArrayBackedTag;
032import org.apache.hadoop.hbase.ByteBufferKeyValue;
033import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.KeyValue;
037import org.apache.hadoop.hbase.PrivateCellUtil;
038import org.apache.hadoop.hbase.codec.Codec.Decoder;
039import org.apache.hadoop.hbase.codec.Codec.Encoder;
040import org.apache.hadoop.hbase.io.compress.Compression;
041import org.apache.hadoop.hbase.io.util.LRUDictionary;
042import org.apache.hadoop.hbase.testclassification.RegionServerTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.TestTemplate;
047import org.junit.jupiter.params.provider.Arguments;
048
049@Tag(RegionServerTests.TAG)
050@Tag(SmallTests.TAG)
051@HBaseParameterizedTestTemplate
052public class TestWALCellCodecWithCompression {
053
054  private final Compression.Algorithm compression;
055
056  public TestWALCellCodecWithCompression(Compression.Algorithm algo) {
057    this.compression = algo;
058  }
059
060  public static Stream<Arguments> parameters() {
061    return HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS_PARAMETERIZED.stream().map(Arguments::of);
062  }
063
064  @TestTemplate
065  public void testEncodeDecodeKVsWithTags() throws Exception {
066    doTest(false, false);
067  }
068
069  @TestTemplate
070  public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception {
071    doTest(true, false);
072  }
073
074  @TestTemplate
075  public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception {
076    doTest(true, true);
077  }
078
079  @TestTemplate
080  public void testValueCompressionEnabled() throws Exception {
081    doTest(false, true);
082  }
083
084  @TestTemplate
085  public void testValueCompression() throws Exception {
086    final byte[] row_1 = Bytes.toBytes("row_1");
087    final byte[] value_1 = new byte[20];
088    Bytes.zero(value_1);
089    final byte[] row_2 = Bytes.toBytes("row_2");
090    final byte[] value_2 = new byte[Bytes.SIZEOF_LONG];
091    Bytes.random(value_2);
092    final byte[] row_3 = Bytes.toBytes("row_3");
093    final byte[] value_3 = new byte[100];
094    Bytes.random(value_3);
095    final byte[] row_4 = Bytes.toBytes("row_4");
096    final byte[] value_4 = new byte[128];
097    fillBytes(value_4, Bytes.toBytes("DEADBEEF"));
098    final byte[] row_5 = Bytes.toBytes("row_5");
099    final byte[] value_5 = new byte[64];
100    fillBytes(value_5, Bytes.toBytes("CAFEBABE"));
101
102    Configuration conf = new Configuration(false);
103    WALCellCodec codec = new WALCellCodec(conf,
104      new CompressionContext(LRUDictionary.class, false, true, true, compression));
105    ByteArrayOutputStream bos = new ByteArrayOutputStream();
106    Encoder encoder = codec.getEncoder(bos);
107    encoder.write(createKV(row_1, value_1, 0));
108    encoder.write(createOffheapKV(row_2, value_2, 0));
109    encoder.write(createKV(row_3, value_3, 0));
110    encoder.write(createOffheapKV(row_4, value_4, 0));
111    encoder.write(createKV(row_5, value_5, 0));
112    encoder.flush();
113    try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
114      Decoder decoder = codec.getDecoder(is);
115      decoder.advance();
116      KeyValue kv = (KeyValue) decoder.current();
117      assertTrue(Bytes.equals(row_1, 0, row_1.length, kv.getRowArray(), kv.getRowOffset(),
118        kv.getRowLength()));
119      assertTrue(Bytes.equals(value_1, 0, value_1.length, kv.getValueArray(), kv.getValueOffset(),
120        kv.getValueLength()));
121      decoder.advance();
122      kv = (KeyValue) decoder.current();
123      assertTrue(Bytes.equals(row_2, 0, row_2.length, kv.getRowArray(), kv.getRowOffset(),
124        kv.getRowLength()));
125      assertTrue(Bytes.equals(value_2, 0, value_2.length, kv.getValueArray(), kv.getValueOffset(),
126        kv.getValueLength()));
127      decoder.advance();
128      kv = (KeyValue) decoder.current();
129      assertTrue(Bytes.equals(row_3, 0, row_3.length, kv.getRowArray(), kv.getRowOffset(),
130        kv.getRowLength()));
131      assertTrue(Bytes.equals(value_3, 0, value_3.length, kv.getValueArray(), kv.getValueOffset(),
132        kv.getValueLength()));
133      decoder.advance();
134      kv = (KeyValue) decoder.current();
135      assertTrue(Bytes.equals(row_4, 0, row_4.length, kv.getRowArray(), kv.getRowOffset(),
136        kv.getRowLength()));
137      assertTrue(Bytes.equals(value_4, 0, value_4.length, kv.getValueArray(), kv.getValueOffset(),
138        kv.getValueLength()));
139      decoder.advance();
140      kv = (KeyValue) decoder.current();
141      assertTrue(Bytes.equals(row_5, 0, row_5.length, kv.getRowArray(), kv.getRowOffset(),
142        kv.getRowLength()));
143      assertTrue(Bytes.equals(value_5, 0, value_5.length, kv.getValueArray(), kv.getValueOffset(),
144        kv.getValueLength()));
145    }
146  }
147
148  static void fillBytes(byte[] buffer, byte[] fill) {
149    int offset = 0;
150    int remaining = buffer.length;
151    while (remaining > 0) {
152      int len = remaining < fill.length ? remaining : fill.length;
153      System.arraycopy(fill, 0, buffer, offset, len);
154      offset += len;
155      remaining -= len;
156    }
157  }
158
159  private void doTest(boolean compressTags, boolean offheapKV) throws Exception {
160    final byte[] key = Bytes.toBytes("myRow");
161    final byte[] value = Bytes.toBytes("myValue");
162    Configuration conf = new Configuration(false);
163    conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
164    WALCellCodec codec =
165      new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, compressTags));
166    ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
167    Encoder encoder = codec.getEncoder(bos);
168    if (offheapKV) {
169      encoder.write(createOffheapKV(key, value, 1));
170      encoder.write(createOffheapKV(key, value, 0));
171      encoder.write(createOffheapKV(key, value, 2));
172    } else {
173      encoder.write(createKV(key, value, 1));
174      encoder.write(createKV(key, value, 0));
175      encoder.write(createKV(key, value, 2));
176    }
177
178    InputStream is = new ByteArrayInputStream(bos.toByteArray());
179    Decoder decoder = codec.getDecoder(is);
180    decoder.advance();
181    KeyValue kv = (KeyValue) decoder.current();
182    List<org.apache.hadoop.hbase.Tag> tags = PrivateCellUtil.getTags(kv);
183    assertEquals(1, tags.size());
184    assertEquals("tagValue1", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(0))));
185    decoder.advance();
186    kv = (KeyValue) decoder.current();
187    tags = PrivateCellUtil.getTags(kv);
188    assertEquals(0, tags.size());
189    decoder.advance();
190    kv = (KeyValue) decoder.current();
191    tags = PrivateCellUtil.getTags(kv);
192    assertEquals(2, tags.size());
193    assertEquals("tagValue1", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(0))));
194    assertEquals("tagValue2", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(1))));
195  }
196
197  private KeyValue createKV(byte[] row, byte[] value, int noOfTags) {
198    byte[] cf = Bytes.toBytes("myCF");
199    byte[] q = Bytes.toBytes("myQualifier");
200    List<org.apache.hadoop.hbase.Tag> tags = new ArrayList<>(noOfTags);
201    for (int i = 1; i <= noOfTags; i++) {
202      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
203    }
204    return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
205  }
206
207  private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) {
208    byte[] cf = Bytes.toBytes("myCF");
209    byte[] q = Bytes.toBytes("myQualifier");
210    List<org.apache.hadoop.hbase.Tag> tags = new ArrayList<>(noOfTags);
211    for (int i = 1; i <= noOfTags; i++) {
212      tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i)));
213    }
214    KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags);
215    ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length);
216    dbb.put(kv.getBuffer());
217    return new ByteBufferKeyValue(dbb, 0, kv.getBuffer().length);
218  }
219}