/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context that holds the various dictionaries for compression in WAL.
 * <p>
 * CompressionContexts are not expected to be shared among threads. Multithreaded use may produce
 * unexpected results.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
public class CompressionContext {

  private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);

  public static final String ENABLE_WAL_TAGS_COMPRESSION =
    "hbase.regionserver.wal.tags.enablecompression";

  public static final String ENABLE_WAL_VALUE_COMPRESSION =
    "hbase.regionserver.wal.value.enablecompression";

  public static final String WAL_VALUE_COMPRESSION_TYPE =
    "hbase.regionserver.wal.value.compression.type";

  public enum DictionaryIndex {
    REGION,
    TABLE,
    FAMILY,
    QUALIFIER,
    ROW
  }
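
  /*
   * Illustrative sketch (not part of this class): setting the keys above to turn on tag and
   * value compression. These take effect only when WAL compression itself is enabled via
   * hbase.regionserver.wal.enablecompression. The "lz4" algorithm name is an assumption for
   * the example; the codecs actually available depend on the deployment.
   *
   *   Configuration conf = new Configuration();
   *   conf.setBoolean("hbase.regionserver.wal.enablecompression", true);
   *   conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
   *   conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
   *   conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, "lz4");
   */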

  /**
   * Encapsulates the compression algorithm and its streams that we will use for value
   * compression in this WAL.
   */
  static class ValueCompressor {

    // A larger buffer improves the compression ratio for large edits.
    static final int IO_BUFFER_SIZE = 64 * 1024;

    private final Compression.Algorithm algorithm;
    private Compressor compressor;
    private Decompressor decompressor;
    private BoundedDelegatingInputStream lowerIn;
    private ByteArrayOutputStream lowerOut;
    private InputStream compressedIn;
    private OutputStream compressedOut;

    public ValueCompressor(Compression.Algorithm algorithm) {
      this.algorithm = algorithm;
    }

    public Compression.Algorithm getAlgorithm() {
      return algorithm;
    }

    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
      throws IOException {
      if (compressedOut == null) {
        // Create the output streams here the first time around.
        lowerOut = new ByteArrayOutputStream();
        if (compressor == null) {
          compressor = algorithm.getCompressor();
        }
        compressedOut = algorithm.createCompressionStream(lowerOut, compressor, IO_BUFFER_SIZE);
      }
      compressedOut.write(valueArray, valueOffset, valueLength);
      compressedOut.flush();
      final byte[] compressed = lowerOut.toByteArray();
      // Reset now to minimize the overhead of keeping the ByteArrayOutputStream around.
      lowerOut.reset();
      return compressed;
    }

    public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
      int outLength) throws IOException {

      // Our input is a sequence of bounded byte ranges (call them segments), with
      // BoundedDelegatingInputStream providing a way to switch in a new segment when the
      // previous segment has been fully consumed.

      // Create the input streams here the first time around.
      if (compressedIn == null) {
        lowerIn = new BoundedDelegatingInputStream(in, inLength);
        if (decompressor == null) {
          decompressor = algorithm.getDecompressor();
        }
        compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
      } else {
        lowerIn.setDelegate(in, inLength);
      }

      // The caller must handle short reads. With the current Hadoop compression codecs all
      // 'outLength' bytes are read in here, so this is not an issue for now.
      return compressedIn.read(outArray, outOffset, outLength);
    }

    public void clear() {
      if (compressedOut != null) {
        try {
          compressedOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed output stream", e);
        }
      }
      compressedOut = null;
      if (lowerOut != null) {
        try {
          lowerOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower output stream", e);
        }
      }
      lowerOut = null;
      if (compressedIn != null) {
        try {
          compressedIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed input stream", e);
        }
      }
      compressedIn = null;
      if (lowerIn != null) {
        try {
          lowerIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower input stream", e);
        }
      }
      lowerIn = null;
      if (compressor != null) {
        compressor.reset();
      }
      if (decompressor != null) {
        decompressor.reset();
      }
    }

  }
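
  /*
   * Round-trip sketch for ValueCompressor (illustrative only; ByteArrayInputStream and the GZ
   * algorithm are assumptions for the example). compress() returns the compressed bytes for a
   * single value; decompress() reads one value's worth of bytes back out of a stream positioned
   * at the start of the compressed segment.
   *
   *   ValueCompressor vc = new ValueCompressor(Compression.Algorithm.GZ);
   *   byte[] value = Bytes.toBytes("some cell value");
   *   byte[] compressed = vc.compress(value, 0, value.length);
   *   byte[] restored = new byte[value.length];
   *   vc.decompress(new ByteArrayInputStream(compressed), compressed.length, restored, 0,
   *     value.length);
   *   vc.clear(); // releases streams and pooled codec resources
   */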

  private final Map<DictionaryIndex, Dictionary> dictionaries =
    new EnumMap<>(DictionaryIndex.class);
  // Context used for compressing tags
  TagCompressionContext tagCompressionContext = null;
  ValueCompressor valueCompressor = null;

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression, boolean hasValueCompression,
    Compression.Algorithm valueCompressionType) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
      Dictionary newDictionary = dictConstructor.newInstance();
      dictionaries.put(dictionaryIndex, newDictionary);
    }
    if (recoveredEdits) {
      // A recovered-edits file contains edits for a single region, so the region and table
      // dictionaries only ever need to hold one entry.
      getDictionary(DictionaryIndex.REGION).init(1);
      getDictionary(DictionaryIndex.TABLE).init(1);
    } else {
      getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
      getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
    }

    getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
    getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
    getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);

    if (hasTagCompression) {
      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
    }
    if (hasValueCompression && valueCompressionType != null) {
      valueCompressor = new ValueCompressor(valueCompressionType);
    }
  }

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    this(dictType, recoveredEdits, hasTagCompression, false, null);
  }

  public boolean hasTagCompression() {
    return tagCompressionContext != null;
  }

  public boolean hasValueCompression() {
    return valueCompressor != null;
  }

  public Dictionary getDictionary(Enum<?> dictIndex) {
    return dictionaries.get(dictIndex);
  }

  public ValueCompressor getValueCompressor() {
    return valueCompressor;
  }

  void clear() {
    for (Dictionary dictionary : dictionaries.values()) {
      dictionary.clear();
    }
    if (tagCompressionContext != null) {
      tagCompressionContext.clear();
    }
    if (valueCompressor != null) {
      valueCompressor.clear();
    }
  }

  public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
    if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
      String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
      if (compressionType != null) {
        return Compression.getCompressionAlgorithmByName(compressionType);
      }
      return Compression.Algorithm.GZ;
    }
    return Compression.Algorithm.NONE;
  }

}
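
/*
 * Wiring sketch (illustrative, not part of this class): combining the configuration lookup
 * above with the five-argument constructor, roughly as a WAL writer would. LRUDictionary is
 * the dictionary implementation HBase ships for WAL compression; the tag compression default
 * shown here is an assumption for the example.
 *
 *   Compression.Algorithm valueAlgo = CompressionContext.getValueCompressionAlgorithm(conf);
 *   CompressionContext context = new CompressionContext(LRUDictionary.class,
 *     false, // a live WAL, not a recovered-edits file
 *     conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true),
 *     valueAlgo != Compression.Algorithm.NONE, valueAlgo);
 */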