/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.compress.AlreadyClosedException;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Along with CodecPool, this is copied from the class of the same name in hadoop-common. Modified
 * to accommodate changes to HBase's CodecPool.
 */
@Category({ MiscTests.class, SmallTests.class })
public class TestCodecPool {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCodecPool.class);

  private final String LEASE_COUNT_ERR = "Incorrect number of leased (de)compressors";
  DefaultCodec codec;

  @BeforeClass
  public static void beforeClass() {
    CodecPool.initLeaseCounting();
  }

  @Before
  public void setup() {
    this.codec = new DefaultCodec();
    this.codec.setConf(new Configuration());
  }

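  /**
   * Leasing two compressors should raise the per-codec lease count to two, each return should
   * decrement it, and returning the same compressor twice must not drive the count negative.
   */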
  @Test(timeout = 10000)
  public void testCompressorPoolCounts() {
    // Get two compressors and return them
    Compressor comp1 = CodecPool.getCompressor(codec);
    Compressor comp2 = CodecPool.getCompressor(codec);
    assertEquals(LEASE_COUNT_ERR, 2, CodecPool.getLeasedCompressorsCount(codec));

    CodecPool.returnCompressor(comp2);
    assertEquals(LEASE_COUNT_ERR, 1, CodecPool.getLeasedCompressorsCount(codec));

    CodecPool.returnCompressor(comp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));

    CodecPool.returnCompressor(comp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
  }

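  /**
   * Returning the same compressor twice must not leave duplicate entries in the pool: ten
   * subsequent leases should still yield ten distinct compressor instances.
   */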
  @Test(timeout = 10000)
  public void testCompressorNotReturnSameInstance() {
    Compressor comp = CodecPool.getCompressor(codec);
    CodecPool.returnCompressor(comp);
    CodecPool.returnCompressor(comp);
    Set<Compressor> compressors = new HashSet<Compressor>();
    for (int i = 0; i < 10; ++i) {
      compressors.add(CodecPool.getCompressor(codec));
    }
    assertEquals(10, compressors.size());
    for (Compressor compressor : compressors) {
      CodecPool.returnCompressor(compressor);
    }
  }

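  /**
   * Decompressor counterpart of {@link #testCompressorPoolCounts()}: leases increment the count,
   * returns decrement it, and a duplicate return leaves it at zero.
   */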
  @Test(timeout = 10000)
  public void testDecompressorPoolCounts() {
    // Get two decompressors and return them
    Decompressor decomp1 = CodecPool.getDecompressor(codec);
    Decompressor decomp2 = CodecPool.getDecompressor(codec);
    assertEquals(LEASE_COUNT_ERR, 2, CodecPool.getLeasedDecompressorsCount(codec));

    CodecPool.returnDecompressor(decomp2);
    assertEquals(LEASE_COUNT_ERR, 1, CodecPool.getLeasedDecompressorsCount(codec));

    CodecPool.returnDecompressor(decomp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));

    CodecPool.returnDecompressor(decomp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));
  }

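  /**
   * Runs concurrent lease/return cycles through a shared queue and verifies that no compressor
   * leases remain once all tasks have completed.
   */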
  @Test(timeout = 10000)
  public void testMultiThreadedCompressorPool() throws InterruptedException {
    final int iterations = 4;
    ExecutorService threadpool = Executors.newFixedThreadPool(3);
    final LinkedBlockingDeque<Compressor> queue =
      new LinkedBlockingDeque<Compressor>(2 * iterations);

    Callable<Boolean> consumer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Compressor c = queue.take();
        CodecPool.returnCompressor(c);
        return c != null;
      }
    };

    Callable<Boolean> producer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Compressor c = CodecPool.getCompressor(codec);
        queue.put(c);
        return c != null;
      }
    };

    for (int i = 0; i < iterations; i++) {
      threadpool.submit(consumer);
      threadpool.submit(producer);
    }

    // wait for completion
    threadpool.shutdown();
    threadpool.awaitTermination(1000, TimeUnit.SECONDS);

    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
  }

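  /**
   * Decompressor counterpart of {@link #testMultiThreadedCompressorPool()}: concurrent
   * lease/return cycles must leave no outstanding decompressor leases.
   */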
  @Test(timeout = 10000)
  public void testMultiThreadedDecompressorPool() throws InterruptedException {
    final int iterations = 4;
    ExecutorService threadpool = Executors.newFixedThreadPool(3);
    final LinkedBlockingDeque<Decompressor> queue =
      new LinkedBlockingDeque<Decompressor>(2 * iterations);

    Callable<Boolean> consumer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Decompressor dc = queue.take();
        CodecPool.returnDecompressor(dc);
        return dc != null;
      }
    };

    Callable<Boolean> producer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Decompressor c = CodecPool.getDecompressor(codec);
        queue.put(c);
        return c != null;
      }
    };

    for (int i = 0; i < iterations; i++) {
      threadpool.submit(consumer);
      threadpool.submit(producer);
    }

    // wait for completion
    threadpool.shutdown();
    threadpool.awaitTermination(1000, TimeUnit.SECONDS);

    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));
  }

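  /**
   * Decompressor counterpart of {@link #testCompressorNotReturnSameInstance()}: a double return
   * must not cause the pool to hand out the same decompressor instance twice.
   */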
  @Test(timeout = 10000)
  public void testDecompressorNotReturnSameInstance() {
    Decompressor decomp = CodecPool.getDecompressor(codec);
    CodecPool.returnDecompressor(decomp);
    CodecPool.returnDecompressor(decomp);
    Set<Decompressor> decompressors = new HashSet<Decompressor>();
    for (int i = 0; i < 10; ++i) {
      decompressors.add(CodecPool.getDecompressor(codec));
    }
    assertEquals(10, decompressors.size());
    for (Decompressor decompressor : decompressors) {
      CodecPool.returnDecompressor(decompressor);
    }
  }

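  /**
   * A decompressor annotated with @DoNotPool (here BuiltInGzipDecompressor) must not be usable
   * after it is returned: further reads should fail with AlreadyClosedException.
   */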
  @Test(timeout = 10000)
  public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {

    final GzipCodec gzipCodec = new GzipCodec();
    gzipCodec.setConf(new Configuration());

    final Random random = new Random();
    final byte[] bytes = new byte[1024];
    random.nextBytes(bytes);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
      outputStream.write(bytes);
    }

    final byte[] gzipBytes = baos.toByteArray();
    final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);

    // BuiltInGzipDecompressor is an explicit example of a Decompressor
    // with the @DoNotPool annotation
    final Decompressor decompressor = new BuiltInGzipDecompressor();
    CodecPool.returnDecompressor(decompressor);

    final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
    boolean passed = false;
    try {
      inputStream.read();
    } catch (AlreadyClosedException e) {
      if (e.getMessage().contains("decompress called on closed decompressor")) {
        passed = true;
      }
    }

    if (!passed) {
      fail("Decompressor from Codec with @DoNotPool should not be "
        + "useable after returning to CodecPool");
    }
  }

}