001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.util.Comparator; 022import java.util.NavigableSet; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.ConcurrentSkipListSet; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.io.compress.CompressionCodec; 030import org.apache.hadoop.io.compress.Compressor; 031import org.apache.hadoop.io.compress.Decompressor; 032import org.apache.hadoop.io.compress.DoNotPool; 033import org.apache.hadoop.util.ReflectionUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 039import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 040import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 041 042/** 043 * A global compressor/decompressor pool used to save and reuse (possibly native) 044 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and 045 * augmented to improve borrow/return performance. 046 */ 047@InterfaceAudience.Private 048public class CodecPool { 049 private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class); 050 051 private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL = 052 new ConcurrentHashMap<>(); 053 054 private static final ConcurrentMap<Class<Decompressor>, 055 NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>(); 056 057 private static final ConcurrentMap<Class<ByteBuffDecompressor>, 058 NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>(); 059 060 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() { 061 return CacheBuilder.newBuilder().build(new CacheLoader<>() { 062 @Override 063 public AtomicInteger load(Class<T> key) throws Exception { 064 return new AtomicInteger(); 065 } 066 }); 067 } 068 069 /** 070 * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise. 071 */ 072 @Nullable 073 private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null; 074 075 /** 076 * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise. 077 */ 078 @Nullable 079 private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null; 080 081 /** 082 * Call if you want lease counting to be enabled. Only used in unit tests. 083 */ 084 static void initLeaseCounting() { 085 compressorCounts = createCache(); 086 decompressorCounts = createCache(); 087 } 088 089 private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool, 090 Class<? extends T> codecClass) { 091 if (codecClass == null) { 092 return null; 093 } 094 095 NavigableSet<T> codecSet = pool.get(codecClass); 096 if (codecSet != null) { 097 // If a copy of the codec is available, pollFirst() will grab one. 098 // If not, it will return null. 099 return codecSet.pollFirst(); 100 } else { 101 return null; 102 } 103 } 104 105 private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) { 106 if (codec != null) { 107 Class<T> codecClass = ReflectionUtils.getClass(codec); 108 Set<T> codecSet = pool.computeIfAbsent(codecClass, 109 k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode))); 110 return codecSet.add(codec); 111 } 112 return false; 113 } 114 115 /** 116 * Copied from hadoop-common without significant modification. 117 */ 118 private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 119 Class<? extends T> codecClass) { 120 return usageCounts.getUnchecked((Class<T>) codecClass).get(); 121 } 122 123 /** 124 * Copied from hadoop-common without significant modification. 125 */ 126 private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 127 T codec, int delta) { 128 if (codec != null && usageCounts != null) { 129 Class<T> codecClass = ReflectionUtils.getClass(codec); 130 usageCounts.getUnchecked(codecClass).addAndGet(delta); 131 } 132 } 133 134 /** 135 * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one 136 * if the pool is empty. Copied from hadoop-common without significant modification. 137 */ 138 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 139 Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); 140 if (compressor == null) { 141 compressor = codec.createCompressor(); 142 LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]"); 143 } else { 144 compressor.reinit(conf); 145 if (LOG.isDebugEnabled()) { 146 LOG.debug("Got recycled compressor"); 147 } 148 } 149 if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 150 updateLeaseCount(compressorCounts, compressor, 1); 151 } 152 return compressor; 153 } 154 155 public static Compressor getCompressor(CompressionCodec codec) { 156 return getCompressor(codec, null); 157 } 158 159 /** 160 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new 161 * one if the pool is empty. Copied from hadoop-common without significant modification. 162 */ 163 public static Decompressor getDecompressor(CompressionCodec codec) { 164 Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType()); 165 if (decompressor == null) { 166 decompressor = codec.createDecompressor(); 167 LOG.debug("Got brand-new Decompressor [{}]", decompressor.getClass().getName()); 168 } else { 169 LOG.debug("Got recycled Decompressor [{}]", decompressor.getClass().getName()); 170 } 171 if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 172 updateLeaseCount(decompressorCounts, decompressor, 1); 173 } 174 return decompressor; 175 } 176 177 public static ByteBuffDecompressor getByteBuffDecompressor(ByteBuffDecompressionCodec codec) { 178 ByteBuffDecompressor decompressor = 179 borrow(BYTE_BUFF_DECOMPRESSOR_POOL, codec.getByteBuffDecompressorType()); 180 if (decompressor == null) { 181 decompressor = codec.createByteBuffDecompressor(); 182 LOG.debug("Got brand-new ByteBuffDecompressor [{}]", decompressor.getClass().getName()); 183 } else { 184 LOG.debug("Got recycled ByteBuffDecompressor [{}]", decompressor.getClass().getName()); 185 } 186 return decompressor; 187 } 188 189 /** 190 * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant 191 * modification. 192 */ 193 public static void returnCompressor(Compressor compressor) { 194 if (compressor == null) { 195 return; 196 } 197 // if the compressor can't be reused, don't pool it. 198 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 199 compressor.end(); 200 return; 201 } 202 compressor.reset(); 203 if (payback(COMPRESSOR_POOL, compressor)) { 204 updateLeaseCount(compressorCounts, compressor, -1); 205 } 206 } 207 208 /** 209 * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant 210 * modification. 211 */ 212 public static void returnDecompressor(Decompressor decompressor) { 213 if (decompressor == null) { 214 return; 215 } 216 // if the decompressor can't be reused, don't pool it. 217 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 218 decompressor.end(); 219 return; 220 } 221 decompressor.reset(); 222 if (payback(DECOMPRESSOR_POOL, decompressor)) { 223 updateLeaseCount(decompressorCounts, decompressor, -1); 224 } 225 } 226 227 public static void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) { 228 if (decompressor == null) { 229 return; 230 } 231 // if the decompressor can't be reused, don't pool it. 232 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 233 return; 234 } 235 payback(BYTE_BUFF_DECOMPRESSOR_POOL, decompressor); 236 } 237 238 /** 239 * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from 240 * hadoop-common without significant modification. 241 */ 242 static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) { 243 if (compressorCounts == null) { 244 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 245 } 246 return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType()); 247 } 248 249 /** 250 * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied 251 * from hadoop-common without significant modification. 252 */ 253 static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) { 254 if (decompressorCounts == null) { 255 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 256 } 257 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType()); 258 } 259}