View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.compress;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.util.Arrays;
23  import java.util.zip.GZIPOutputStream;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.util.JVM;
29  import org.apache.hadoop.io.compress.CompressionOutputStream;
30  import org.apache.hadoop.io.compress.CompressorStream;
31  import org.apache.hadoop.io.compress.GzipCodec;
32  import org.apache.hadoop.io.compress.zlib.ZlibFactory;
33  
34  /**
35   * Fixes an inefficiency in Hadoop's Gzip codec, allowing to reuse compression
36   * streams.
37   */
38  @InterfaceAudience.Private
39  public class ReusableStreamGzipCodec extends GzipCodec {
40  
41    private static final Log LOG = LogFactory.getLog(Compression.class);
42  
43    /**
44     * A bridge that wraps around a DeflaterOutputStream to make it a
45     * CompressionOutputStream.
46     */
47    protected static class ReusableGzipOutputStream extends CompressorStream {
48  
49      private static final int GZIP_HEADER_LENGTH = 10;
50  
51      /**
52       * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
53       * details.
54       */
55      private static final byte[] GZIP_HEADER;
56  
57      static {
58        // Capture the fixed ten-byte header hard-coded in GZIPOutputStream.
59        ByteArrayOutputStream baos = new ByteArrayOutputStream();
60        byte[] header = null;
61        GZIPOutputStream gzipStream = null;
62        try {
63          gzipStream  = new GZIPOutputStream(baos);
64          gzipStream.finish();
65          header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
66        } catch (IOException e) {
67          throw new RuntimeException("Could not create gzip stream", e);
68        } finally {
69          if (gzipStream != null) {
70            try {
71              gzipStream.close();
72            } catch (IOException e) {
73              LOG.error(e);
74            }
75          }
76        }
77        GZIP_HEADER = header;
78      }
79  
80      private static class ResetableGZIPOutputStream extends GZIPOutputStream {
81  
82        private static final int TRAILER_SIZE = 8;
83        private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
84  
85        public ResetableGZIPOutputStream(OutputStream out) throws IOException {
86          super(out);
87        }
88  
89        public void resetState() throws IOException {
90          def.reset();
91          crc.reset();
92          out.write(GZIP_HEADER);
93        }
94  
95        /**
96         * Override because certain implementation calls def.end() which
97         * causes problem when resetting the stream for reuse.
98         */
99        @Override
100       public void finish() throws IOException {
101         if (HAS_BROKEN_FINISH) { 
102           if (!def.finished()) {
103             def.finish();
104             while (!def.finished()) {
105               int i = def.deflate(this.buf, 0, this.buf.length);
106               if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
107                 writeTrailer(this.buf, i);
108                 i += TRAILER_SIZE;
109                 out.write(this.buf, 0, i);
110 
111                 return;
112               }
113               if (i > 0) {
114                 out.write(this.buf, 0, i);
115               }
116             }
117 
118             byte[] arrayOfByte = new byte[TRAILER_SIZE];
119             writeTrailer(arrayOfByte, 0);
120             out.write(arrayOfByte);
121           }
122         } else {
123           super.finish();
124         }
125       }
126 
127       /** re-implement because the relative method in jdk is invisible */
128       private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
129           throws IOException {
130         writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
131         writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
132       }
133 
134       /** re-implement because the relative method in jdk is invisible */
135       private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
136           throws IOException {
137         writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
138         writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
139       }
140 
141       /** re-implement because the relative method in jdk is invisible */
142       private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
143           throws IOException {
144         paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
145         paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
146       }
147     }
148 
149     public ReusableGzipOutputStream(OutputStream out) throws IOException {
150       super(new ResetableGZIPOutputStream(out));
151     }
152 
153     @Override
154     public void close() throws IOException {
155       out.close();
156     }
157 
158     @Override
159     public void flush() throws IOException {
160       out.flush();
161     }
162 
163     @Override
164     public void write(int b) throws IOException {
165       out.write(b);
166     }
167 
168     @Override
169     public void write(byte[] data, int offset, int length) throws IOException {
170       out.write(data, offset, length);
171     }
172 
173     @Override
174     public void finish() throws IOException {
175       ((GZIPOutputStream) out).finish();
176     }
177 
178     @Override
179     public void resetState() throws IOException {
180       ((ResetableGZIPOutputStream) out).resetState();
181     }
182   }
183 
184   @Override
185   public CompressionOutputStream createOutputStream(OutputStream out)
186       throws IOException {
187     if (ZlibFactory.isNativeZlibLoaded(getConf())) {
188       return super.createOutputStream(out);
189     }
190     return new ReusableGzipOutputStream(out);
191   }
192 
193 }