/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context that holds the various dictionaries for compression in WAL.
 * <p>
 * CompressionContexts are not expected to be shared among threads. Multithreaded use may produce
 * unexpected results.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
public class CompressionContext {

  private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);

  public static final String ENABLE_WAL_TAGS_COMPRESSION =
    "hbase.regionserver.wal.tags.enablecompression";

  public static final String ENABLE_WAL_VALUE_COMPRESSION =
    "hbase.regionserver.wal.value.enablecompression";

  public static final String WAL_VALUE_COMPRESSION_TYPE =
    "hbase.regionserver.wal.value.compression.type";

  public enum DictionaryIndex {
    REGION,
    TABLE,
    FAMILY,
    QUALIFIER,
    ROW
  }

  /**
   * Encapsulates the compression algorithm and its streams that we will use for value compression
   * in this WAL.
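   * A single instance lazily creates its compression stream on the first call to compress() and
   * its decompression stream on the first call to decompress(); clear() closes both and resets
   * the underlying compressor and decompressor state.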
   */
  static class ValueCompressor {

    static final int IO_BUFFER_SIZE = 64 * 1024; // bigger buffer improves large edit compression ratio

    private final Compression.Algorithm algorithm;
    private Compressor compressor;
    private Decompressor decompressor;
    private WALDecompressionBoundedDelegatingInputStream lowerIn;
    private ByteArrayOutputStream lowerOut;
    private InputStream compressedIn;
    private OutputStream compressedOut;

    public ValueCompressor(Compression.Algorithm algorithm) {
      this.algorithm = algorithm;
    }

    public Compression.Algorithm getAlgorithm() {
      return algorithm;
    }

    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength)
      throws IOException {
      if (compressedOut == null) {
        // Create the output streams here the first time around.
        lowerOut = new ByteArrayOutputStream();
        if (compressor == null) {
          compressor = algorithm.getCompressor();
        }
        compressedOut = algorithm.createCompressionStream(lowerOut, compressor, IO_BUFFER_SIZE);
      }
      compressedOut.write(valueArray, valueOffset, valueLength);
      compressedOut.flush();
      final byte[] compressed = lowerOut.toByteArray();
      lowerOut.reset(); // Reset now to minimize the overhead of keeping around the BAOS
      return compressed;
    }

    public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
      int outLength) throws IOException {
      // Our input is a sequence of bounded byte ranges (call them segments), with
      // BoundedDelegatingInputStream providing a way to switch in a new segment when the
      // previous segment has been fully consumed.

      // Create the input streams here the first time around.
      if (compressedIn == null) {
        lowerIn = new WALDecompressionBoundedDelegatingInputStream();
        if (decompressor == null) {
          decompressor = algorithm.getDecompressor();
        }
        compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
      }
      if (outLength == 0) {
        // The BufferedInputStream will return early and skip reading anything if outLength == 0,
        // but for an empty value the compressed output still contains some metadata, so the
        // compressed size is not 0. We need to manually skip inLength bytes here, otherwise the
        // next read on this stream would start from an invalid position and cause critical
        // problems, such as data loss when splitting or replicating the WAL.
        IOUtils.skipFully(in, inLength);
      } else {
        lowerIn.reset(in, inLength);
        IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
        // If the uncompressed size was larger than the configured buffer size for the codec,
        // the BlockCompressorStream will have left an extra 4 bytes hanging. This represents the
        // size of the next segment, and it should be 0. See HBASE-28390.
        if (lowerIn.available() == 4) {
          int remaining = rawReadInt(lowerIn);
          assert remaining == 0;
        }
      }
    }

    /**
     * Read an integer from the stream in big-endian byte order.
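     * Used by decompress() to consume the trailing 4-byte segment size that BlockCompressorStream
     * can leave behind; the value is expected to be zero (see HBASE-28390).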
     */
    private int rawReadInt(InputStream in) throws IOException {
      int b1 = in.read();
      int b2 = in.read();
      int b3 = in.read();
      int b4 = in.read();
      if ((b1 | b2 | b3 | b4) < 0) {
        throw new EOFException();
      }
      return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
    }

    public void clear() {
      if (compressedOut != null) {
        try {
          compressedOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed output stream", e);
        }
      }
      compressedOut = null;
      if (lowerOut != null) {
        try {
          lowerOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower output stream", e);
        }
      }
      lowerOut = null;
      if (compressedIn != null) {
        try {
          compressedIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed input stream", e);
        }
      }
      compressedIn = null;
      if (lowerIn != null) {
        try {
          lowerIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower input stream", e);
        }
      }
      lowerIn = null;
      if (compressor != null) {
        compressor.reset();
      }
      if (decompressor != null) {
        decompressor.reset();
      }
    }

  }

  private final Map<DictionaryIndex, Dictionary> dictionaries =
    new EnumMap<>(DictionaryIndex.class);
  // Context used for compressing tags
  TagCompressionContext tagCompressionContext = null;
  ValueCompressor valueCompressor = null;

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression, boolean hasValueCompression,
    Compression.Algorithm valueCompressionType) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
      Dictionary newDictionary = dictConstructor.newInstance();
      dictionaries.put(dictionaryIndex, newDictionary);
    }
    if (recoveredEdits) {
      getDictionary(DictionaryIndex.REGION).init(1);
      getDictionary(DictionaryIndex.TABLE).init(1);
    } else {
      getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
      getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
    }

    getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
    getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
    getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);

    if (hasTagCompression) {
      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
    }
    if (hasValueCompression && valueCompressionType != null) {
      valueCompressor = new ValueCompressor(valueCompressionType);
    }
  }

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    this(dictType, recoveredEdits, hasTagCompression, false, null);
  }

  public boolean hasTagCompression() {
    return tagCompressionContext != null;
  }

  public boolean hasValueCompression() {
    return valueCompressor != null;
  }

  public Dictionary getDictionary(Enum dictIndex) {
    return dictionaries.get(dictIndex);
  }

  public ValueCompressor getValueCompressor() {
    return valueCompressor;
  }

  void clear() {
    for (Dictionary dictionary : dictionaries.values()) {
      dictionary.clear();
    }
    if (tagCompressionContext != null) {
      tagCompressionContext.clear();
    }
    if (valueCompressor != null) {
      valueCompressor.clear();
    }
  }

  public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
    if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
      String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
      if (compressionType != null) {
        return Compression.getCompressionAlgorithmByName(compressionType);
      }
      return Compression.Algorithm.GZ;
    }
    return Compression.Algorithm.NONE;
  }

}