001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import com.github.benmanes.caffeine.cache.Caffeine;
021import com.github.benmanes.caffeine.cache.LoadingCache;
022import edu.umd.cs.findbugs.annotations.Nullable;
023import java.util.Comparator;
024import java.util.NavigableSet;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.ConcurrentSkipListSet;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.Compressor;
033import org.apache.hadoop.io.compress.Decompressor;
034import org.apache.hadoop.io.compress.DoNotPool;
035import org.apache.hadoop.util.ReflectionUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A global compressor/decompressor pool used to save and reuse (possibly native)
042 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and
043 * augmented to improve borrow/return performance.
044 */
045@InterfaceAudience.Private
046public class CodecPool {
047  private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class);
048
049  private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL =
050    new ConcurrentHashMap<>();
051
052  private static final ConcurrentMap<Class<Decompressor>,
053    NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
054
055  private static final ConcurrentMap<Class<ByteBuffDecompressor>,
056    NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
057
058  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
059    return Caffeine.newBuilder().build(key -> new AtomicInteger());
060  }
061
062  /**
063   * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise.
064   */
065  @Nullable
066  private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null;
067
068  /**
069   * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise.
070   */
071  @Nullable
072  private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null;
073
074  /**
075   * Call if you want lease counting to be enabled. Only used in unit tests.
076   */
077  static void initLeaseCounting() {
078    compressorCounts = createCache();
079    decompressorCounts = createCache();
080  }
081
082  private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool,
083    Class<? extends T> codecClass) {
084    if (codecClass == null) {
085      return null;
086    }
087
088    NavigableSet<T> codecSet = pool.get(codecClass);
089    if (codecSet != null) {
090      // If a copy of the codec is available, pollFirst() will grab one.
091      // If not, it will return null.
092      return codecSet.pollFirst();
093    } else {
094      return null;
095    }
096  }
097
098  private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) {
099    if (codec != null) {
100      Class<T> codecClass = ReflectionUtils.getClass(codec);
101      Set<T> codecSet = pool.computeIfAbsent(codecClass,
102        k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode)));
103      return codecSet.add(codec);
104    }
105    return false;
106  }
107
108  /**
109   * Copied from hadoop-common without significant modification.
110   */
111  @SuppressWarnings("unchecked")
112  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
113      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
114      justification = "LoadingCache will compute value if absent")
115  private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
116    Class<? extends T> codecClass) {
117    return usageCounts.get((Class<T>) codecClass).get();
118  }
119
120  /**
121   * Copied from hadoop-common without significant modification.
122   */
123  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
124      value = "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE",
125      justification = "LoadingCache will compute value if absent")
126  private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
127    T codec, int delta) {
128    if (codec != null && usageCounts != null) {
129      Class<T> codecClass = ReflectionUtils.getClass(codec);
130      usageCounts.get(codecClass).addAndGet(delta);
131    }
132  }
133
134  /**
135   * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one
136   * if the pool is empty. Copied from hadoop-common without significant modification.
137   */
138  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
139    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
140    if (compressor == null) {
141      compressor = codec.createCompressor();
142      LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]");
143    } else {
144      compressor.reinit(conf);
145      if (LOG.isDebugEnabled()) {
146        LOG.debug("Got recycled compressor");
147      }
148    }
149    if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
150      updateLeaseCount(compressorCounts, compressor, 1);
151    }
152    return compressor;
153  }
154
155  public static Compressor getCompressor(CompressionCodec codec) {
156    return getCompressor(codec, null);
157  }
158
159  /**
160   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new
161   * one if the pool is empty. Copied from hadoop-common without significant modification.
162   */
163  public static Decompressor getDecompressor(CompressionCodec codec) {
164    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType());
165    if (decompressor == null) {
166      decompressor = codec.createDecompressor();
167      LOG.info("Got brand-new Decompressor [" + codec.getDefaultExtension() + "]");
168    } else {
169      if (LOG.isDebugEnabled()) {
170        LOG.debug("Got recycled Decompressor");
171      }
172    }
173    if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
174      updateLeaseCount(decompressorCounts, decompressor, 1);
175    }
176    return decompressor;
177  }
178
179  public static ByteBuffDecompressor getByteBuffDecompressor(ByteBuffDecompressionCodec codec) {
180    ByteBuffDecompressor decompressor =
181      borrow(BYTE_BUFF_DECOMPRESSOR_POOL, codec.getByteBuffDecompressorType());
182    if (decompressor == null) {
183      decompressor = codec.createByteBuffDecompressor();
184      LOG.info("Got brand-new ByteBuffDecompressor " + decompressor.getClass().getName());
185    } else {
186      if (LOG.isDebugEnabled()) {
187        LOG.debug("Got recycled ByteBuffDecompressor");
188      }
189    }
190    return decompressor;
191  }
192
193  /**
194   * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant
195   * modification.
196   */
197  public static void returnCompressor(Compressor compressor) {
198    if (compressor == null) {
199      return;
200    }
201    // if the compressor can't be reused, don't pool it.
202    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
203      compressor.end();
204      return;
205    }
206    compressor.reset();
207    if (payback(COMPRESSOR_POOL, compressor)) {
208      updateLeaseCount(compressorCounts, compressor, -1);
209    }
210  }
211
212  /**
213   * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant
214   * modification.
215   */
216  public static void returnDecompressor(Decompressor decompressor) {
217    if (decompressor == null) {
218      return;
219    }
220    // if the decompressor can't be reused, don't pool it.
221    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
222      decompressor.end();
223      return;
224    }
225    decompressor.reset();
226    if (payback(DECOMPRESSOR_POOL, decompressor)) {
227      updateLeaseCount(decompressorCounts, decompressor, -1);
228    }
229  }
230
231  public static void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
232    if (decompressor == null) {
233      return;
234    }
235    // if the decompressor can't be reused, don't pool it.
236    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
237      return;
238    }
239    payback(BYTE_BUFF_DECOMPRESSOR_POOL, decompressor);
240  }
241
242  /**
243   * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from
244   * hadoop-common without significant modification.
245   */
246  static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) {
247    if (compressorCounts == null) {
248      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
249    }
250    return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType());
251  }
252
253  /**
254   * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied
255   * from hadoop-common without significant modification.
256   */
257  static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) {
258    if (decompressorCounts == null) {
259      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
260    }
261    return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType());
262  }
263}