001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import edu.umd.cs.findbugs.annotations.Nullable;
021import java.util.Comparator;
022import java.util.NavigableSet;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ConcurrentSkipListSet;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.io.compress.CompressionCodec;
030import org.apache.hadoop.io.compress.Compressor;
031import org.apache.hadoop.io.compress.Decompressor;
032import org.apache.hadoop.io.compress.DoNotPool;
033import org.apache.hadoop.util.ReflectionUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
039import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
040import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
041
042/**
043 * A global compressor/decompressor pool used to save and reuse (possibly native)
044 * compression/decompression codecs. Copied from the class of the same name in hadoop-common and
045 * augmented to improve borrow/return performance.
046 */
047@InterfaceAudience.Private
048public class CodecPool {
049  private static final Logger LOG = LoggerFactory.getLogger(CodecPool.class);
050
051  private static final ConcurrentMap<Class<Compressor>, NavigableSet<Compressor>> COMPRESSOR_POOL =
052    new ConcurrentHashMap<>();
053
054  private static final ConcurrentMap<Class<Decompressor>,
055    NavigableSet<Decompressor>> DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
056
057  private static final ConcurrentMap<Class<ByteBuffDecompressor>,
058    NavigableSet<ByteBuffDecompressor>> BYTE_BUFF_DECOMPRESSOR_POOL = new ConcurrentHashMap<>();
059
060  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache() {
061    return CacheBuilder.newBuilder().build(new CacheLoader<>() {
062      @Override
063      public AtomicInteger load(Class<T> key) throws Exception {
064        return new AtomicInteger();
065      }
066    });
067  }
068
069  /**
070   * Map to track the number of leased compressors. Only used in unit tests, kept null otherwise.
071   */
072  @Nullable
073  private static LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = null;
074
075  /**
076   * Map to tracks the number of leased decompressors. Only used in unit tests, kept null otherwise.
077   */
078  @Nullable
079  private static LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = null;
080
081  /**
082   * Call if you want lease counting to be enabled. Only used in unit tests.
083   */
084  static void initLeaseCounting() {
085    compressorCounts = createCache();
086    decompressorCounts = createCache();
087  }
088
089  private static <T> T borrow(ConcurrentMap<Class<T>, NavigableSet<T>> pool,
090    Class<? extends T> codecClass) {
091    if (codecClass == null) {
092      return null;
093    }
094
095    NavigableSet<T> codecSet = pool.get(codecClass);
096    if (codecSet != null) {
097      // If a copy of the codec is available, pollFirst() will grab one.
098      // If not, it will return null.
099      return codecSet.pollFirst();
100    } else {
101      return null;
102    }
103  }
104
105  private static <T> boolean payback(ConcurrentMap<Class<T>, NavigableSet<T>> pool, T codec) {
106    if (codec != null) {
107      Class<T> codecClass = ReflectionUtils.getClass(codec);
108      Set<T> codecSet = pool.computeIfAbsent(codecClass,
109        k -> new ConcurrentSkipListSet<>(Comparator.comparingInt(System::identityHashCode)));
110      return codecSet.add(codec);
111    }
112    return false;
113  }
114
115  /**
116   * Copied from hadoop-common without significant modification.
117   */
118  private static <T> int getLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
119    Class<? extends T> codecClass) {
120    return usageCounts.getUnchecked((Class<T>) codecClass).get();
121  }
122
123  /**
124   * Copied from hadoop-common without significant modification.
125   */
126  private static <T> void updateLeaseCount(LoadingCache<Class<T>, AtomicInteger> usageCounts,
127    T codec, int delta) {
128    if (codec != null && usageCounts != null) {
129      Class<T> codecClass = ReflectionUtils.getClass(codec);
130      usageCounts.getUnchecked(codecClass).addAndGet(delta);
131    }
132  }
133
134  /**
135   * Get a {@link Compressor} for the given {@link CompressionCodec} from the pool, or get a new one
136   * if the pool is empty. Copied from hadoop-common without significant modification.
137   */
138  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
139    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
140    if (compressor == null) {
141      compressor = codec.createCompressor();
142      LOG.info("Got brand-new compressor [" + codec.getDefaultExtension() + "]");
143    } else {
144      compressor.reinit(conf);
145      if (LOG.isDebugEnabled()) {
146        LOG.debug("Got recycled compressor");
147      }
148    }
149    if (compressor != null && !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
150      updateLeaseCount(compressorCounts, compressor, 1);
151    }
152    return compressor;
153  }
154
155  public static Compressor getCompressor(CompressionCodec codec) {
156    return getCompressor(codec, null);
157  }
158
159  /**
160   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the pool, or get a new
161   * one if the pool is empty. Copied from hadoop-common without significant modification.
162   */
163  public static Decompressor getDecompressor(CompressionCodec codec) {
164    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec.getDecompressorType());
165    if (decompressor == null) {
166      decompressor = codec.createDecompressor();
167      LOG.debug("Got brand-new Decompressor [{}]", decompressor.getClass().getName());
168    } else {
169      LOG.debug("Got recycled Decompressor [{}]", decompressor.getClass().getName());
170    }
171    if (decompressor != null && !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
172      updateLeaseCount(decompressorCounts, decompressor, 1);
173    }
174    return decompressor;
175  }
176
177  public static ByteBuffDecompressor getByteBuffDecompressor(ByteBuffDecompressionCodec codec) {
178    ByteBuffDecompressor decompressor =
179      borrow(BYTE_BUFF_DECOMPRESSOR_POOL, codec.getByteBuffDecompressorType());
180    if (decompressor == null) {
181      decompressor = codec.createByteBuffDecompressor();
182      LOG.debug("Got brand-new ByteBuffDecompressor [{}]", decompressor.getClass().getName());
183    } else {
184      LOG.debug("Got recycled ByteBuffDecompressor [{}]", decompressor.getClass().getName());
185    }
186    return decompressor;
187  }
188
189  /**
190   * Return the {@link Compressor} to the pool. Copied from hadoop-common without significant
191   * modification.
192   */
193  public static void returnCompressor(Compressor compressor) {
194    if (compressor == null) {
195      return;
196    }
197    // if the compressor can't be reused, don't pool it.
198    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
199      compressor.end();
200      return;
201    }
202    compressor.reset();
203    if (payback(COMPRESSOR_POOL, compressor)) {
204      updateLeaseCount(compressorCounts, compressor, -1);
205    }
206  }
207
208  /**
209   * Return the {@link Decompressor} to the pool. Copied from hadoop-common without significant
210   * modification.
211   */
212  public static void returnDecompressor(Decompressor decompressor) {
213    if (decompressor == null) {
214      return;
215    }
216    // if the decompressor can't be reused, don't pool it.
217    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
218      decompressor.end();
219      return;
220    }
221    decompressor.reset();
222    if (payback(DECOMPRESSOR_POOL, decompressor)) {
223      updateLeaseCount(decompressorCounts, decompressor, -1);
224    }
225  }
226
227  public static void returnByteBuffDecompressor(ByteBuffDecompressor decompressor) {
228    if (decompressor == null) {
229      return;
230    }
231    // if the decompressor can't be reused, don't pool it.
232    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
233      return;
234    }
235    payback(BYTE_BUFF_DECOMPRESSOR_POOL, decompressor);
236  }
237
238  /**
239   * Returns the number of leased {@link Compressor}s for this {@link CompressionCodec}. Copied from
240   * hadoop-common without significant modification.
241   */
242  static int getLeasedCompressorsCount(@Nullable CompressionCodec codec) {
243    if (compressorCounts == null) {
244      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
245    }
246    return (codec == null) ? 0 : getLeaseCount(compressorCounts, codec.getCompressorType());
247  }
248
249  /**
250   * Returns the number of leased {@link Decompressor}s for this {@link CompressionCodec}. Copied
251   * from hadoop-common without significant modification.
252   */
253  static int getLeasedDecompressorsCount(@Nullable CompressionCodec codec) {
254    if (decompressorCounts == null) {
255      throw new IllegalStateException("initLeaseCounting() not called to set up lease counting");
256    }
257    return (codec == null) ? 0 : getLeaseCount(decompressorCounts, codec.getDecompressorType());
258  }
259}