/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context that holds the various dictionaries for compression in WAL.
 * <p>
 * CompressionContexts are not expected to be shared among threads. Multithreaded use may produce
 * unexpected results.
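 * <p>
 * A rough usage sketch, for illustration only ({@code LRUDictionary} is typically the dictionary
 * implementation supplied by the WAL codec; the flag values below are arbitrary):
 *
 * <pre>{@code
 * CompressionContext ctx = new CompressionContext(LRUDictionary.class,
 *   false, // not replaying recovered edits
 *   true,  // compress tags
 *   true,  // compress values
 *   Compression.Algorithm.GZ);
 * byte[] compressed = ctx.getValueCompressor().compress(value, 0, value.length);
 * }</pre>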
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
public class CompressionContext {

  private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class);

  public static final String ENABLE_WAL_TAGS_COMPRESSION =
    "hbase.regionserver.wal.tags.enablecompression";

  public static final String ENABLE_WAL_VALUE_COMPRESSION =
    "hbase.regionserver.wal.value.enablecompression";

  public static final String WAL_VALUE_COMPRESSION_TYPE =
    "hbase.regionserver.wal.value.compression.type";

  public enum DictionaryIndex {
    REGION,
    TABLE,
    FAMILY,
    QUALIFIER,
    ROW
  }

  /**
   * Encapsulates the compression algorithm and its streams that we will use for value compression
   * in this WAL.
   */
  static class ValueCompressor {

    static final int IO_BUFFER_SIZE = 64 * 1024; // a bigger buffer improves compression of large edits

    private final Compression.Algorithm algorithm;
    private Compressor compressor;
    private Decompressor decompressor;
    private BoundedDelegatingInputStream lowerIn;
    private ByteArrayOutputStream lowerOut;
    private InputStream compressedIn;
    private OutputStream compressedOut;

    public ValueCompressor(Compression.Algorithm algorithm) {
      this.algorithm = algorithm;
    }

    public Compression.Algorithm getAlgorithm() {
      return algorithm;
    }

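    /**
     * Compresses a single value. The compressor and output stream are created lazily on the first
     * call and then reused, so successive values are compressed as one continuous stream.
     * @param valueArray  array containing the value to compress
     * @param valueOffset offset of the value within the array
     * @param valueLength length of the value in bytes
     * @return the compressed bytes produced for this value
     */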
    public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) throws IOException {
      if (compressedOut == null) {
        // Create the output streams here the first time around.
        lowerOut = new ByteArrayOutputStream();
        if (compressor == null) {
          compressor = algorithm.getCompressor();
        }
        compressedOut = algorithm.createCompressionStream(lowerOut, compressor, IO_BUFFER_SIZE);
      }
      compressedOut.write(valueArray, valueOffset, valueLength);
      compressedOut.flush();
      final byte[] compressed = lowerOut.toByteArray();
      lowerOut.reset(); // Reset now to minimize the overhead of keeping around the BAOS
      return compressed;
    }

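    /**
     * Decompresses a single value from the next {@code inLength} bytes of {@code in} into the
     * given array. The decompressor and input stream are created lazily on the first call; on
     * subsequent calls only the bounded delegate is repointed at the new input segment.
     * @param in        stream positioned at the compressed value
     * @param inLength  number of compressed bytes that belong to this value
     * @param outArray  destination for the uncompressed bytes
     * @param outOffset offset at which to start writing
     * @param outLength expected number of uncompressed bytes
     * @return the number of uncompressed bytes actually read, which may be short
     */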
    public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
      int outLength) throws IOException {

      // Our input is a sequence of bounded byte ranges (call them segments), with
      // BoundedDelegatingInputStream providing a way to switch in a new segment when the
      // previous segment has been fully consumed.

      // Create the input streams here the first time around.
      if (compressedIn == null) {
        lowerIn = new BoundedDelegatingInputStream(in, inLength);
        if (decompressor == null) {
          decompressor = algorithm.getDecompressor();
        }
        compressedIn = algorithm.createDecompressionStream(lowerIn, decompressor, IO_BUFFER_SIZE);
      } else {
        lowerIn.setDelegate(in, inLength);
      }

      // The caller must be prepared to handle short reads. With the current Hadoop compression
      // codecs all 'outLength' bytes are read here, so it is not an issue for now.
      return compressedIn.read(outArray, outOffset, outLength);
    }

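    /**
     * Closes and discards the compression and decompression streams and resets the underlying
     * codec instances, returning this compressor to its initial state so it can be reused.
     */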
    public void clear() {
      if (compressedOut != null) {
        try {
          compressedOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed output stream", e);
        }
      }
      compressedOut = null;
      if (lowerOut != null) {
        try {
          lowerOut.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower output stream", e);
        }
      }
      lowerOut = null;
      if (compressedIn != null) {
        try {
          compressedIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing compressed input stream", e);
        }
      }
      compressedIn = null;
      if (lowerIn != null) {
        try {
          lowerIn.close();
        } catch (IOException e) {
          LOG.warn("Exception closing lower input stream", e);
        }
      }
      lowerIn = null;
      if (compressor != null) {
        compressor.reset();
      }
      if (decompressor != null) {
        decompressor.reset();
      }
    }

  }

  private final Map<DictionaryIndex, Dictionary> dictionaries =
    new EnumMap<>(DictionaryIndex.class);
  // Context used for compressing tags
  TagCompressionContext tagCompressionContext = null;
  // Compressor used for values; null unless value compression is enabled
  ValueCompressor valueCompressor = null;

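  /**
   * @param dictType             dictionary implementation to instantiate for each
   *                             {@link DictionaryIndex}
   * @param recoveredEdits       if true, size the region and table name dictionaries for a single
   *                             entry, since recovered edits apply to one region only
   * @param hasTagCompression    whether to create a {@link TagCompressionContext} for tags
   * @param hasValueCompression  whether to create a {@link ValueCompressor} for values
   * @param valueCompressionType algorithm to use for value compression; a non-null value is
   *                             required for value compression to take effect
   */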
  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression, boolean hasValueCompression,
    Compression.Algorithm valueCompressionType) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    Constructor<? extends Dictionary> dictConstructor = dictType.getConstructor();
    for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
      Dictionary newDictionary = dictConstructor.newInstance();
      dictionaries.put(dictionaryIndex, newDictionary);
    }
    if (recoveredEdits) {
      getDictionary(DictionaryIndex.REGION).init(1);
      getDictionary(DictionaryIndex.TABLE).init(1);
    } else {
      getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
      getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
    }

    getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
    getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
    getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);

    if (hasTagCompression) {
      tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
    }
    if (hasValueCompression && valueCompressionType != null) {
      valueCompressor = new ValueCompressor(valueCompressionType);
    }
  }

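  /**
   * Convenience constructor for a context without value compression.
   */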
  public CompressionContext(Class<? extends Dictionary> dictType, boolean recoveredEdits,
    boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
    InstantiationException, IllegalAccessException, InvocationTargetException, IOException {
    this(dictType, recoveredEdits, hasTagCompression, false, null);
  }

  public boolean hasTagCompression() {
    return tagCompressionContext != null;
  }

  public boolean hasValueCompression() {
    return valueCompressor != null;
  }

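  /**
   * Returns the dictionary registered for the given {@link DictionaryIndex}, or null if there is
   * no dictionary for that index.
   */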
  public Dictionary getDictionary(Enum dictIndex) {
    return dictionaries.get(dictIndex);
  }

  public ValueCompressor getValueCompressor() {
    return valueCompressor;
  }

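  /**
   * Resets the dictionaries, the tag compression context, and the value compressor, if present,
   * back to their initial state.
   */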
  void clear() {
    for (Dictionary dictionary : dictionaries.values()) {
      dictionary.clear();
    }
    if (tagCompressionContext != null) {
      tagCompressionContext.clear();
    }
    if (valueCompressor != null) {
      valueCompressor.clear();
    }
  }

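  /**
   * Resolves the WAL value compression algorithm from the configuration: when value compression
   * is enabled, returns the algorithm named by {@value #WAL_VALUE_COMPRESSION_TYPE} (GZ if the
   * type is unset); otherwise returns {@link Compression.Algorithm#NONE}. For example, assuming
   * the Snappy codec is available on the cluster:
   *
   * <pre>
   * hbase.regionserver.wal.value.enablecompression = true
   * hbase.regionserver.wal.value.compression.type = snappy
   * </pre>
   */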
  public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) {
    if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) {
      String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE);
      if (compressionType != null) {
        return Compression.getCompressionAlgorithmByName(compressionType);
      }
      return Compression.Algorithm.GZ;
    }
    return Compression.Algorithm.NONE;
  }

}