1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  package org.apache.hadoop.hbase.io.compress;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.util.Arrays;
23  import java.util.zip.GZIPOutputStream;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.util.JVM;
29  import org.apache.hadoop.io.compress.CompressionOutputStream;
30  import org.apache.hadoop.io.compress.CompressorStream;
31  import org.apache.hadoop.io.compress.GzipCodec;
32  import org.apache.hadoop.io.compress.zlib.ZlibFactory;
33  
34  
35  
36  
37  
38  @InterfaceAudience.Private
39  public class ReusableStreamGzipCodec extends GzipCodec {
40  
41    private static final Log LOG = LogFactory.getLog(Compression.class);
42  
43    
44  
45  
46  
47    protected static class ReusableGzipOutputStream extends CompressorStream {
48  
49      private static final int GZIP_HEADER_LENGTH = 10;
50  
51      
52  
53  
54  
55      private static final byte[] GZIP_HEADER;
56  
57      static {
58        
59        ByteArrayOutputStream baos = new ByteArrayOutputStream();
60        byte[] header = null;
61        GZIPOutputStream gzipStream = null;
62        try {
63          gzipStream  = new GZIPOutputStream(baos);
64          gzipStream.finish();
65          header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
66        } catch (IOException e) {
67          throw new RuntimeException("Could not create gzip stream", e);
68        } finally {
69          if (gzipStream != null) {
70            try {
71              gzipStream.close();
72            } catch (IOException e) {
73              LOG.error(e);
74            }
75          }
76        }
77        GZIP_HEADER = header;
78      }
79  
80      private static class ResetableGZIPOutputStream extends GZIPOutputStream {
81  
82        private static final int TRAILER_SIZE = 8;
83        private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
84  
85        public ResetableGZIPOutputStream(OutputStream out) throws IOException {
86          super(out);
87        }
88  
89        public void resetState() throws IOException {
90          def.reset();
91          crc.reset();
92          out.write(GZIP_HEADER);
93        }
94  
95        
96  
97  
98  
99        @Override
100       public void finish() throws IOException {
101         if (HAS_BROKEN_FINISH) { 
102           if (!def.finished()) {
103             def.finish();
104             while (!def.finished()) {
105               int i = def.deflate(this.buf, 0, this.buf.length);
106               if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
107                 writeTrailer(this.buf, i);
108                 i += TRAILER_SIZE;
109                 out.write(this.buf, 0, i);
110 
111                 return;
112               }
113               if (i > 0) {
114                 out.write(this.buf, 0, i);
115               }
116             }
117 
118             byte[] arrayOfByte = new byte[TRAILER_SIZE];
119             writeTrailer(arrayOfByte, 0);
120             out.write(arrayOfByte);
121           }
122         } else {
123           super.finish();
124         }
125       }
126 
127       
128       private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
129           throws IOException {
130         writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
131         writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
132       }
133 
134       
135       private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
136           throws IOException {
137         writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
138         writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
139       }
140 
141       
142       private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
143           throws IOException {
144         paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
145         paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
146       }
147     }
148 
149     public ReusableGzipOutputStream(OutputStream out) throws IOException {
150       super(new ResetableGZIPOutputStream(out));
151     }
152 
153     @Override
154     public void close() throws IOException {
155       out.close();
156     }
157 
158     @Override
159     public void flush() throws IOException {
160       out.flush();
161     }
162 
163     @Override
164     public void write(int b) throws IOException {
165       out.write(b);
166     }
167 
168     @Override
169     public void write(byte[] data, int offset, int length) throws IOException {
170       out.write(data, offset, length);
171     }
172 
173     @Override
174     public void finish() throws IOException {
175       ((GZIPOutputStream) out).finish();
176     }
177 
178     @Override
179     public void resetState() throws IOException {
180       ((ResetableGZIPOutputStream) out).resetState();
181     }
182   }
183 
184   @Override
185   public CompressionOutputStream createOutputStream(OutputStream out)
186       throws IOException {
187     if (ZlibFactory.isNativeZlibLoaded(getConf())) {
188       return super.createOutputStream(out);
189     }
190     return new ReusableGzipOutputStream(out);
191   }
192 
193 }