001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
021import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
022
023import java.io.IOException;
024import java.util.concurrent.ForkJoinPool;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
028import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
029import org.apache.hadoop.hbase.util.ReflectionUtils;
030import org.apache.hadoop.util.StringUtils;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@InterfaceAudience.Private
036public final class BlockCacheFactory {
037
038  private static final Logger LOG = LoggerFactory.getLogger(BlockCacheFactory.class.getName());
039
040  /**
041   * Configuration keys for Bucket cache
042   */
043
044  /**
045   * Configuration key to cache block policy (Lru, TinyLfu, AdaptiveLRU, IndexOnlyLRU).
046   */
047  public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
048  public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";
049
050  /**
051   * If the chosen ioengine can persist its state across restarts, the path to the file to persist
052   * to. This file is NOT the data file. It is a file into which we will serialize the map of what
053   * is in the data file. For example, if you pass the following argument as
054   * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
055   * <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
056   * <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
057   * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
058   * state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
059   */
060  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";
061
062  public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
063
064  public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength";
065
066  /**
067   * A comma-delimited array of values for use as bucket sizes.
068   */
069  public static final String BUCKET_CACHE_BUCKETS_KEY = "hbase.bucketcache.bucket.sizes";
070
071  /**
072   * Defaults for Bucket cache
073   */
074  public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
075  public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
076
077  /**
078   * The target block size used by blockcache instances. Defaults to
079   * {@link HConstants#DEFAULT_BLOCKSIZE}.
080   */
081  public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.blockcache.minblocksize";
082
083  private static final String EXTERNAL_BLOCKCACHE_KEY = "hbase.blockcache.use.external";
084  private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
085
086  private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY = "hbase.blockcache.external.class";
087
088  /**
089   * @deprecated use {@link BlockCacheFactory#BLOCKCACHE_BLOCKSIZE_KEY} instead.
090   */
091  @Deprecated
092  static final String DEPRECATED_BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
093
094  private BlockCacheFactory() {
095  }
096
097  public static BlockCache createBlockCache(Configuration conf) {
098    FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
099    if (l1Cache == null) {
100      return null;
101    }
102    boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
103    if (useExternal) {
104      BlockCache l2CacheInstance = createExternalBlockcache(conf);
105      return l2CacheInstance == null
106        ? l1Cache
107        : new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
108    } else {
109      // otherwise use the bucket cache.
110      BucketCache bucketCache = createBucketCache(conf);
111      if (!conf.getBoolean("hbase.bucketcache.combinedcache.enabled", true)) {
112        // Non combined mode is off from 2.0
113        LOG.warn(
114          "From HBase 2.0 onwards only combined mode of LRU cache and bucket cache is available");
115      }
116      return bucketCache == null ? l1Cache : new CombinedBlockCache(l1Cache, bucketCache);
117    }
118  }
119
120  private static FirstLevelBlockCache createFirstLevelCache(final Configuration c) {
121    final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
122    if (cacheSize < 0) {
123      return null;
124    }
125    String policy = c.get(BLOCKCACHE_POLICY_KEY, BLOCKCACHE_POLICY_DEFAULT);
126    int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
127    LOG.info("Allocating BlockCache size=" + StringUtils.byteDesc(cacheSize) + ", blockSize="
128      + StringUtils.byteDesc(blockSize));
129    if (policy.equalsIgnoreCase("LRU")) {
130      return new LruBlockCache(cacheSize, blockSize, true, c);
131    } else if (policy.equalsIgnoreCase("IndexOnlyLRU")) {
132      return new IndexOnlyLruBlockCache(cacheSize, blockSize, true, c);
133    } else if (policy.equalsIgnoreCase("TinyLFU")) {
134      return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
135    } else if (policy.equalsIgnoreCase("AdaptiveLRU")) {
136      return new LruAdaptiveBlockCache(cacheSize, blockSize, true, c);
137    } else {
138      throw new IllegalArgumentException("Unknown policy: " + policy);
139    }
140  }
141
142  /**
143   * Enum of all built in external block caches. This is used for config.
144   */
145  private static enum ExternalBlockCaches {
146    memcached("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache");
147
148    // TODO(eclark): Consider more. Redis, etc.
149    Class<? extends BlockCache> clazz;
150
151    ExternalBlockCaches(String clazzName) {
152      try {
153        clazz = (Class<? extends BlockCache>) Class.forName(clazzName);
154      } catch (ClassNotFoundException cnef) {
155        clazz = null;
156      }
157    }
158
159    ExternalBlockCaches(Class<? extends BlockCache> clazz) {
160      this.clazz = clazz;
161    }
162  }
163
164  private static BlockCache createExternalBlockcache(Configuration c) {
165    if (LOG.isDebugEnabled()) {
166      LOG.debug("Trying to use External l2 cache");
167    }
168    Class klass = null;
169
170    // Get the class, from the config. s
171    try {
172      klass = ExternalBlockCaches.valueOf(c.get(EXTERNAL_BLOCKCACHE_CLASS_KEY, "memcache")).clazz;
173    } catch (IllegalArgumentException exception) {
174      try {
175        klass = c.getClass(EXTERNAL_BLOCKCACHE_CLASS_KEY,
176          Class.forName("org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache"));
177      } catch (ClassNotFoundException e) {
178        return null;
179      }
180    }
181
182    // Now try and create an instance of the block cache.
183    try {
184      LOG.info("Creating external block cache of type: " + klass);
185      return (BlockCache) ReflectionUtils.newInstance(klass, c);
186    } catch (Exception e) {
187      LOG.warn("Error creating external block cache", e);
188    }
189    return null;
190
191  }
192
193  private static BucketCache createBucketCache(Configuration c) {
194    // Check for L2. ioengine name must be non-null.
195    String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
196    if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) {
197      return null;
198    }
199
200    int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
201    final long bucketCacheSize = MemorySizeUtil.getBucketCacheSize(c);
202    if (bucketCacheSize <= 0) {
203      throw new IllegalStateException("bucketCacheSize <= 0; Check " + BUCKET_CACHE_SIZE_KEY
204        + " setting and/or server java heap size");
205    }
206    if (c.get("hbase.bucketcache.percentage.in.combinedcache") != null) {
207      LOG.warn("Configuration 'hbase.bucketcache.percentage.in.combinedcache' is no longer "
208        + "respected. See comments in http://hbase.apache.org/book.html#_changes_of_note");
209    }
210    int writerThreads =
211      c.getInt(BUCKET_CACHE_WRITER_THREADS_KEY, DEFAULT_BUCKET_CACHE_WRITER_THREADS);
212    int writerQueueLen = c.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY, DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
213    String persistentPath = c.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
214    String[] configuredBucketSizes = c.getStrings(BUCKET_CACHE_BUCKETS_KEY);
215    int[] bucketSizes = null;
216    if (configuredBucketSizes != null) {
217      bucketSizes = new int[configuredBucketSizes.length];
218      for (int i = 0; i < configuredBucketSizes.length; i++) {
219        int bucketSize = Integer.parseInt(configuredBucketSizes[i].trim());
220        if (bucketSize % 256 != 0) {
221          // We need all the bucket sizes to be multiples of 256. Having all the configured bucket
222          // sizes to be multiples of 256 will ensure that the block offsets within buckets,
223          // that are calculated, will also be multiples of 256.
224          // See BucketEntry where offset to each block is represented using 5 bytes (instead of 8
225          // bytes long). We would like to save heap overhead as less as possible.
226          throw new IllegalArgumentException("Illegal value: " + bucketSize + " configured for '"
227            + BUCKET_CACHE_BUCKETS_KEY + "'. All bucket sizes to be multiples of 256");
228        }
229        bucketSizes[i] = bucketSize;
230      }
231    }
232    BucketCache bucketCache = null;
233    try {
234      int ioErrorsTolerationDuration =
235        c.getInt("hbase.bucketcache.ioengine.errors.tolerated.duration",
236          BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
237      // Bucket cache logs its stats on creation internal to the constructor.
238      bucketCache = new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize,
239        bucketSizes, writerThreads, writerQueueLen, persistentPath, ioErrorsTolerationDuration, c);
240    } catch (IOException ioex) {
241      LOG.error("Can't instantiate bucket cache", ioex);
242      throw new RuntimeException(ioex);
243    }
244    return bucketCache;
245  }
246}