/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context that holds the various dictionaries for compression in WAL.
 * <p>
 * CompressionContexts are not expected to be shared among threads. Multithreaded use may produce
 * unexpected results.
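 * <p>
 * A minimal construction sketch (illustrative only, not part of the original docs; it assumes an
 * LRU-backed {@link Dictionary} implementation with a no-argument constructor, such as
 * {@code LRUDictionary}, is available):
 *
 * <pre>
 * CompressionContext ctx = new CompressionContext(LRUDictionary.class,
 *   false, // not reading recovered edits
 *   true, // enable tag compression
 *   true, // enable value compression
 *   Compression.Algorithm.GZ);
 * </pre>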
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
public class CompressionContext {

  private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);

  public static final String ENABLE_WAL_TAGS_COMPRESSION =
    "hbase.regionserver.wal.tags.enablecompression";

  public static final String ENABLE_WAL_VALUE_COMPRESSION =
    "hbase.regionserver.wal.value.enablecompression";

  public static final String WAL_VALUE_COMPRESSION_TYPE =
    "hbase.regionserver.wal.value.compression.type";

  public enum DictionaryIndex {
    REGION,
    TABLE,
    FAMILY,
    QUALIFIER,
    ROW
  }

  /**
   * Encapsulates the compression algorithm and its streams that we will use for value compression
   * in this WAL.
   */
  static class ValueCompressor {

    static final int IO_BUFFER_SIZE = 64 * 1024; // bigger buffer improves large edit compress ratio

    private final Compression.Algorithm algorithm;
    private Compressor compressor;
    private Decompressor decompressor;
    private WALDecompressionBoundedDelegatingInputStream lowerIn;
    private ByteArrayOutputStream lowerOut;
    private InputStream compressedIn;
    private OutputStream compressedOut;

    public ValueCompressor(Compression.Algorithm algorithm) {
      this.algorithm = algorithm;
    }

    public Compression.Algorithm getAlgorithm() {
      return algorithm;
    }

    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) throws IOException {
      if (compressedOut == null) {
        // Create the output streams here the first time around.
        lowerOut = new ByteArrayOutputStream();
        if (compressor == null) {
          compressor = algorithm.getCompressor();
        }
        compressedOut = algorithm.createCompressionStream(lowerOut, compressor, IO_BUFFER_SIZE);
      }
      compressedOut.write(valueArray, valueOffset, valueLength);
      compressedOut.flush();
      final byte[] compressed = lowerOut.toByteArray();
      lowerOut.reset(); // Reset now to minimize the overhead of keeping around the BAOS
      return compressed;
    }

    public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
      int outLength) throws IOException {
      // Our input is a sequence of bounded byte ranges (call them segments), with
      // WALDecompressionBoundedDelegatingInputStream providing a way to switch in a new segment
      // when the previous segment has been fully consumed.

      // Create the input streams here the first time around.
      if (compressedIn == null) {
        lowerIn = new WALDecompressionBoundedDelegatingInputStream();
        if (decompressor == null) {
          decompressor = algorithm.getDecompressor();
        }
        compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
      }
      if (outLength == 0) {
        // The BufferedInputStream will return early and skip reading anything if outLength == 0,
        // but for an empty value the compressed output still contains some metadata, so the
        // compressed size is not 0. We therefore need to manually skip inLength bytes here,
        // otherwise the next read on this stream will start from an invalid position and cause
        // critical problems, such as data loss when splitting or replicating the WAL.
        IOUtils.skipFully(in, inLength);
      } else {
        lowerIn.reset(in, inLength);
        IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
        // If the uncompressed size was larger than the configured buffer size for the codec,
        // the BlockCompressorStream will have left an extra 4 bytes hanging. This represents a
        // size for the next segment, and it should be 0. See HBASE-28390.
        if (lowerIn.available() == 4) {
          int remaining = rawReadInt(lowerIn);
          assert remaining == 0;
        }
      }
    }

    private int rawReadInt(InputStream in) throws IOException {
      int b1 = in.read();
      int b2 = in.read();
      int b3 = in.read();
      int b4 = in.read();
      if ((b1 | b2 | b3 | b4) < 0) throw new EOFException();
      return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
    }

    public void clear() {
      if (compressedOut != null) {
        try {
          compressedOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed output stream", e);
        }
      }
      compressedOut = null;
      if (lowerOut != null) {
        try {
          lowerOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower output stream", e);
        }
      }
      lowerOut = null;
      if (compressedIn != null) {
        try {
          compressedIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed input stream", e);
        }
      }
      compressedIn = null;
      if (lowerIn != null) {
        try {
          lowerIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower input stream", e);
        }
      }
      lowerIn = null;
      if (compressor != null) {
        compressor.reset();
      }
      if (decompressor != null) {
        decompressor.reset();
      }
    }

  }
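
  /*
   * Illustrative pairing of the ValueCompressor operations (a sketch, not part of the original
   * sources; "ctx" and "value" are placeholders): the writer compresses each value into one
   * bounded segment and persists the segment length together with the uncompressed value length;
   * the reader replays both lengths when decompressing. For a single value on a fresh context,
   * roughly:
   *
   *   byte[] segment = ctx.getValueCompressor().compress(value, 0, value.length);
   *   // ... segment.length and value.length are written out alongside the segment bytes ...
   *   byte[] restored = new byte[value.length];
   *   ctx.getValueCompressor().decompress(new ByteArrayInputStream(segment), segment.length,
   *     restored, 0, restored.length);
   */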

  private final Map<DictionaryIndex, Dictionary> dictionaries =
    new EnumMap<>(DictionaryIndex.class);
  // Context used for compressing tags
  TagCompressionContext tagCompressionContext = null;
  ValueCompressor valueCompressor = null;

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression, boolean hasValueCompression,
    Compression.Algorithm valueCompressionType) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
      Dictionary newDictionary = dictConstructor.newInstance();
      dictionaries.put(dictionaryIndex, newDictionary);
    }
    if (recoveredEdits) {
      getDictionary(DictionaryIndex.REGION).init(1);
      getDictionary(DictionaryIndex.TABLE).init(1);
    } else {
      getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
      getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
    }

    getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
    getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
    getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);

    if (hasTagCompression) {
      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
    }
    if (hasValueCompression && valueCompressionType != null) {
      valueCompressor = new ValueCompressor(valueCompressionType);
    }
  }

  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    this(dictType, recoveredEdits, hasTagCompression, false, null);
  }

  public boolean hasTagCompression() {
    return tagCompressionContext != null;
  }

  public boolean hasValueCompression() {
    return valueCompressor != null;
  }

  public Dictionary getDictionary(Enum dictIndex) {
    return dictionaries.get(dictIndex);
  }

  public ValueCompressor getValueCompressor() {
    return valueCompressor;
  }

  void clear() {
    for (Dictionary dictionary : dictionaries.values()) {
      dictionary.clear();
    }
    if (tagCompressionContext != null) {
      tagCompressionContext.clear();
    }
    if (valueCompressor != null) {
      valueCompressor.clear();
    }
  }

  public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
    if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
      String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
      if (compressionType != null) {
        return Compression.getCompressionAlgorithmByName(compressionType);
      }
      return Compression.Algorithm.GZ;
    }
    return Compression.Algorithm.NONE;
  }

}