001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations
015 * under the License.
016 */
017package org.apache.hadoop.hbase.io.compress;
018
019import java.io.ByteArrayOutputStream;
020import java.io.IOException;
021import java.io.OutputStream;
022import java.util.Arrays;
023import java.util.zip.GZIPOutputStream;
024
025import org.apache.hadoop.hbase.util.JVM;
026import org.apache.hadoop.io.compress.CompressionOutputStream;
027import org.apache.hadoop.io.compress.CompressorStream;
028import org.apache.hadoop.io.compress.GzipCodec;
029import org.apache.hadoop.io.compress.zlib.ZlibFactory;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression
036 * streams.
037 */
038@InterfaceAudience.Private
039public class ReusableStreamGzipCodec extends GzipCodec {
040
041  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
042
043  /**
044   * A bridge that wraps around a DeflaterOutputStream to make it a
045   * CompressionOutputStream.
046   */
047  protected static class ReusableGzipOutputStream extends CompressorStream {
048
049    private static final int GZIP_HEADER_LENGTH = 10;
050
051    /**
052     * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
053     * details.
054     */
055    private static final byte[] GZIP_HEADER;
056
057    static {
058      // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
059      ByteArrayOutputStream baos = new ByteArrayOutputStream();
060      byte[] header = null;
061      GZIPOutputStream gzipStream = null;
062      try {
063        gzipStream  = new GZIPOutputStream(baos);
064        gzipStream.finish();
065        header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
066      } catch (IOException e) {
067        throw new RuntimeException("Could not create gzip stream", e);
068      } finally {
069        if (gzipStream != null) {
070          try {
071            gzipStream.close();
072          } catch (IOException e) {
073            LOG.error(e.toString(), e);
074          }
075        }
076      }
077      GZIP_HEADER = header;
078    }
079
080    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
081
082      private static final int TRAILER_SIZE = 8;
083      private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
084
085      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
086        super(out);
087      }
088
089      public void resetState() throws IOException {
090        def.reset();
091        crc.reset();
092        out.write(GZIP_HEADER);
093      }
094
095      /**
096       * Override because certain implementation calls def.end() which
097       * causes problem when resetting the stream for reuse.
098       */
099      @Override
100      public void finish() throws IOException {
101        if (HAS_BROKEN_FINISH) {
102          if (!def.finished()) {
103            def.finish();
104            while (!def.finished()) {
105              int i = def.deflate(this.buf, 0, this.buf.length);
106              if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
107                writeTrailer(this.buf, i);
108                i += TRAILER_SIZE;
109                out.write(this.buf, 0, i);
110
111                return;
112              }
113              if (i > 0) {
114                out.write(this.buf, 0, i);
115              }
116            }
117
118            byte[] arrayOfByte = new byte[TRAILER_SIZE];
119            writeTrailer(arrayOfByte, 0);
120            out.write(arrayOfByte);
121          }
122        } else {
123          super.finish();
124        }
125      }
126
127      /** re-implement because the relative method in jdk is invisible */
128      private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
129          throws IOException {
130        writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
131        writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
132      }
133
134      /** re-implement because the relative method in jdk is invisible */
135      private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
136          throws IOException {
137        writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
138        writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
139      }
140
141      /** re-implement because the relative method in jdk is invisible */
142      private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
143          throws IOException {
144        paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
145        paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
146      }
147    }
148
149    public ReusableGzipOutputStream(OutputStream out) throws IOException {
150      super(new ResetableGZIPOutputStream(out));
151    }
152
153    @Override
154    public void close() throws IOException {
155      out.close();
156    }
157
158    @Override
159    public void flush() throws IOException {
160      out.flush();
161    }
162
163    @Override
164    public void write(int b) throws IOException {
165      out.write(b);
166    }
167
168    @Override
169    public void write(byte[] data, int offset, int length) throws IOException {
170      out.write(data, offset, length);
171    }
172
173    @Override
174    public void finish() throws IOException {
175      ((GZIPOutputStream) out).finish();
176    }
177
178    @Override
179    public void resetState() throws IOException {
180      ((ResetableGZIPOutputStream) out).resetState();
181    }
182  }
183
184  @Override
185  public CompressionOutputStream createOutputStream(OutputStream out)
186      throws IOException {
187    if (ZlibFactory.isNativeZlibLoaded(getConf())) {
188      return super.createOutputStream(out);
189    }
190    return new ReusableGzipOutputStream(out);
191  }
192
193}