1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.compress;
18
19 import java.io.BufferedInputStream;
20 import java.io.BufferedOutputStream;
21 import java.io.FilterOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configurable;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.io.IOUtils;
33 import org.apache.hadoop.io.compress.CodecPool;
34 import org.apache.hadoop.io.compress.CompressionCodec;
35 import org.apache.hadoop.io.compress.CompressionInputStream;
36 import org.apache.hadoop.io.compress.CompressionOutputStream;
37 import org.apache.hadoop.io.compress.Compressor;
38 import org.apache.hadoop.io.compress.Decompressor;
39 import org.apache.hadoop.io.compress.DefaultCodec;
40 import org.apache.hadoop.io.compress.DoNotPool;
41 import org.apache.hadoop.io.compress.GzipCodec;
42 import org.apache.hadoop.util.ReflectionUtils;
43
44
45
46
47
48 @InterfaceAudience.Private
49 public final class Compression {
/** Logger for this class; also used by the nested {@link Algorithm} enum. */
static final Log LOG = LogFactory.getLog(Compression.class);

/**
 * Private so that the class cannot be instantiated from outside; it only
 * offers static members and nested types.
 */
private Compression() {
  // Nothing to initialize; the superclass constructor is invoked implicitly.
}
58
59 static class FinishOnFlushCompressionStream extends FilterOutputStream {
60 public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
61 super(cout);
62 }
63
64 @Override
65 public void write(byte b[], int off, int len) throws IOException {
66 out.write(b, off, len);
67 }
68
69 @Override
70 public void flush() throws IOException {
71 CompressionOutputStream cout = (CompressionOutputStream) out;
72 cout.finish();
73 cout.flush();
74 cout.resetState();
75 }
76 }
77
78
79
80
81 private static ClassLoader getClassLoaderForCodec() {
82 ClassLoader cl = Thread.currentThread().getContextClassLoader();
83 if (cl == null) {
84 cl = Compression.class.getClassLoader();
85 }
86 if (cl == null) {
87 cl = ClassLoader.getSystemClassLoader();
88 }
89 if (cl == null) {
90 throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
91 }
92 return cl;
93 }
94
95
96
97
98
99
100 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101 value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102 justification="We are not serializing so doesn't apply (not sure why transient though)")
103 @InterfaceAudience.Public
104 @InterfaceStability.Evolving
105 public static enum Algorithm {
106 LZO("lzo") {
107
108 private volatile transient CompressionCodec lzoCodec;
109 private transient Object lock = new Object();
110
111 @Override
112 CompressionCodec getCodec(Configuration conf) {
113 if (lzoCodec == null) {
114 synchronized (lock) {
115 if (lzoCodec == null) {
116 lzoCodec = buildCodec(conf);
117 }
118 }
119 }
120 return lzoCodec;
121 }
122
123 private CompressionCodec buildCodec(Configuration conf) {
124 try {
125 Class<?> externalCodec =
126 getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
127 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
128 new Configuration(conf));
129 } catch (ClassNotFoundException e) {
130 throw new RuntimeException(e);
131 }
132 }
133 },
134 GZ("gz") {
135 private volatile transient GzipCodec codec;
136 private transient Object lock = new Object();
137
138 @Override
139 DefaultCodec getCodec(Configuration conf) {
140 if (codec == null) {
141 synchronized (lock) {
142 if (codec == null) {
143 codec = buildCodec(conf);
144 }
145 }
146 }
147
148 return codec;
149 }
150
151 private GzipCodec buildCodec(Configuration conf) {
152 GzipCodec gzcodec = new ReusableStreamGzipCodec();
153 gzcodec.setConf(new Configuration(conf));
154 return gzcodec;
155 }
156 },
157
NONE("none") {
  /** There is no codec for this algorithm, so this always returns {@code null}. */
  @Override
  DefaultCodec getCodec(Configuration conf) {
    return null;
  }

  /**
   * Returns {@code downStream} itself, wrapped in a {@link BufferedInputStream}
   * of {@code downStreamBufferSize} bytes when that size is positive.
   * The {@code decompressor} argument is not used.
   */
  @Override
  public synchronized InputStream createDecompressionStream(
      InputStream downStream, Decompressor decompressor,
      int downStreamBufferSize) throws IOException {
    return downStreamBufferSize > 0
        ? new BufferedInputStream(downStream, downStreamBufferSize)
        : downStream;
  }

  /**
   * Returns {@code downStream} itself, wrapped in a {@link BufferedOutputStream}
   * of {@code downStreamBufferSize} bytes when that size is positive.
   * The {@code compressor} argument is not used.
   */
  @Override
  public synchronized OutputStream createCompressionStream(
      OutputStream downStream, Compressor compressor,
      int downStreamBufferSize) throws IOException {
    return downStreamBufferSize > 0
        ? new BufferedOutputStream(downStream, downStreamBufferSize)
        : downStream;
  }
},
185 SNAPPY("snappy") {
186
187 private volatile transient CompressionCodec snappyCodec;
188 private transient Object lock = new Object();
189
190 @Override
191 CompressionCodec getCodec(Configuration conf) {
192 if (snappyCodec == null) {
193 synchronized (lock) {
194 if (snappyCodec == null) {
195 snappyCodec = buildCodec(conf);
196 }
197 }
198 }
199 return snappyCodec;
200 }
201
202 private CompressionCodec buildCodec(Configuration conf) {
203 try {
204 Class<?> externalCodec =
205 getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
206 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
207 } catch (ClassNotFoundException e) {
208 throw new RuntimeException(e);
209 }
210 }
211 },
212 LZ4("lz4") {
213
214 private volatile transient CompressionCodec lz4Codec;
215 private transient Object lock = new Object();
216
217 @Override
218 CompressionCodec getCodec(Configuration conf) {
219 if (lz4Codec == null) {
220 synchronized (lock) {
221 if (lz4Codec == null) {
222 lz4Codec = buildCodec(conf);
223 }
224 }
225 }
226 return lz4Codec;
227 }
228
229 private CompressionCodec buildCodec(Configuration conf) {
230 try {
231 Class<?> externalCodec =
232 getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
233 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
234 } catch (ClassNotFoundException e) {
235 throw new RuntimeException(e);
236 }
237 }
238 };
239
/** Configuration that the methods of this enum hand to {@link #getCodec(Configuration)}. */
private final Configuration conf;
/** Name returned by {@link #getName()}. */
private final String compressName;

