001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress.aircompressor;
019
020import io.airlift.compress.lzo.LzoCompressor;
021import io.airlift.compress.lzo.LzoDecompressor;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.CommonConfigurationKeys;
028import org.apache.hadoop.hbase.io.compress.CompressionUtil;
029import org.apache.hadoop.io.compress.BlockCompressorStream;
030import org.apache.hadoop.io.compress.BlockDecompressorStream;
031import org.apache.hadoop.io.compress.CompressionCodec;
032import org.apache.hadoop.io.compress.CompressionInputStream;
033import org.apache.hadoop.io.compress.CompressionOutputStream;
034import org.apache.hadoop.io.compress.Compressor;
035import org.apache.hadoop.io.compress.Decompressor;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * Hadoop Lzo codec implemented with aircompressor.
040 * <p>
041 * This is data format compatible with Hadoop's native lzo codec.
042 */
043@InterfaceAudience.Private
044public class LzoCodec implements Configurable, CompressionCodec {
045
046  public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";
047
048  private Configuration conf;
049
050  public LzoCodec() {
051    conf = new Configuration();
052  }
053
054  @Override
055  public Configuration getConf() {
056    return conf;
057  }
058
059  @Override
060  public void setConf(Configuration conf) {
061    this.conf = conf;
062  }
063
064  @Override
065  public Compressor createCompressor() {
066    return new HadoopLzoCompressor();
067  }
068
069  @Override
070  public Decompressor createDecompressor() {
071    return new HadoopLzoDecompressor();
072  }
073
074  @Override
075  public CompressionInputStream createInputStream(InputStream in) throws IOException {
076    return createInputStream(in, createDecompressor());
077  }
078
079  @Override
080  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
081    throws IOException {
082    return new BlockDecompressorStream(in, d, getBufferSize(conf));
083  }
084
085  @Override
086  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
087    return createOutputStream(out, createCompressor());
088  }
089
090  @Override
091  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
092    throws IOException {
093    int bufferSize = getBufferSize(conf);
094    return new BlockCompressorStream(out, c, bufferSize,
095      CompressionUtil.compressionOverhead(bufferSize));
096  }
097
098  @Override
099  public Class<? extends Compressor> getCompressorType() {
100    return HadoopLzoCompressor.class;
101  }
102
103  @Override
104  public Class<? extends Decompressor> getDecompressorType() {
105    return HadoopLzoDecompressor.class;
106  }
107
108  @Override
109  public String getDefaultExtension() {
110    return ".lzo";
111  }
112
113  @InterfaceAudience.Private
114  public class HadoopLzoCompressor extends HadoopCompressor<LzoCompressor> {
115
116    HadoopLzoCompressor(LzoCompressor compressor) {
117      super(compressor, LzoCodec.getBufferSize(conf));
118    }
119
120    HadoopLzoCompressor() {
121      this(new LzoCompressor());
122    }
123
124    @Override
125    int getLevel(Configuration conf) {
126      return 0;
127    }
128
129    @Override
130    int getBufferSize(Configuration conf) {
131      return LzoCodec.getBufferSize(conf);
132    }
133
134  }
135
136  @InterfaceAudience.Private
137  public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> {
138
139    HadoopLzoDecompressor(LzoDecompressor decompressor) {
140      super(decompressor, getBufferSize(conf));
141    }
142
143    HadoopLzoDecompressor() {
144      this(new LzoDecompressor());
145    }
146
147  }
148
149  // Package private
150
151  static int getBufferSize(Configuration conf) {
152    return conf.getInt(LZO_BUFFER_SIZE_KEY,
153      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
154        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
155  }
156
157}