001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.fail; 022 023import java.io.ByteArrayInputStream; 024import java.io.ByteArrayOutputStream; 025import java.io.OutputStream; 026import java.util.HashSet; 027import java.util.Random; 028import java.util.Set; 029import java.util.concurrent.Callable; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.LinkedBlockingDeque; 033import java.util.concurrent.TimeUnit; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.testclassification.MiscTests; 036import org.apache.hadoop.hbase.testclassification.SmallTests; 037import org.apache.hadoop.io.compress.AlreadyClosedException; 038import org.apache.hadoop.io.compress.CompressionInputStream; 039import org.apache.hadoop.io.compress.Compressor; 040import org.apache.hadoop.io.compress.Decompressor; 041import org.apache.hadoop.io.compress.DefaultCodec; 042import org.apache.hadoop.io.compress.GzipCodec; 043import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor; 044import org.junit.jupiter.api.BeforeAll; 045import org.junit.jupiter.api.BeforeEach; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048 049/** 050 * Along with CodecPool, this is copied from the class of the same name in hadoop-common. Modified 051 * to accommodate changes to HBase's CodecPool. 052 */ 053@Tag(MiscTests.TAG) 054@Tag(SmallTests.TAG) 055public class TestCodecPool { 056 057 private final String LEASE_COUNT_ERR = "Incorrect number of leased (de)compressors"; 058 DefaultCodec codec; 059 060 @BeforeAll 061 public static void beforeClass() { 062 CodecPool.initLeaseCounting(); 063 } 064 065 @BeforeEach 066 public void setup() { 067 this.codec = new DefaultCodec(); 068 this.codec.setConf(new Configuration()); 069 } 070 071 @Test 072 public void testCompressorPoolCounts() { 073 // Get two compressors and return them 074 Compressor comp1 = CodecPool.getCompressor(codec); 075 Compressor comp2 = CodecPool.getCompressor(codec); 076 assertEquals(2, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 077 078 CodecPool.returnCompressor(comp2); 079 assertEquals(1, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 080 081 CodecPool.returnCompressor(comp1); 082 assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 083 084 CodecPool.returnCompressor(comp1); 085 assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 086 } 087 088 @Test 089 public void testCompressorNotReturnSameInstance() { 090 Compressor comp = CodecPool.getCompressor(codec); 091 CodecPool.returnCompressor(comp); 092 CodecPool.returnCompressor(comp); 093 Set<Compressor> compressors = new HashSet<Compressor>(); 094 for (int i = 0; i < 10; ++i) { 095 compressors.add(CodecPool.getCompressor(codec)); 096 } 097 assertEquals(10, compressors.size()); 098 for (Compressor compressor : compressors) { 099 CodecPool.returnCompressor(compressor); 100 } 101 } 102 103 @Test 104 public void testDecompressorPoolCounts() { 105 // Get two decompressors and return them 106 Decompressor decomp1 = CodecPool.getDecompressor(codec); 107 Decompressor decomp2 = CodecPool.getDecompressor(codec); 108 assertEquals(2, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR); 109 110 CodecPool.returnDecompressor(decomp2); 111 assertEquals(1, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR); 112 113 CodecPool.returnDecompressor(decomp1); 114 assertEquals(0, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR); 115 116 CodecPool.returnDecompressor(decomp1); 117 assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 118 } 119 120 @Test 121 public void testMultiThreadedCompressorPool() throws InterruptedException { 122 final int iterations = 4; 123 ExecutorService threadpool = Executors.newFixedThreadPool(3); 124 final LinkedBlockingDeque<Compressor> queue = 125 new LinkedBlockingDeque<Compressor>(2 * iterations); 126 127 Callable<Boolean> consumer = new Callable<Boolean>() { 128 @Override 129 public Boolean call() throws Exception { 130 Compressor c = queue.take(); 131 CodecPool.returnCompressor(c); 132 return c != null; 133 } 134 }; 135 136 Callable<Boolean> producer = new Callable<Boolean>() { 137 @Override 138 public Boolean call() throws Exception { 139 Compressor c = CodecPool.getCompressor(codec); 140 queue.put(c); 141 return c != null; 142 } 143 }; 144 145 for (int i = 0; i < iterations; i++) { 146 threadpool.submit(consumer); 147 threadpool.submit(producer); 148 } 149 150 // wait for completion 151 threadpool.shutdown(); 152 threadpool.awaitTermination(1000, TimeUnit.SECONDS); 153 154 assertEquals(0, CodecPool.getLeasedCompressorsCount(codec), LEASE_COUNT_ERR); 155 } 156 157 @Test 158 public void testMultiThreadedDecompressorPool() throws InterruptedException { 159 final int iterations = 4; 160 ExecutorService threadpool = Executors.newFixedThreadPool(3); 161 final LinkedBlockingDeque<Decompressor> queue = 162 new LinkedBlockingDeque<Decompressor>(2 * iterations); 163 164 Callable<Boolean> consumer = new Callable<Boolean>() { 165 @Override 166 public Boolean call() throws Exception { 167 Decompressor dc = queue.take(); 168 CodecPool.returnDecompressor(dc); 169 return dc != null; 170 } 171 }; 172 173 Callable<Boolean> producer = new Callable<Boolean>() { 174 @Override 175 public Boolean call() throws Exception { 176 Decompressor c = CodecPool.getDecompressor(codec); 177 queue.put(c); 178 return c != null; 179 } 180 }; 181 182 for (int i = 0; i < iterations; i++) { 183 threadpool.submit(consumer); 184 threadpool.submit(producer); 185 } 186 187 // wait for completion 188 threadpool.shutdown(); 189 threadpool.awaitTermination(1000, TimeUnit.SECONDS); 190 191 assertEquals(0, CodecPool.getLeasedDecompressorsCount(codec), LEASE_COUNT_ERR); 192 } 193 194 @Test 195 public void testDecompressorNotReturnSameInstance() { 196 Decompressor decomp = CodecPool.getDecompressor(codec); 197 CodecPool.returnDecompressor(decomp); 198 CodecPool.returnDecompressor(decomp); 199 Set<Decompressor> decompressors = new HashSet<Decompressor>(); 200 for (int i = 0; i < 10; ++i) { 201 decompressors.add(CodecPool.getDecompressor(codec)); 202 } 203 assertEquals(10, decompressors.size()); 204 for (Decompressor decompressor : decompressors) { 205 CodecPool.returnDecompressor(decompressor); 206 } 207 } 208 209 @Test 210 public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception { 211 212 final GzipCodec gzipCodec = new GzipCodec(); 213 gzipCodec.setConf(new Configuration()); 214 215 final Random random = new Random(); 216 final byte[] bytes = new byte[1024]; 217 random.nextBytes(bytes); 218 219 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 220 try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) { 221 outputStream.write(bytes); 222 } 223 224 final byte[] gzipBytes = baos.toByteArray(); 225 final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes); 226 227 // BuiltInGzipDecompressor is an explicit example of a Decompressor 228 // with the @DoNotPool annotation 229 final Decompressor decompressor = new BuiltInGzipDecompressor(); 230 CodecPool.returnDecompressor(decompressor); 231 232 final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor); 233 boolean passed = false; 234 try { 235 inputStream.read(); 236 } catch (AlreadyClosedException e) { 237 if (e.getMessage().contains("decompress called on closed decompressor")) { 238 passed = true; 239 } 240 } 241 242 if (!passed) { 243 fail("Decompressor from Codec with @DoNotPool should not be " 244 + "useable after returning to CodecPool"); 245 } 246 } 247 248}