001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import java.io.ByteArrayOutputStream; 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.util.concurrent.ExecutionException; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FSDataInputStream; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 035import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 036import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 037 038/** 039 * A utility class for managing compressor/decompressor dictionary loading and caching of load 040 * results. Useful for any codec that can support changing dictionaries at runtime, such as 041 * ZStandard. 042 */ 043@InterfaceAudience.Private 044public final class DictionaryCache { 045 046 public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size"; 047 public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024; 048 public static final String RESOURCE_SCHEME = "resource://"; 049 050 private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class); 051 private static volatile LoadingCache<String, byte[]> CACHE; 052 053 private DictionaryCache() { 054 } 055 056 /** 057 * Load a dictionary or return a previously cached load. 058 * @param conf configuration 059 * @param path the hadoop Path where the dictionary is located, as a String 060 * @return the dictionary bytes if successful, null otherwise 061 */ 062 public static byte[] getDictionary(final Configuration conf, final String path) 063 throws IOException { 064 if (path == null || path.isEmpty()) { 065 return null; 066 } 067 // Create the dictionary loading cache if we haven't already 068 if (CACHE == null) { 069 synchronized (DictionaryCache.class) { 070 if (CACHE == null) { 071 final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE); 072 CACHE = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(10, TimeUnit.MINUTES) 073 .build(new CacheLoader<String, byte[]>() { 074 @Override 075 public byte[] load(String s) throws Exception { 076 byte[] bytes; 077 if (path.startsWith(RESOURCE_SCHEME)) { 078 bytes = loadFromResource(conf, path, maxSize); 079 } else { 080 bytes = loadFromHadoopFs(conf, path, maxSize); 081 } 082 LOG.info("Loaded dictionary from {} (size {})", s, bytes.length); 083 return bytes; 084 } 085 }); 086 } 087 } 088 } 089 090 // Get or load the dictionary for the given path 091 try { 092 return CACHE.get(path); 093 } catch (ExecutionException e) { 094 throw new IOException(e); 095 } 096 } 097 098 // Visible for testing 099 public static byte[] loadFromResource(final Configuration conf, final String s, final int maxSize) 100 throws IOException { 101 if (!s.startsWith(RESOURCE_SCHEME)) { 102 throw new IOException("Path does not start with " + RESOURCE_SCHEME); 103 } 104 final String path = s.substring(RESOURCE_SCHEME.length(), s.length()); 105 LOG.info("Loading resource {}", path); 106 final InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path); 107 if (in == null) { 108 throw new FileNotFoundException("Resource " + path + " not found"); 109 } 110 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 111 try { 112 final byte[] buffer = new byte[8192]; 113 int n, len = 0; 114 do { 115 n = in.read(buffer); 116 if (n > 0) { 117 len += n; 118 if (len > maxSize) { 119 throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize); 120 } 121 baos.write(buffer, 0, n); 122 } 123 } while (n > 0); 124 } finally { 125 in.close(); 126 } 127 return baos.toByteArray(); 128 } 129 130 private static byte[] loadFromHadoopFs(final Configuration conf, final String s, 131 final int maxSize) throws IOException { 132 final Path path = new Path(s); 133 final FileSystem fs = FileSystem.get(path.toUri(), conf); 134 LOG.info("Loading file {}", path); 135 final ByteArrayOutputStream baos = new ByteArrayOutputStream(); 136 final FSDataInputStream in = fs.open(path); 137 try { 138 final byte[] buffer = new byte[8192]; 139 int n, len = 0; 140 do { 141 n = in.read(buffer); 142 if (n > 0) { 143 len += n; 144 if (len > maxSize) { 145 throw new IOException("Dictionary " + s + " is too large, limit=" + maxSize); 146 } 147 baos.write(buffer, 0, n); 148 } 149 } while (n > 0); 150 } finally { 151 in.close(); 152 } 153 return baos.toByteArray(); 154 } 155 156 // Visible for testing 157 public static boolean contains(String dictionaryPath) { 158 if (CACHE != null) { 159 return CACHE.asMap().containsKey(dictionaryPath); 160 } 161 return false; 162 } 163 164}