/** Size in bytes of the {@code BufferedInputStream} placed over decompression streams. */
private static final int DATA_IBUF_SIZE = 1024;

/** Size in bytes of the {@code BufferedOutputStream} placed over compression streams. */
private static final int DATA_OBUF_SIZE = 4096;

/**
 * Creates a constant with its own {@link Configuration} in which
 * {@code io.native.lib.available} is set to {@code true}.
 *
 * @param name the name reported by {@link #getName()}
 */
Algorithm(String name) {
  this.compressName = name;
  final Configuration base = new Configuration();
  base.setBoolean("io.native.lib.available", true);
  this.conf = base;
}

/**
 * Supplies the codec for this algorithm.
 *
 * @param conf configuration made available to the codec
 * @return the codec; the NONE constant returns {@code null}
 */
abstract CompressionCodec getCodec(Configuration conf);
254
255 public InputStream createDecompressionStream(
256 InputStream downStream, Decompressor decompressor,
257 int downStreamBufferSize) throws IOException {
258 CompressionCodec codec = getCodec(conf);
259
260 if (downStreamBufferSize > 0) {
261 ((Configurable)codec).getConf().setInt("io.file.buffer.size",
262 downStreamBufferSize);
263 }
264 CompressionInputStream cis =
265 codec.createInputStream(downStream, decompressor);
266 BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
267 return bis2;
268
269 }
270
271 public OutputStream createCompressionStream(
272 OutputStream downStream, Compressor compressor, int downStreamBufferSize)
273 throws IOException {
274 OutputStream bos1 = null;
275 if (downStreamBufferSize > 0) {
276 bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
277 }
278 else {
279 bos1 = downStream;
280 }
281 CompressionOutputStream cos =
282 createPlainCompressionStream(bos1, compressor);
283 BufferedOutputStream bos2 =
284 new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
285 DATA_OBUF_SIZE);
286 return bos2;
287 }
288
289
290
291
292
293 public CompressionOutputStream createPlainCompressionStream(
294 OutputStream downStream, Compressor compressor) throws IOException {
295 CompressionCodec codec = getCodec(conf);
296 ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
297 return codec.createOutputStream(downStream, compressor);
298 }
299
300 public Compressor getCompressor() {
301 CompressionCodec codec = getCodec(conf);
302 if (codec != null) {
303 Compressor compressor = CodecPool.getCompressor(codec);
304 if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
305 if (compressor != null) {
306 if (compressor.finished()) {
307
308 LOG.warn("Compressor obtained from CodecPool is already finished()");
309 }
310 compressor.reset();
311 }
312 return compressor;
313 }
314 return null;
315 }
316
317 public void returnCompressor(Compressor compressor) {
318 if (compressor != null) {
319 if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
320 CodecPool.returnCompressor(compressor);
321 }
322 }
323
324 public Decompressor getDecompressor() {
325 CompressionCodec codec = getCodec(conf);
326 if (codec != null) {
327 Decompressor decompressor = CodecPool.getDecompressor(codec);
328 if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
329 if (decompressor != null) {
330 if (decompressor.finished()) {
331
332 LOG.warn("Deompressor obtained from CodecPool is already finished()");
333 }
334 decompressor.reset();
335 }
336 return decompressor;
337 }
338
339 return null;
340 }
341
342 public void returnDecompressor(Decompressor decompressor) {
343 if (decompressor != null) {
344 if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
345 CodecPool.returnDecompressor(decompressor);
346 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
347 if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
348 decompressor.end();
349 }
350 }
351 }
352
353 public String getName() {
354 return compressName;
355 }
356 }
357
358 public static Algorithm getCompressionAlgorithmByName(String compressName) {
359 Algorithm[] algos = Algorithm.class.getEnumConstants();
360
361 for (Algorithm a : algos) {
362 if (a.getName().equals(compressName)) {
363 return a;
364 }
365 }
366
367 throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
368 }
369
370
371
372
373
374
375
376 public static String[] getSupportedAlgorithms() {
377 Algorithm[] algos = Algorithm.class.getEnumConstants();
378
379 String[] ret = new String[algos.length];
380 int i = 0;
381 for (Algorithm a : algos) {
382 ret[i++] = a.getName();
383 }
384
385 return ret;
386 }
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408 public static void decompress(byte[] dest, int destOffset,
409 InputStream bufferedBoundedStream, int compressedSize,
410 int uncompressedSize, Compression.Algorithm compressAlgo)
411 throws IOException {
412
413 if (dest.length - destOffset < uncompressedSize) {
414 throw new IllegalArgumentException(
415 "Output buffer does not have enough space to hold "
416 + uncompressedSize + " decompressed bytes, available: "
417 + (dest.length - destOffset));
418 }
419
420 Decompressor decompressor = null;
421 try {
422 decompressor = compressAlgo.getDecompressor();
423 InputStream is = compressAlgo.createDecompressionStream(
424 bufferedBoundedStream, decompressor, 0);
425
426 IOUtils.readFully(is, dest, destOffset, uncompressedSize);
427 is.close();
428 } finally {
429 if (decompressor != null) {
430 compressAlgo.returnDecompressor(decompressor);
431 }
432 }
433 }
434 }