/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;

/**
 * A utility class for managing compressor/decompressor dictionary loading and caching of load
 * results. Useful for any codec that can support changing dictionaries at runtime, such as
 * ZStandard.
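 * <p>
 * A minimal usage sketch (illustrative only; the dictionary path below is a placeholder, and
 * wiring the dictionary into a specific codec is outside the scope of this class):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Dictionaries may live on any Hadoop filesystem, or on the classpath via the
 * // "resource://" scheme, e.g. "resource://zstd/mytable.dict"
 * byte[] dictionary = DictionaryCache.getDictionary(conf, "hdfs://nn/dict/zstd.dict");
 * </pre>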
 */
@InterfaceAudience.Private
public final class DictionaryCache {

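  /** Configuration key for the maximum allowed dictionary size, in bytes. */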
  public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
  public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
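  /** Scheme prefix for dictionaries loaded from the classpath rather than a filesystem. */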
  public static final String RESOURCE_SCHEME = "resource://";

  private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
  private static volatile LoadingCache<String, byte[]> CACHE;

  private DictionaryCache() {
  }

  /**
   * Load a dictionary or return a previously cached load.
   * @param conf configuration
   * @param path the dictionary location as a String, either a Hadoop filesystem path or a
   *          classpath resource prefixed with {@code resource://}
   * @return the dictionary bytes if successful, or null if {@code path} is null or empty
   * @throws IOException if the dictionary cannot be loaded
   */
  public static byte[] getDictionary(final Configuration conf, final String path)
    throws IOException {
    if (path == null || path.isEmpty()) {
      return null;
    }
    // Create the dictionary loading cache if we haven't already
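    // Double-checked locking: safe because CACHE is declared volatile.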
    if (CACHE == null) {
      synchronized (DictionaryCache.class) {
        if (CACHE == null) {
          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
          CACHE = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES)
            .build(new CacheLoader<String, byte[]>() {
              @Override
              public byte[] load(String s) throws Exception {
                // Load by the cache key, not by the path captured when the cache was created,
                // so that lookups for other dictionaries resolve to the correct file.
                byte[] bytes;
                if (s.startsWith(RESOURCE_SCHEME)) {
                  bytes = loadFromResource(conf, s, maxSize);
                } else {
                  bytes = loadFromHadoopFs(conf, s, maxSize);
                }
                LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
                return bytes;
              }
            });
        }
      }
    }

    // Get or load the dictionary for the given path
    try {
      return CACHE.get(path);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }

  // Visible for testing
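  // Reads the named classpath resource fully into memory, enforcing the configured size limit.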
  public static byte[] loadFromResource(final Configuration conf, final String s, final int maxSize)
    throws IOException {
    if (!s.startsWith(RESOURCE_SCHEME)) {
      throw new IOException("Path does not start with " + RESOURCE_SCHEME);
    }
    final String path = s.substring(RESOURCE_SCHEME.length());
    LOG.info("Loading resource {}", path);
    final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path);
    if (in == null) {
      throw new FileNotFoundException("Resource " + path + " not found");
    }
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

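  // Reads the dictionary file at the given filesystem path, enforcing the configured size limit.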
  private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
    final int maxSize) throws IOException {
    final Path path = new Path(s);
    final FileSystem fs = FileSystem.get(path.toUri(), conf);
    LOG.info("Loading file {}", path);
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final FSDataInputStream in = fs.open(path);
    try {
      final byte[] buffer = new byte[8192];
      int n, len = 0;
      do {
        n = in.read(buffer);
        if (n > 0) {
          len += n;
          if (len > maxSize) {
            throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize);
          }
          baos.write(buffer, 0, n);
        }
      } while (n > 0);
    } finally {
      in.close();
    }
    return baos.toByteArray();
  }

  // Visible for testing
  public static boolean contains(String dictionaryPath) {
    if (CACHE != null) {
      return CACHE.asMap().containsKey(dictionaryPath);
    }
    return false;
  }

}