/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.compress.AlreadyClosedException;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Along with CodecPool, this is copied from the class of the same name in hadoop-common. Modified
 * to accommodate changes to HBase's CodecPool.
 */
@Category({ MiscTests.class, SmallTests.class })
public class TestCodecPool {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCodecPool.class);

  /** Assertion message shared by every lease-count check in this class. */
  private static final String LEASE_COUNT_ERR = "Incorrect number of leased (de)compressors";

  /**
   * Upper bound passed to {@code awaitTermination}. Each test also carries a 10 second JUnit
   * timeout, which is the bound that fires first when a worker hangs.
   */
  private static final long POOL_TERMINATION_WAIT_SECONDS = 1000;

  /** Codec used to lease (de)compressors; re-created before every test by {@link #setup()}. */
  DefaultCodec codec;

  /** Turns on lease counting in CodecPool once for the whole class. */
  @BeforeClass
  public static void beforeClass() {
    CodecPool.initLeaseCounting();
  }

  /** Creates a fresh {@link DefaultCodec} with a default {@link Configuration}. */
  @Before
  public void setup() {
    this.codec = new DefaultCodec();
    this.codec.setConf(new Configuration());
  }

  /**
   * Leases two compressors and returns them one at a time, checking the leased count after each
   * step. The first compressor is then returned a second time and the count must remain zero.
   */
  @Test(timeout = 10000)
  public void testCompressorPoolCounts() {
    // Get two compressors and return them
    Compressor comp1 = CodecPool.getCompressor(codec);
    Compressor comp2 = CodecPool.getCompressor(codec);
    assertEquals(LEASE_COUNT_ERR, 2, CodecPool.getLeasedCompressorsCount(codec));

    CodecPool.returnCompressor(comp2);
    assertEquals(LEASE_COUNT_ERR, 1, CodecPool.getLeasedCompressorsCount(codec));

    CodecPool.returnCompressor(comp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));

    // Returning an already-returned compressor must leave the count at zero.
    CodecPool.returnCompressor(comp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
  }

  /**
   * Returns the same compressor twice, then leases ten compressors and expects the resulting set
   * to hold ten entries, i.e. the doubly-returned instance is not handed out twice.
   */
  @Test(timeout = 10000)
  public void testCompressorNotReturnSameInstance() {
    Compressor comp = CodecPool.getCompressor(codec);
    CodecPool.returnCompressor(comp);
    CodecPool.returnCompressor(comp);
    Set<Compressor> compressors = new HashSet<>();
    for (int i = 0; i < 10; ++i) {
      compressors.add(CodecPool.getCompressor(codec));
    }
    assertEquals(10, compressors.size());
    for (Compressor compressor : compressors) {
      CodecPool.returnCompressor(compressor);
    }
  }

  /**
   * Leases two decompressors and returns them one at a time, checking the leased count after each
   * step. The first decompressor is then returned a second time and the count must remain zero.
   */
  @Test(timeout = 10000)
  public void testDecompressorPoolCounts() {
    // Get two decompressors and return them
    Decompressor decomp1 = CodecPool.getDecompressor(codec);
    Decompressor decomp2 = CodecPool.getDecompressor(codec);
    assertEquals(LEASE_COUNT_ERR, 2, CodecPool.getLeasedDecompressorsCount(codec));

    CodecPool.returnDecompressor(decomp2);
    assertEquals(LEASE_COUNT_ERR, 1, CodecPool.getLeasedDecompressorsCount(codec));

    CodecPool.returnDecompressor(decomp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));

    // Returning an already-returned decompressor must leave the decompressor count at zero.
    // This must query the decompressor count, not the compressor count, to test anything.
    CodecPool.returnDecompressor(decomp1);
    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));
  }

  /**
   * Runs producers that lease compressors and consumers that return them on a three-thread pool,
   * handing instances over through a bounded queue, and expects no compressor to remain leased
   * once every task has finished.
   */
  @Test(timeout = 10000)
  public void testMultiThreadedCompressorPool() throws InterruptedException {
    final int iterations = 4;
    ExecutorService threadpool = Executors.newFixedThreadPool(3);
    final LinkedBlockingDeque<Compressor> queue = new LinkedBlockingDeque<>(2 * iterations);

    Callable<Boolean> consumer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Compressor c = queue.take();
        CodecPool.returnCompressor(c);
        return c != null;
      }
    };

    Callable<Boolean> producer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Compressor c = CodecPool.getCompressor(codec);
        queue.put(c);
        return c != null;
      }
    };

    for (int i = 0; i < iterations; i++) {
      threadpool.submit(consumer);
      threadpool.submit(producer);
    }

    shutdownAndAwait(threadpool);

    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedCompressorsCount(codec));
  }

  /**
   * Runs producers that lease decompressors and consumers that return them on a three-thread
   * pool, handing instances over through a bounded queue, and expects no decompressor to remain
   * leased once every task has finished.
   */
  @Test(timeout = 10000)
  public void testMultiThreadedDecompressorPool() throws InterruptedException {
    final int iterations = 4;
    ExecutorService threadpool = Executors.newFixedThreadPool(3);
    final LinkedBlockingDeque<Decompressor> queue = new LinkedBlockingDeque<>(2 * iterations);

    Callable<Boolean> consumer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Decompressor dc = queue.take();
        CodecPool.returnDecompressor(dc);
        return dc != null;
      }
    };

    Callable<Boolean> producer = new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        Decompressor c = CodecPool.getDecompressor(codec);
        queue.put(c);
        return c != null;
      }
    };

    for (int i = 0; i < iterations; i++) {
      threadpool.submit(consumer);
      threadpool.submit(producer);
    }

    shutdownAndAwait(threadpool);

    assertEquals(LEASE_COUNT_ERR, 0, CodecPool.getLeasedDecompressorsCount(codec));
  }

  /**
   * Returns the same decompressor twice, then leases ten decompressors and expects the resulting
   * set to hold ten entries, i.e. the doubly-returned instance is not handed out twice.
   */
  @Test(timeout = 10000)
  public void testDecompressorNotReturnSameInstance() {
    Decompressor decomp = CodecPool.getDecompressor(codec);
    CodecPool.returnDecompressor(decomp);
    CodecPool.returnDecompressor(decomp);
    Set<Decompressor> decompressors = new HashSet<>();
    for (int i = 0; i < 10; ++i) {
      decompressors.add(CodecPool.getDecompressor(codec));
    }
    assertEquals(10, decompressors.size());
    for (Decompressor decompressor : decompressors) {
      CodecPool.returnDecompressor(decompressor);
    }
  }

  /**
   * Hands a {@link BuiltInGzipDecompressor} to CodecPool and then tries to read gzip data through
   * it. The read must throw {@link AlreadyClosedException} with the "decompress called on closed
   * decompressor" message; any other outcome fails the test.
   */
  @Test(timeout = 10000)
  public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
    final GzipCodec gzipCodec = new GzipCodec();
    gzipCodec.setConf(new Configuration());

    // Content is irrelevant; the test only needs a valid gzip stream to read from.
    final Random random = new Random();
    final byte[] bytes = new byte[1024];
    random.nextBytes(bytes);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
      outputStream.write(bytes);
    }

    final byte[] gzipBytes = baos.toByteArray();
    final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);

    // BuiltInGzipDecompressor is an explicit example of a Decompressor
    // with the @DoNotPool annotation
    final Decompressor decompressor = new BuiltInGzipDecompressor();
    CodecPool.returnDecompressor(decompressor);

    boolean passed = false;
    try (CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor)) {
      // Inner try keeps the catch scoped to read() only, not to stream creation or close().
      try {
        inputStream.read();
      } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (message != null && message.contains("decompress called on closed decompressor")) {
          passed = true;
        }
      }
    }

    if (!passed) {
      fail("Decompressor from Codec with @DoNotPool should not be "
        + "useable after returning to CodecPool");
    }
  }

  /**
   * Stops the pool from accepting new tasks and waits for the submitted ones to finish, failing
   * the test if they do not finish in time.
   * @param threadpool executor whose tasks have all been submitted
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private static void shutdownAndAwait(ExecutorService threadpool) throws InterruptedException {
    threadpool.shutdown();
    try {
      if (!threadpool.awaitTermination(POOL_TERMINATION_WAIT_SECONDS, TimeUnit.SECONDS)) {
        fail("Timed out waiting for (de)compressor pool tasks to complete");
      }
    } finally {
      // Has nothing left to stop after a clean termination. On timeout or interrupt it
      // interrupts workers still blocked on the hand-off queue so they do not outlive the test.
      threadpool.shutdownNow();
    }
  }
}