001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.aircompressor; 019 020import io.airlift.compress.lzo.LzoCompressor; 021import io.airlift.compress.lzo.LzoDecompressor; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.CommonConfigurationKeys; 028import org.apache.hadoop.hbase.io.compress.CompressionUtil; 029import org.apache.hadoop.io.compress.BlockCompressorStream; 030import org.apache.hadoop.io.compress.BlockDecompressorStream; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.CompressionInputStream; 033import org.apache.hadoop.io.compress.CompressionOutputStream; 034import org.apache.hadoop.io.compress.Compressor; 035import org.apache.hadoop.io.compress.Decompressor; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Hadoop Lzo codec implemented with aircompressor. 040 * <p> 041 * This is data format compatible with Hadoop's native lzo codec. 042 */ 043@InterfaceAudience.Private 044public class LzoCodec implements Configurable, CompressionCodec { 045 046 public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize"; 047 048 private Configuration conf; 049 private int bufferSize; 050 051 public LzoCodec() { 052 conf = new Configuration(); 053 bufferSize = getBufferSize(conf); 054 } 055 056 @Override 057 public Configuration getConf() { 058 return conf; 059 } 060 061 @Override 062 public void setConf(Configuration conf) { 063 this.conf = conf; 064 this.bufferSize = getBufferSize(conf); 065 } 066 067 @Override 068 public Compressor createCompressor() { 069 return new HadoopLzoCompressor(); 070 } 071 072 @Override 073 public Decompressor createDecompressor() { 074 return new HadoopLzoDecompressor(); 075 } 076 077 @Override 078 public CompressionInputStream createInputStream(InputStream in) throws IOException { 079 return createInputStream(in, createDecompressor()); 080 } 081 082 @Override 083 public CompressionInputStream createInputStream(InputStream in, Decompressor d) 084 throws IOException { 085 return new BlockDecompressorStream(in, d, bufferSize); 086 } 087 088 @Override 089 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 090 return createOutputStream(out, createCompressor()); 091 } 092 093 @Override 094 public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) 095 throws IOException { 096 return new BlockCompressorStream(out, c, bufferSize, 097 CompressionUtil.compressionOverhead(bufferSize)); 098 } 099 100 @Override 101 public Class<? extends Compressor> getCompressorType() { 102 return HadoopLzoCompressor.class; 103 } 104 105 @Override 106 public Class<? extends Decompressor> getDecompressorType() { 107 return HadoopLzoDecompressor.class; 108 } 109 110 @Override 111 public String getDefaultExtension() { 112 return ".lzo"; 113 } 114 115 @InterfaceAudience.Private 116 public class HadoopLzoCompressor extends HadoopCompressor<LzoCompressor> { 117 118 HadoopLzoCompressor(LzoCompressor compressor) { 119 super(compressor, LzoCodec.getBufferSize(conf)); 120 } 121 122 HadoopLzoCompressor() { 123 this(new LzoCompressor()); 124 } 125 126 @Override 127 int getLevel(Configuration conf) { 128 return 0; 129 } 130 131 @Override 132 int getBufferSize(Configuration conf) { 133 return LzoCodec.getBufferSize(conf); 134 } 135 136 } 137 138 @InterfaceAudience.Private 139 public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> { 140 141 HadoopLzoDecompressor(LzoDecompressor decompressor) { 142 super(decompressor, getBufferSize(conf)); 143 } 144 145 HadoopLzoDecompressor() { 146 this(new LzoDecompressor()); 147 } 148 149 } 150 151 // Package private 152 153 static int getBufferSize(Configuration conf) { 154 return conf.getInt(LZO_BUFFER_SIZE_KEY, 155 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, 156 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT)); 157 } 158 159}