001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.InputStream; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.stream.Stream; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ArrayBackedTag; 032import org.apache.hadoop.hbase.ByteBufferKeyValue; 033import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.KeyValue; 037import org.apache.hadoop.hbase.PrivateCellUtil; 038import org.apache.hadoop.hbase.codec.Codec.Decoder; 039import org.apache.hadoop.hbase.codec.Codec.Encoder; 040import org.apache.hadoop.hbase.io.compress.Compression; 041import org.apache.hadoop.hbase.io.util.LRUDictionary; 042import org.apache.hadoop.hbase.testclassification.RegionServerTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.jupiter.api.Tag; 046import org.junit.jupiter.api.TestTemplate; 047import org.junit.jupiter.params.provider.Arguments; 048 049@Tag(RegionServerTests.TAG) 050@Tag(SmallTests.TAG) 051@HBaseParameterizedTestTemplate 052public class TestWALCellCodecWithCompression { 053 054 private final Compression.Algorithm compression; 055 056 public TestWALCellCodecWithCompression(Compression.Algorithm algo) { 057 this.compression = algo; 058 } 059 060 public static Stream<Arguments> parameters() { 061 return HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS_PARAMETERIZED.stream().map(Arguments::of); 062 } 063 064 @TestTemplate 065 public void testEncodeDecodeKVsWithTags() throws Exception { 066 doTest(false, false); 067 } 068 069 @TestTemplate 070 public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { 071 doTest(true, false); 072 } 073 074 @TestTemplate 075 public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { 076 doTest(true, true); 077 } 078 079 @TestTemplate 080 public void testValueCompressionEnabled() throws Exception { 081 doTest(false, true); 082 } 083 084 @TestTemplate 085 public void testValueCompression() throws Exception { 086 final byte[] row_1 = Bytes.toBytes("row_1"); 087 final byte[] value_1 = new byte[20]; 088 Bytes.zero(value_1); 089 final byte[] row_2 = Bytes.toBytes("row_2"); 090 final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; 091 Bytes.random(value_2); 092 final byte[] row_3 = Bytes.toBytes("row_3"); 093 final byte[] value_3 = new byte[100]; 094 Bytes.random(value_3); 095 final byte[] row_4 = Bytes.toBytes("row_4"); 096 final byte[] value_4 = new byte[128]; 097 fillBytes(value_4, Bytes.toBytes("DEADBEEF")); 098 final byte[] row_5 = Bytes.toBytes("row_5"); 099 final byte[] value_5 = new byte[64]; 100 fillBytes(value_5, Bytes.toBytes("CAFEBABE")); 101 102 Configuration conf = new Configuration(false); 103 WALCellCodec codec = new WALCellCodec(conf, 104 new CompressionContext(LRUDictionary.class, false, true, true, compression)); 105 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 106 Encoder encoder = codec.getEncoder(bos); 107 encoder.write(createKV(row_1, value_1, 0)); 108 encoder.write(createOffheapKV(row_2, value_2, 0)); 109 encoder.write(createKV(row_3, value_3, 0)); 110 encoder.write(createOffheapKV(row_4, value_4, 0)); 111 encoder.write(createKV(row_5, value_5, 0)); 112 encoder.flush(); 113 try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { 114 Decoder decoder = codec.getDecoder(is); 115 decoder.advance(); 116 KeyValue kv = (KeyValue) decoder.current(); 117 assertTrue(Bytes.equals(row_1, 0, row_1.length, kv.getRowArray(), kv.getRowOffset(), 118 kv.getRowLength())); 119 assertTrue(Bytes.equals(value_1, 0, value_1.length, kv.getValueArray(), kv.getValueOffset(), 120 kv.getValueLength())); 121 decoder.advance(); 122 kv = (KeyValue) decoder.current(); 123 assertTrue(Bytes.equals(row_2, 0, row_2.length, kv.getRowArray(), kv.getRowOffset(), 124 kv.getRowLength())); 125 assertTrue(Bytes.equals(value_2, 0, value_2.length, kv.getValueArray(), kv.getValueOffset(), 126 kv.getValueLength())); 127 decoder.advance(); 128 kv = (KeyValue) decoder.current(); 129 assertTrue(Bytes.equals(row_3, 0, row_3.length, kv.getRowArray(), kv.getRowOffset(), 130 kv.getRowLength())); 131 assertTrue(Bytes.equals(value_3, 0, value_3.length, kv.getValueArray(), kv.getValueOffset(), 132 kv.getValueLength())); 133 decoder.advance(); 134 kv = (KeyValue) decoder.current(); 135 assertTrue(Bytes.equals(row_4, 0, row_4.length, kv.getRowArray(), kv.getRowOffset(), 136 kv.getRowLength())); 137 assertTrue(Bytes.equals(value_4, 0, value_4.length, kv.getValueArray(), kv.getValueOffset(), 138 kv.getValueLength())); 139 decoder.advance(); 140 kv = (KeyValue) decoder.current(); 141 assertTrue(Bytes.equals(row_5, 0, row_5.length, kv.getRowArray(), kv.getRowOffset(), 142 kv.getRowLength())); 143 assertTrue(Bytes.equals(value_5, 0, value_5.length, kv.getValueArray(), kv.getValueOffset(), 144 kv.getValueLength())); 145 } 146 } 147 148 static void fillBytes(byte[] buffer, byte[] fill) { 149 int offset = 0; 150 int remaining = buffer.length; 151 while (remaining > 0) { 152 int len = remaining < fill.length ? remaining : fill.length; 153 System.arraycopy(fill, 0, buffer, offset, len); 154 offset += len; 155 remaining -= len; 156 } 157 } 158 159 private void doTest(boolean compressTags, boolean offheapKV) throws Exception { 160 final byte[] key = Bytes.toBytes("myRow"); 161 final byte[] value = Bytes.toBytes("myValue"); 162 Configuration conf = new Configuration(false); 163 conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); 164 WALCellCodec codec = 165 new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, compressTags)); 166 ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); 167 Encoder encoder = codec.getEncoder(bos); 168 if (offheapKV) { 169 encoder.write(createOffheapKV(key, value, 1)); 170 encoder.write(createOffheapKV(key, value, 0)); 171 encoder.write(createOffheapKV(key, value, 2)); 172 } else { 173 encoder.write(createKV(key, value, 1)); 174 encoder.write(createKV(key, value, 0)); 175 encoder.write(createKV(key, value, 2)); 176 } 177 178 InputStream is = new ByteArrayInputStream(bos.toByteArray()); 179 Decoder decoder = codec.getDecoder(is); 180 decoder.advance(); 181 KeyValue kv = (KeyValue) decoder.current(); 182 List<org.apache.hadoop.hbase.Tag> tags = PrivateCellUtil.getTags(kv); 183 assertEquals(1, tags.size()); 184 assertEquals("tagValue1", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(0)))); 185 decoder.advance(); 186 kv = (KeyValue) decoder.current(); 187 tags = PrivateCellUtil.getTags(kv); 188 assertEquals(0, tags.size()); 189 decoder.advance(); 190 kv = (KeyValue) decoder.current(); 191 tags = PrivateCellUtil.getTags(kv); 192 assertEquals(2, tags.size()); 193 assertEquals("tagValue1", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(0)))); 194 assertEquals("tagValue2", Bytes.toString(org.apache.hadoop.hbase.Tag.cloneValue(tags.get(1)))); 195 } 196 197 private KeyValue createKV(byte[] row, byte[] value, int noOfTags) { 198 byte[] cf = Bytes.toBytes("myCF"); 199 byte[] q = Bytes.toBytes("myQualifier"); 200 List<org.apache.hadoop.hbase.Tag> tags = new ArrayList<>(noOfTags); 201 for (int i = 1; i <= noOfTags; i++) { 202 tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); 203 } 204 return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); 205 } 206 207 private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) { 208 byte[] cf = Bytes.toBytes("myCF"); 209 byte[] q = Bytes.toBytes("myQualifier"); 210 List<org.apache.hadoop.hbase.Tag> tags = new ArrayList<>(noOfTags); 211 for (int i = 1; i <= noOfTags; i++) { 212 tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); 213 } 214 KeyValue kv = new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); 215 ByteBuffer dbb = ByteBuffer.allocateDirect(kv.getBuffer().length); 216 dbb.put(kv.getBuffer()); 217 return new ByteBufferKeyValue(dbb, 0, kv.getBuffer().length); 218 } 219}