001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.xerial;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.OutputStream;
023import org.apache.hadoop.conf.Configurable;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.CommonConfigurationKeys;
026import org.apache.hadoop.io.compress.BlockCompressorStream;
027import org.apache.hadoop.io.compress.BlockDecompressorStream;
028import org.apache.hadoop.io.compress.CompressionCodec;
029import org.apache.hadoop.io.compress.CompressionInputStream;
030import org.apache.hadoop.io.compress.CompressionOutputStream;
031import org.apache.hadoop.io.compress.Compressor;
032import org.apache.hadoop.io.compress.Decompressor;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.xerial.snappy.Snappy;
037
038/**
039 * Hadoop Snappy codec implemented with Xerial Snappy.
040 * <p>
041 * This is data format compatible with Hadoop's native snappy codec.
042 */
043@InterfaceAudience.Private
044public class SnappyCodec implements Configurable, CompressionCodec {
045
046  public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
047
048  private static final Logger LOG = LoggerFactory.getLogger(SnappyCodec.class);
049  private Configuration conf;
050  private int bufferSize;
051  private static boolean loaded = false;
052  private static Throwable loadError;
053
054  static {
055    try {
056      Snappy.getNativeLibraryVersion();
057      loaded = true;
058    } catch (Throwable t) {
059      loadError = t;
060      LOG.error("The Snappy native libraries could not be loaded", t);
061    }
062  }
063
064  /** Return true if the native shared libraries were loaded; false otherwise. */
065  public static boolean isLoaded() {
066    return loaded;
067  }
068
069  public SnappyCodec() {
070    if (!isLoaded()) {
071      throw new RuntimeException("Snappy codec could not be loaded", loadError);
072    }
073    conf = new Configuration();
074    bufferSize = getBufferSize(conf);
075  }
076
077  @Override
078  public Configuration getConf() {
079    return conf;
080  }
081
082  @Override
083  public void setConf(Configuration conf) {
084    this.conf = conf;
085    this.bufferSize = getBufferSize(conf);
086  }
087
088  @Override
089  public Compressor createCompressor() {
090    return new SnappyCompressor(bufferSize);
091  }
092
093  @Override
094  public Decompressor createDecompressor() {
095    return new SnappyDecompressor(bufferSize);
096  }
097
098  @Override
099  public CompressionInputStream createInputStream(InputStream in) throws IOException {
100    return createInputStream(in, createDecompressor());
101  }
102
103  @Override
104  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
105    throws IOException {
106    return new BlockDecompressorStream(in, d, bufferSize);
107  }
108
109  @Override
110  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
111    return createOutputStream(out, createCompressor());
112  }
113
114  @Override
115  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
116    throws IOException {
117    return new BlockCompressorStream(out, c, bufferSize,
118      Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
119  }
120
121  @Override
122  public Class<? extends Compressor> getCompressorType() {
123    return SnappyCompressor.class;
124  }
125
126  @Override
127  public Class<? extends Decompressor> getDecompressorType() {
128    return SnappyDecompressor.class;
129  }
130
131  @Override
132  public String getDefaultExtension() {
133    return ".snappy";
134  }
135
136  // Package private
137
138  static int getBufferSize(Configuration conf) {
139    return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
140      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
141        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
142  }
143
144}