001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations 015 * under the License. 016 */ 017package org.apache.hadoop.hbase.io.compress; 018 019import java.io.ByteArrayOutputStream; 020import java.io.IOException; 021import java.io.OutputStream; 022import java.util.Arrays; 023import java.util.zip.GZIPOutputStream; 024 025import org.apache.hadoop.hbase.util.JVM; 026import org.apache.hadoop.io.compress.CompressionOutputStream; 027import org.apache.hadoop.io.compress.CompressorStream; 028import org.apache.hadoop.io.compress.GzipCodec; 029import org.apache.hadoop.io.compress.zlib.ZlibFactory; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression 036 * streams. 037 */ 038@InterfaceAudience.Private 039public class ReusableStreamGzipCodec extends GzipCodec { 040 041 private static final Logger LOG = LoggerFactory.getLogger(Compression.class); 042 043 /** 044 * A bridge that wraps around a DeflaterOutputStream to make it a 045 * CompressionOutputStream. 046 */ 047 protected static class ReusableGzipOutputStream extends CompressorStream { 048 049 private static final int GZIP_HEADER_LENGTH = 10; 050 051 /** 052 * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for 053 * details. 054 */ 055 private static final byte[] GZIP_HEADER; 056 057 static { 058 // Capture the fixed ten-byte header hard-coded in GZIPOutputStream. 059 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 060 byte[] header = null; 061 GZIPOutputStream gzipStream = null; 062 try { 063 gzipStream = new GZIPOutputStream(baos); 064 gzipStream.finish(); 065 header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH); 066 } catch (IOException e) { 067 throw new RuntimeException("Could not create gzip stream", e); 068 } finally { 069 if (gzipStream != null) { 070 try { 071 gzipStream.close(); 072 } catch (IOException e) { 073 LOG.error(e.toString(), e); 074 } 075 } 076 } 077 GZIP_HEADER = header; 078 } 079 080 private static class ResetableGZIPOutputStream extends GZIPOutputStream { 081 082 private static final int TRAILER_SIZE = 8; 083 private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken(); 084 085 public ResetableGZIPOutputStream(OutputStream out) throws IOException { 086 super(out); 087 } 088 089 public void resetState() throws IOException { 090 def.reset(); 091 crc.reset(); 092 out.write(GZIP_HEADER); 093 } 094 095 /** 096 * Override because certain implementation calls def.end() which 097 * causes problem when resetting the stream for reuse. 098 */ 099 @Override 100 public void finish() throws IOException { 101 if (HAS_BROKEN_FINISH) { 102 if (!def.finished()) { 103 def.finish(); 104 while (!def.finished()) { 105 int i = def.deflate(this.buf, 0, this.buf.length); 106 if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) { 107 writeTrailer(this.buf, i); 108 i += TRAILER_SIZE; 109 out.write(this.buf, 0, i); 110 111 return; 112 } 113 if (i > 0) { 114 out.write(this.buf, 0, i); 115 } 116 } 117 118 byte[] arrayOfByte = new byte[TRAILER_SIZE]; 119 writeTrailer(arrayOfByte, 0); 120 out.write(arrayOfByte); 121 } 122 } else { 123 super.finish(); 124 } 125 } 126 127 /** re-implement because the relative method in jdk is invisible */ 128 private void writeTrailer(byte[] paramArrayOfByte, int paramInt) 129 throws IOException { 130 writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt); 131 writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4); 132 } 133 134 /** re-implement because the relative method in jdk is invisible */ 135 private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2) 136 throws IOException { 137 writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2); 138 writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2); 139 } 140 141 /** re-implement because the relative method in jdk is invisible */ 142 private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2) 143 throws IOException { 144 paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF); 145 paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF); 146 } 147 } 148 149 public ReusableGzipOutputStream(OutputStream out) throws IOException { 150 super(new ResetableGZIPOutputStream(out)); 151 } 152 153 @Override 154 public void close() throws IOException { 155 out.close(); 156 } 157 158 @Override 159 public void flush() throws IOException { 160 out.flush(); 161 } 162 163 @Override 164 public void write(int b) throws IOException { 165 out.write(b); 166 } 167 168 @Override 169 public void write(byte[] data, int offset, int length) throws IOException { 170 out.write(data, offset, length); 171 } 172 173 @Override 174 public void finish() throws IOException { 175 ((GZIPOutputStream) out).finish(); 176 } 177 178 @Override 179 public void resetState() throws IOException { 180 ((ResetableGZIPOutputStream) out).resetState(); 181 } 182 } 183 184 @Override 185 public CompressionOutputStream createOutputStream(OutputStream out) 186 throws IOException { 187 if (ZlibFactory.isNativeZlibLoaded(getConf())) { 188 return super.createOutputStream(out); 189 } 190 return new ReusableGzipOutputStream(out); 191 } 192 193}