001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import com.github.benmanes.caffeine.cache.Caffeine; 021import com.github.benmanes.caffeine.cache.LoadingCache; 022import edu.umd.cs.findbugs.annotations.Nullable; 023import java.util.Comparator; 024import java.util.NavigableSet; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.ConcurrentSkipListSet; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.Compressor; 033import org.apache.hadoop.io.compress.Decompressor; 034import org.apache.hadoop.io.compress.DoNotPool; 035import org.apache.hadoop.util.ReflectionUtils; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A global compressor/decompressor pool used to save and reuse (possibly native) 042 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and 043 * augmented to improve borrow/return performance. 044 */ 045@InterfaceAudience.Private 046public class CodecPool { 047 private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class); 048 049 private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL = 050 new ConcurrentHashMap<>(); 051 052 private static final ConcurrentMap<Class<Decompressor>, 053 NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>(); 054 055 private static final ConcurrentMap<Class<ByteBuffDecompressor>, 056 NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>(); 057 058 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() { 059 return Caffeine.newBuilder().build(key -> new AtomicInteger()); 060 } 061 062 /** 063 * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise. 064 */ 065 @Nullable 066 private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null; 067 068 /** 069 * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise. 070 */ 071 @Nullable 072 private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null; 073 074 /** 075 * Call if you want lease counting to be enabled. Only used in unit tests. 076 */ 077 static void initLeaseCounting() { 078 compressorCounts = createCache(); 079 decompressorCounts = createCache(); 080 } 081 082 private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool, 083 Class<? extends T> codecClass) { 084 if (codecClass == null) { 085 return null; 086 } 087 088 NavigableSet<T> codecSet = pool.get(codecClass); 089 if (codecSet != null) { 090 // If a copy of the codec is available, pollFirst() will grab one. 091 // If not, it will return null. 092 return codecSet.pollFirst(); 093 } else { 094 return null; 095 } 096 } 097 098 private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) { 099 if (codec != null) { 100 Class<T> codecClass = ReflectionUtils.getClass(codec); 101 Set<T> codecSet = pool.computeIfAbsent(codecClass, 102 k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode))); 103 return codecSet.add(codec); 104 } 105 return false; 106 } 107 108 /** 109 * Copied from hadoop-common without significant modification. 110 */ 111 @SuppressWarnings("unchecked") 112 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 113 value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", 114 justification = "LoadingCache will compute value if absent") 115 private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 116 Class<? extends T> codecClass) { 117 return usageCounts.get((Class<T>) codecClass).get(); 118 } 119 120 /** 121 * Copied from hadoop-common without significant modification. 122 */ 123 @edu.umd.cs.findbugs.annotations.SuppressWarnings( 124 value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE", 125 justification = "LoadingCache will compute value if absent") 126 private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts, 127 T codec, int delta) { 128 if (codec != null && usageCounts != null) { 129 Class<T> codecClass = ReflectionUtils.getClass(codec); 130 usageCounts.get(codecClass).addAndGet(delta); 131 } 132 } 133 134 /** 135 * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one 136 * if the pool is empty. Copied from hadoop-common without significant modification. 137 */ 138 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 139 Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); 140 if (compressor == null) { 141 compressor = codec.createCompressor(); 142 LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]"); 143 } else { 144 compressor.reinit(conf); 145 if (LOG.isDebugEnabled()) { 146 LOG.debug("Got recycled compressor"); 147 } 148 } 149 if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 150 updateLeaseCount(compressorCounts, compressor, 1); 151 } 152 return compressor; 153 } 154 155 public static Compressor getCompressor(CompressionCodec codec) { 156 return getCompressor(codec, null); 157 } 158 159 /** 160 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new 161 * one if the pool is empty. Copied from hadoop-common without significant modification. 162 */ 163 public static Decompressor getDecompressor(CompressionCodec codec) { 164 Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType()); 165 if (decompressor == null) { 166 decompressor = codec.createDecompressor(); 167 LOG.info("Got brand-new Decompressor [" + codec.getDefaultExtension() + "]"); 168 } else { 169 if (LOG.isDebugEnabled()) { 170 LOG.debug("Got recycled Decompressor"); 171 } 172 } 173 if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 174 updateLeaseCount(decompressorCounts, decompressor, 1); 175 } 176 return decompressor; 177 } 178 179 public static ByteBuffDecompressor getByteBuffDecompressor(ByteBuffDecompressionCodec codec) { 180 ByteBuffDecompressor decompressor = 181 borrow(BYTE_BUFF_DECOMPRESSOR_POOL, codec.getByteBuffDecompressorType()); 182 if (decompressor == null) { 183 decompressor = codec.createByteBuffDecompressor(); 184 LOG.info("Got brand-new ByteBuffDecompressor " + decompressor.getClass().getName()); 185 } else { 186 if (LOG.isDebugEnabled()) { 187 LOG.debug("Got recycled ByteBuffDecompressor"); 188 } 189 } 190 return decompressor; 191 } 192 193 /** 194 * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant 195 * modification. 196 */ 197 public static void returnCompressor(Compressor compressor) { 198 if (compressor == null) { 199 return; 200 } 201 // if the compressor can't be reused, don't pool it. 202 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 203 compressor.end(); 204 return; 205 } 206 compressor.reset(); 207 if (payback(COMPRESSOR_POOL, compressor)) { 208 updateLeaseCount(compressorCounts, compressor, -1); 209 } 210 } 211 212 /** 213 * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant 214 * modification. 215 */ 216 public static void returnDecompressor(Decompressor decompressor) { 217 if (decompressor == null) { 218 return; 219 } 220 // if the decompressor can't be reused, don't pool it. 221 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 222 decompressor.end(); 223 return; 224 } 225 decompressor.reset(); 226 if (payback(DECOMPRESSOR_POOL, decompressor)) { 227 updateLeaseCount(decompressorCounts, decompressor, -1); 228 } 229 } 230 231 public static void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) { 232 if (decompressor == null) { 233 return; 234 } 235 // if the decompressor can't be reused, don't pool it. 236 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 237 return; 238 } 239 payback(BYTE_BUFF_DECOMPRESSOR_POOL, decompressor); 240 } 241 242 /** 243 * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from 244 * hadoop-common without significant modification. 245 */ 246 static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) { 247 if (compressorCounts == null) { 248 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 249 } 250 return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType()); 251 } 252 253 /** 254 * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied 255 * from hadoop-common without significant modification. 256 */ 257 static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) { 258 if (decompressorCounts == null) { 259 throw new IllegalStateException("initLeaseCounting() not called to set up lease counting"); 260 } 261 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType()); 262 } 263}