001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.aircompressor; 019 020import io.airlift.compress.snappy.SnappyCompressor; 021import io.airlift.compress.snappy.SnappyDecompressor; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.CommonConfigurationKeys; 028import org.apache.hadoop.hbase.io.compress.CompressionUtil; 029import org.apache.hadoop.io.compress.BlockCompressorStream; 030import org.apache.hadoop.io.compress.BlockDecompressorStream; 031import org.apache.hadoop.io.compress.CompressionCodec; 032import org.apache.hadoop.io.compress.CompressionInputStream; 033import org.apache.hadoop.io.compress.CompressionOutputStream; 034import org.apache.hadoop.io.compress.Compressor; 035import org.apache.hadoop.io.compress.Decompressor; 036import org.apache.yetus.audience.InterfaceAudience; 037 038/** 039 * Hadoop snappy codec implemented with aircompressor. 040 * <p> 041 * This is data format compatible with Hadoop's native snappy codec. 042 */ 043@InterfaceAudience.Private 044public class SnappyCodec implements Configurable, CompressionCodec { 045 046 public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize"; 047 048 private Configuration conf; 049 050 public SnappyCodec() { 051 conf = new Configuration(); 052 } 053 054 @Override 055 public Configuration getConf() { 056 return conf; 057 } 058 059 @Override 060 public void setConf(Configuration conf) { 061 this.conf = conf; 062 } 063 064 @Override 065 public Compressor createCompressor() { 066 return new HadoopSnappyCompressor(); 067 } 068 069 @Override 070 public Decompressor createDecompressor() { 071 return new HadoopSnappyDecompressor(); 072 } 073 074 @Override 075 public CompressionInputStream createInputStream(InputStream in) throws IOException { 076 return createInputStream(in, createDecompressor()); 077 } 078 079 @Override 080 public CompressionInputStream createInputStream(InputStream in, Decompressor d) 081 throws IOException { 082 return new BlockDecompressorStream(in, d, getBufferSize(conf)); 083 } 084 085 @Override 086 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 087 return createOutputStream(out, createCompressor()); 088 } 089 090 @Override 091 public CompressionOutputStream createOutputStream(OutputStream out, Compressor c) 092 throws IOException { 093 int bufferSize = getBufferSize(conf); 094 return new BlockCompressorStream(out, c, bufferSize, 095 CompressionUtil.compressionOverhead(bufferSize)); 096 } 097 098 @Override 099 public Class<? extends Compressor> getCompressorType() { 100 return HadoopSnappyCompressor.class; 101 } 102 103 @Override 104 public Class<? extends Decompressor> getDecompressorType() { 105 return HadoopSnappyDecompressor.class; 106 } 107 108 @Override 109 public String getDefaultExtension() { 110 return ".snappy"; 111 } 112 113 @InterfaceAudience.Private 114 public class HadoopSnappyCompressor extends HadoopCompressor<SnappyCompressor> { 115 116 HadoopSnappyCompressor(SnappyCompressor compressor) { 117 super(compressor, SnappyCodec.getBufferSize(conf)); 118 } 119 120 HadoopSnappyCompressor() { 121 this(new SnappyCompressor()); 122 } 123 124 @Override 125 int getLevel(Configuration conf) { 126 return 0; 127 } 128 129 @Override 130 int getBufferSize(Configuration conf) { 131 return SnappyCodec.getBufferSize(conf); 132 } 133 134 } 135 136 @InterfaceAudience.Private 137 public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompressor> { 138 139 HadoopSnappyDecompressor(SnappyDecompressor decompressor) { 140 super(decompressor, getBufferSize(conf)); 141 } 142 143 HadoopSnappyDecompressor() { 144 this(new SnappyDecompressor()); 145 } 146 147 } 148 149 // Package private 150 151 static int getBufferSize(Configuration conf) { 152 return conf.getInt(SNAPPY_BUFFER_SIZE_KEY, 153 conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY, 154 CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT)); 155 } 156 157}