001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.compress;
019
020import java.io.ByteArrayOutputStream;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.util.Arrays;
024import java.util.zip.GZIPOutputStream;
025import org.apache.hadoop.hbase.util.JVM;
026import org.apache.hadoop.io.compress.CompressionOutputStream;
027import org.apache.hadoop.io.compress.CompressorStream;
028import org.apache.hadoop.io.compress.GzipCodec;
029import org.apache.hadoop.io.compress.zlib.ZlibFactory;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression streams.
036 */
037@InterfaceAudience.Private
038public class ReusableStreamGzipCodec extends GzipCodec {
039
040  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
041
042  /**
043   * A bridge that wraps around a DeflaterOutputStream to make it a CompressionOutputStream.
044   */
045  protected static class ReusableGzipOutputStream extends CompressorStream {
046
047    private static final int GZIP_HEADER_LENGTH = 10;
048
049    /**
050     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for details.
051     */
052    private static final byte[] GZIP_HEADER;
053
054    static {
055      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
056      ByteArrayOutputStream baos = new ByteArrayOutputStream();
057      byte[] header = null;
058      GZIPOutputStream gzipStream = null;
059      try {
060        gzipStream = new GZIPOutputStream(baos);
061        gzipStream.finish();
062        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
063      } catch (IOException e) {
064        throw new RuntimeException("Could not create gzip stream", e);
065      } finally {
066        if (gzipStream != null) {
067          try {
068            gzipStream.close();
069          } catch (IOException e) {
070            LOG.error(e.toString(), e);
071          }
072        }
073      }
074      GZIP_HEADER = header;
075    }
076
077    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
078
079      private static final int TRAILER_SIZE = 8;
080      private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
081
082      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
083        super(out);
084      }
085
086      public void resetState() throws IOException {
087        def.reset();
088        crc.reset();
089        out.write(GZIP_HEADER);
090      }
091
092      /**
093       * Override because certain implementation calls def.end() which causes problem when resetting
094       * the stream for reuse.
095       */
096      @Override
097      public void finish() throws IOException {
098        if (HAS_BROKEN_FINISH) {
099          if (!def.finished()) {
100            def.finish();
101            while (!def.finished()) {
102              int i = def.deflate(this.buf, 0, this.buf.length);
103              if (def.finished() && (i <= this.buf.length - TRAILER_SIZE)) {
104                writeTrailer(this.buf, i);
105                i += TRAILER_SIZE;
106                out.write(this.buf, 0, i);
107
108                return;
109              }
110              if (i > 0) {
111                out.write(this.buf, 0, i);
112              }
113            }
114
115            byte[] arrayOfByte = new byte[TRAILER_SIZE];
116            writeTrailer(arrayOfByte, 0);
117            out.write(arrayOfByte);
118          }
119        } else {
120          super.finish();
121        }
122      }
123
124      /** re-implement because the relative method in jdk is invisible */
125      private void writeTrailer(byte[] paramArrayOfByte, int paramInt) throws IOException {
126        writeInt((int) this.crc.getValue(), paramArrayOfByte, paramInt);
127        writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
128      }
129
130      /** re-implement because the relative method in jdk is invisible */
131      private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
132        throws IOException {
133        writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
134        writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
135      }
136
137      /** re-implement because the relative method in jdk is invisible */
138      private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
139        throws IOException {
140        paramArrayOfByte[paramInt2] = (byte) (paramInt1 & 0xFF);
141        paramArrayOfByte[(paramInt2 + 1)] = (byte) (paramInt1 >> 8 & 0xFF);
142      }
143    }
144
145    public ReusableGzipOutputStream(OutputStream out) throws IOException {
146      super(new ResetableGZIPOutputStream(out));
147    }
148
149    @Override
150    public void close() throws IOException {
151      out.close();
152    }
153
154    @Override
155    public void flush() throws IOException {
156      out.flush();
157    }
158
159    @Override
160    public void write(int b) throws IOException {
161      out.write(b);
162    }
163
164    @Override
165    public void write(byte[] data, int offset, int length) throws IOException {
166      out.write(data, offset, length);
167    }
168
169    @Override
170    public void finish() throws IOException {
171      ((GZIPOutputStream) out).finish();
172    }
173
174    @Override
175    public void resetState() throws IOException {
176      ((ResetableGZIPOutputStream) out).resetState();
177    }
178  }
179
180  @Override
181  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
182    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
183      return super.createOutputStream(out);
184    }
185    return new ReusableGzipOutputStream(out);
186  }
187
188}