001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import java.io.ByteArrayOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.util.Arrays; 024import java.util.zip.GZIPOutputStream; 025import org.apache.hadoop.hbase.util.JVM; 026import org.apache.hadoop.io.compress.CompressionOutputStream; 027import org.apache.hadoop.io.compress.CompressorStream; 028import org.apache.hadoop.io.compress.GzipCodec; 029import org.apache.hadoop.io.compress.zlib.ZlibFactory; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression streams. 036 */ 037@InterfaceAudience.Private 038public class ReusableStreamGzipCodec extends GzipCodec { 039 040 private static final Logger LOG = LoggerFactory.getLogger(Compression.class); 041 042 /** 043 * A bridge that wraps around a DeflaterOutputStream to make it a CompressionOutputStream. 044 */ 045 protected static class ReusableGzipOutputStream extends CompressorStream { 046 047 private static final int GZIP_HEADER_LENGTH = 10; 048 049 /** 050 * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for details. 051 */ 052 private static final byte[] GZIP_HEADER; 053 054 static { 055 // Capture the fixed ten-byte header hard-coded in GZIPOutputStream. 056 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 057 byte[] header = null; 058 GZIPOutputStream gzipStream = null; 059 try { 060 gzipStream = new GZIPOutputStream(baos); 061 gzipStream.finish(); 062 header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH); 063 } catch (IOException e) { 064 throw new RuntimeException("Could not create gzip stream", e); 065 } finally { 066 if (gzipStream != null) { 067 try { 068 gzipStream.close(); 069 } catch (IOException e) { 070 LOG.error(e.toString(), e); 071 } 072 } 073 } 074 GZIP_HEADER = header; 075 } 076 077 private static class ResetableGZIPOutputStream extends GZIPOutputStream { 078 079 private static final int TRAILER_SIZE = 8; 080 private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken(); 081 082 public ResetableGZIPOutputStream(OutputStream out) throws IOException { 083 super(out); 084 } 085 086 public void resetState() throws IOException { 087 def.reset(); 088 crc.reset(); 089 out.write(GZIP_HEADER); 090 } 091 092 /** 093 * Override because certain implementation calls def.end() which causes problem when resetting 094 * the stream for reuse. 095 */ 096 @Override 097 public void finish() throws IOException { 098 if (HAS_BROKEN_FINISH) { 099 if (!def.finished()) { 100 def.finish(); 101 while (!def.finished()) { 102 int i = def.deflate(this.buf, 0, this.buf.length); 103 if (def.finished() && (i <= this.buf.length - TRAILER_SIZE)) { 104 writeTrailer(this.buf, i); 105 i += TRAILER_SIZE; 106 out.write(this.buf, 0, i); 107 108 return; 109 } 110 if (i > 0) { 111 out.write(this.buf, 0, i); 112 } 113 } 114 115 byte[] arrayOfByte = new byte[TRAILER_SIZE]; 116 writeTrailer(arrayOfByte, 0); 117 out.write(arrayOfByte); 118 } 119 } else { 120 super.finish(); 121 } 122 } 123 124 /** re-implement because the relative method in jdk is invisible */ 125 private void writeTrailer(byte[] paramArrayOfByte, int paramInt) throws IOException { 126 writeInt((int) this.crc.getValue(), paramArrayOfByte, paramInt); 127 writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4); 128 } 129 130 /** re-implement because the relative method in jdk is invisible */ 131 private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2) 132 throws IOException { 133 writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2); 134 writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2); 135 } 136 137 /** re-implement because the relative method in jdk is invisible */ 138 private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2) 139 throws IOException { 140 paramArrayOfByte[paramInt2] = (byte) (paramInt1 & 0xFF); 141 paramArrayOfByte[(paramInt2 + 1)] = (byte) (paramInt1 >> 8 & 0xFF); 142 } 143 } 144 145 public ReusableGzipOutputStream(OutputStream out) throws IOException { 146 super(new ResetableGZIPOutputStream(out)); 147 } 148 149 @Override 150 public void close() throws IOException { 151 out.close(); 152 } 153 154 @Override 155 public void flush() throws IOException { 156 out.flush(); 157 } 158 159 @Override 160 public void write(int b) throws IOException { 161 out.write(b); 162 } 163 164 @Override 165 public void write(byte[] data, int offset, int length) throws IOException { 166 out.write(data, offset, length); 167 } 168 169 @Override 170 public void finish() throws IOException { 171 ((GZIPOutputStream) out).finish(); 172 } 173 174 @Override 175 public void resetState() throws IOException { 176 ((ResetableGZIPOutputStream) out).resetState(); 177 } 178 } 179 180 @Override 181 public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { 182 if (ZlibFactory.isNativeZlibLoaded(getConf())) { 183 return super.createOutputStream(out); 184 } 185 return new ReusableGzipOutputStream(out); 186 } 187 188}