001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.aircompressor;
019
020import io.airlift.compress.snappy.SnappyCompressor;
021import io.airlift.compress.snappy.SnappyDecompressor;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.CommonConfigurationKeys;
028import org.apache.hadoop.hbase.io.compress.CompressionUtil;
029import org.apache.hadoop.io.compress.BlockCompressorStream;
030import org.apache.hadoop.io.compress.BlockDecompressorStream;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.CompressionInputStream;
033import org.apache.hadoop.io.compress.CompressionOutputStream;
034import org.apache.hadoop.io.compress.Compressor;
035import org.apache.hadoop.io.compress.Decompressor;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * Hadoop snappy codec implemented with aircompressor.
040 * <p>
041 * This is data format compatible with Hadoop's native snappy codec.
042 */
043@InterfaceAudience.Private
044public class SnappyCodec implements Configurable, CompressionCodec {
045
046  public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
047
048  private Configuration conf;
049  private int bufferSize;
050
051  public SnappyCodec() {
052    conf = new Configuration();
053    bufferSize = getBufferSize(conf);
054  }
055
056  @Override
057  public Configuration getConf() {
058    return conf;
059  }
060
061  @Override
062  public void setConf(Configuration conf) {
063    this.conf = conf;
064    this.bufferSize = getBufferSize(conf);
065  }
066
067  @Override
068  public Compressor createCompressor() {
069    return new HadoopSnappyCompressor();
070  }
071
072  @Override
073  public Decompressor createDecompressor() {
074    return new HadoopSnappyDecompressor();
075  }
076
077  @Override
078  public CompressionInputStream createInputStream(InputStream in) throws IOException {
079    return createInputStream(in, createDecompressor());
080  }
081
082  @Override
083  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
084    throws IOException {
085    return new BlockDecompressorStream(in, d, bufferSize);
086  }
087
088  @Override
089  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
090    return createOutputStream(out, createCompressor());
091  }
092
093  @Override
094  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
095    throws IOException {
096    return new BlockCompressorStream(out, c, bufferSize,
097      CompressionUtil.compressionOverhead(bufferSize));
098  }
099
100  @Override
101  public Class<? extends Compressor> getCompressorType() {
102    return HadoopSnappyCompressor.class;
103  }
104
105  @Override
106  public Class<? extends Decompressor> getDecompressorType() {
107    return HadoopSnappyDecompressor.class;
108  }
109
110  @Override
111  public String getDefaultExtension() {
112    return ".snappy";
113  }
114
115  @InterfaceAudience.Private
116  public class HadoopSnappyCompressor extends HadoopCompressor<SnappyCompressor> {
117
118    HadoopSnappyCompressor(SnappyCompressor compressor) {
119      super(compressor, SnappyCodec.getBufferSize(conf));
120    }
121
122    HadoopSnappyCompressor() {
123      this(new SnappyCompressor());
124    }
125
126    @Override
127    int getLevel(Configuration conf) {
128      return 0;
129    }
130
131    @Override
132    int getBufferSize(Configuration conf) {
133      return SnappyCodec.getBufferSize(conf);
134    }
135
136  }
137
138  @InterfaceAudience.Private
139  public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompressor> {
140
141    HadoopSnappyDecompressor(SnappyDecompressor decompressor) {
142      super(decompressor, getBufferSize(conf));
143    }
144
145    HadoopSnappyDecompressor() {
146      this(new SnappyDecompressor());
147    }
148
149  }
150
151  // Package private
152
153  static int getBufferSize(Configuration conf) {
154    return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
155      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
156        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
157  }
158
159}