001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.io.ByteArrayInputStream;
024import java.io.ByteArrayOutputStream;
025import java.io.OutputStream;
026import java.util.HashSet;
027import java.util.Random;
028import java.util.Set;
029import java.util.concurrent.Callable;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.LinkedBlockingDeque;
033import java.util.concurrent.TimeUnit;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.testclassification.MiscTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.io.compress.AlreadyClosedException;
038import org.apache.hadoop.io.compress.CompressionInputStream;
039import org.apache.hadoop.io.compress.Compressor;
040import org.apache.hadoop.io.compress.Decompressor;
041import org.apache.hadoop.io.compress.DefaultCodec;
042import org.apache.hadoop.io.compress.GzipCodec;
043import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
044import org.junit.jupiter.api.BeforeAll;
045import org.junit.jupiter.api.BeforeEach;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049/**
050 * Along with CodecPool, this is copied from the class of the same name in hadoop-common. Modified
051 * to accommodate changes to HBase's CodecPool.
052 */
053@Tag(MiscTests.TAG)
054@Tag(SmallTests.TAG)
055public class TestCodecPool {
056
057  private final String LEASE_COUNT_ERR = "Incorrect number of leased (de)compressors";
058  DefaultCodec codec;
059
060  @BeforeAll
061  public static void beforeClass() {
062    CodecPool.initLeaseCounting();
063  }
064
065  @BeforeEach
066  public void setup() {
067    this.codec = new DefaultCodec();
068    this.codec.setConf(new Configuration());
069  }
070
071  @Test
072  public void testCompressorPoolCounts() {
073    // Get two compressors and return them
074    Compressor comp1 = CodecPool.getCompressor(codec);
075    Compressor comp2 = CodecPool.getCompressor(codec);
076    assertEquals(2, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
077
078    CodecPool.returnCompressor(comp2);
079    assertEquals(1, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
080
081    CodecPool.returnCompressor(comp1);
082    assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
083
084    CodecPool.returnCompressor(comp1);
085    assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
086  }
087
088  @Test
089  public void testCompressorNotReturnSameInstance() {
090    Compressor comp = CodecPool.getCompressor(codec);
091    CodecPool.returnCompressor(comp);
092    CodecPool.returnCompressor(comp);
093    Set<Compressor> compressors = new HashSet<Compressor>();
094    for (int i = 0; i < 10; ++i) {
095      compressors.add(CodecPool.getCompressor(codec));
096    }
097    assertEquals(10, compressors.size());
098    for (Compressor compressor : compressors) {
099      CodecPool.returnCompressor(compressor);
100    }
101  }
102
103  @Test
104  public void testDecompressorPoolCounts() {
105    // Get two decompressors and return them
106    Decompressor decomp1 = CodecPool.getDecompressor(codec);
107    Decompressor decomp2 = CodecPool.getDecompressor(codec);
108    assertEquals(2, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR);
109
110    CodecPool.returnDecompressor(decomp2);
111    assertEquals(1, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR);
112
113    CodecPool.returnDecompressor(decomp1);
114    assertEquals(0, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR);
115
116    CodecPool.returnDecompressor(decomp1);
117    assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
118  }
119
120  @Test
121  public void testMultiThreadedCompressorPool() throws InterruptedException {
122    final int iterations = 4;
123    ExecutorService threadpool = Executors.newFixedThreadPool(3);
124    final LinkedBlockingDeque<Compressor> queue =
125      new LinkedBlockingDeque<Compressor>(2 * iterations);
126
127    Callable<Boolean> consumer = new Callable<Boolean>() {
128      @Override
129      public Boolean call() throws Exception {
130        Compressor c = queue.take();
131        CodecPool.returnCompressor(c);
132        return c != null;
133      }
134    };
135
136    Callable<Boolean> producer = new Callable<Boolean>() {
137      @Override
138      public Boolean call() throws Exception {
139        Compressor c = CodecPool.getCompressor(codec);
140        queue.put(c);
141        return c != null;
142      }
143    };
144
145    for (int i = 0; i < iterations; i++) {
146      threadpool.submit(consumer);
147      threadpool.submit(producer);
148    }
149
150    // wait for completion
151    threadpool.shutdown();
152    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
153
154    assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR);
155  }
156
157  @Test
158  public void testMultiThreadedDecompressorPool() throws InterruptedException {
159    final int iterations = 4;
160    ExecutorService threadpool = Executors.newFixedThreadPool(3);
161    final LinkedBlockingDeque<Decompressor> queue =
162      new LinkedBlockingDeque<Decompressor>(2 * iterations);
163
164    Callable<Boolean> consumer = new Callable<Boolean>() {
165      @Override
166      public Boolean call() throws Exception {
167        Decompressor dc = queue.take();
168        CodecPool.returnDecompressor(dc);
169        return dc != null;
170      }
171    };
172
173    Callable<Boolean> producer = new Callable<Boolean>() {
174      @Override
175      public Boolean call() throws Exception {
176        Decompressor c = CodecPool.getDecompressor(codec);
177        queue.put(c);
178        return c != null;
179      }
180    };
181
182    for (int i = 0; i < iterations; i++) {
183      threadpool.submit(consumer);
184      threadpool.submit(producer);
185    }
186
187    // wait for completion
188    threadpool.shutdown();
189    threadpool.awaitTermination(1000, TimeUnit.SECONDS);
190
191    assertEquals(0, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR);
192  }
193
194  @Test
195  public void testDecompressorNotReturnSameInstance() {
196    Decompressor decomp = CodecPool.getDecompressor(codec);
197    CodecPool.returnDecompressor(decomp);
198    CodecPool.returnDecompressor(decomp);
199    Set<Decompressor> decompressors = new HashSet<Decompressor>();
200    for (int i = 0; i < 10; ++i) {
201      decompressors.add(CodecPool.getDecompressor(codec));
202    }
203    assertEquals(10, decompressors.size());
204    for (Decompressor decompressor : decompressors) {
205      CodecPool.returnDecompressor(decompressor);
206    }
207  }
208
209  @Test
210  public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
211
212    final GzipCodec gzipCodec = new GzipCodec();
213    gzipCodec.setConf(new Configuration());
214
215    final Random random = new Random();
216    final byte[] bytes = new byte[1024];
217    random.nextBytes(bytes);
218
219    ByteArrayOutputStream baos = new ByteArrayOutputStream();
220    try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
221      outputStream.write(bytes);
222    }
223
224    final byte[] gzipBytes = baos.toByteArray();
225    final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);
226
227    // BuiltInGzipDecompressor is an explicit example of a Decompressor
228    // with the @DoNotPool annotation
229    final Decompressor decompressor = new BuiltInGzipDecompressor();
230    CodecPool.returnDecompressor(decompressor);
231
232    final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
233    boolean passed = false;
234    try {
235      inputStream.read();
236    } catch (AlreadyClosedException e) {
237      if (e.getMessage().contains("decompress called on closed decompressor")) {
238        passed = true;
239      }
240    }
241
242    if (!passed) {
243      fail("Decompressor from Codec with @DoNotPool should not be "
244        + "useable after returning to CodecPool");
245    }
246  }
247
248}