1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.compress;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.util.Arrays;
23 import java.util.zip.GZIPOutputStream;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.util.JVM;
29 import org.apache.hadoop.io.compress.CompressionOutputStream;
30 import org.apache.hadoop.io.compress.CompressorStream;
31 import org.apache.hadoop.io.compress.GzipCodec;
32 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class ReusableStreamGzipCodec extends GzipCodec {
40
41 private static final Log LOG = LogFactory.getLog(Compression.class);
42
43
44
45
46
47 protected static class ReusableGzipOutputStream extends CompressorStream {
48
49 private static final int GZIP_HEADER_LENGTH = 10;
50
51
52
53
54
55 private static final byte[] GZIP_HEADER;
56
57 static {
58
59 ByteArrayOutputStream baos = new ByteArrayOutputStream();
60 byte[] header = null;
61 GZIPOutputStream gzipStream = null;
62 try {
63 gzipStream = new GZIPOutputStream(baos);
64 gzipStream.finish();
65 header = Arrays.copyOfRange(baos.toByteArray(), 0, GZIP_HEADER_LENGTH);
66 } catch (IOException e) {
67 throw new RuntimeException("Could not create gzip stream", e);
68 } finally {
69 if (gzipStream != null) {
70 try {
71 gzipStream.close();
72 } catch (IOException e) {
73 LOG.error(e);
74 }
75 }
76 }
77 GZIP_HEADER = header;
78 }
79
80 private static class ResetableGZIPOutputStream extends GZIPOutputStream {
81
82 private static final int TRAILER_SIZE = 8;
83 private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
84
85 public ResetableGZIPOutputStream(OutputStream out) throws IOException {
86 super(out);
87 }
88
89 public void resetState() throws IOException {
90 def.reset();
91 crc.reset();
92 out.write(GZIP_HEADER);
93 }
94
95
96
97
98
99 @Override
100 public void finish() throws IOException {
101 if (HAS_BROKEN_FINISH) {
102 if (!def.finished()) {
103 def.finish();
104 while (!def.finished()) {
105 int i = def.deflate(this.buf, 0, this.buf.length);
106 if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
107 writeTrailer(this.buf, i);
108 i += TRAILER_SIZE;
109 out.write(this.buf, 0, i);
110
111 return;
112 }
113 if (i > 0) {
114 out.write(this.buf, 0, i);
115 }
116 }
117
118 byte[] arrayOfByte = new byte[TRAILER_SIZE];
119 writeTrailer(arrayOfByte, 0);
120 out.write(arrayOfByte);
121 }
122 } else {
123 super.finish();
124 }
125 }
126
127
128 private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
129 throws IOException {
130 writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
131 writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
132 }
133
134
135 private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
136 throws IOException {
137 writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
138 writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
139 }
140
141
142 private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
143 throws IOException {
144 paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
145 paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
146 }
147 }
148
149 public ReusableGzipOutputStream(OutputStream out) throws IOException {
150 super(new ResetableGZIPOutputStream(out));
151 }
152
153 @Override
154 public void close() throws IOException {
155 out.close();
156 }
157
158 @Override
159 public void flush() throws IOException {
160 out.flush();
161 }
162
163 @Override
164 public void write(int b) throws IOException {
165 out.write(b);
166 }
167
168 @Override
169 public void write(byte[] data, int offset, int length) throws IOException {
170 out.write(data, offset, length);
171 }
172
173 @Override
174 public void finish() throws IOException {
175 ((GZIPOutputStream) out).finish();
176 }
177
178 @Override
179 public void resetState() throws IOException {
180 ((ResetableGZIPOutputStream) out).resetState();
181 }
182 }
183
184 @Override
185 public CompressionOutputStream createOutputStream(OutputStream out)
186 throws IOException {
187 if (ZlibFactory.isNativeZlibLoaded(getConf())) {
188 return super.createOutputStream(out);
189 }
190 return new ReusableGzipOutputStream(out);
191 }
192
193 }