1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.compress;
18
19 import java.io.BufferedInputStream;
20 import java.io.BufferedOutputStream;
21 import java.io.FilterOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configurable;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.io.IOUtils;
33 import org.apache.hadoop.io.compress.CodecPool;
34 import org.apache.hadoop.io.compress.CompressionCodec;
35 import org.apache.hadoop.io.compress.CompressionInputStream;
36 import org.apache.hadoop.io.compress.CompressionOutputStream;
37 import org.apache.hadoop.io.compress.Compressor;
38 import org.apache.hadoop.io.compress.Decompressor;
39 import org.apache.hadoop.io.compress.DefaultCodec;
40 import org.apache.hadoop.io.compress.DoNotPool;
41 import org.apache.hadoop.io.compress.GzipCodec;
42 import org.apache.hadoop.util.ReflectionUtils;
43
44
45
46
47
48 @InterfaceAudience.Private
49 public final class Compression {
  /** Logger for compressor/decompressor pool diagnostics and warnings. */
  private static final Log LOG = LogFactory.getLog(Compression.class);

  /**
   * Prevent the instantiation of class.
   */
  private Compression() {
    super();
  }
58
59 static class FinishOnFlushCompressionStream extends FilterOutputStream {
60 public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
61 super(cout);
62 }
63
64 @Override
65 public void write(byte b[], int off, int len) throws IOException {
66 out.write(b, off, len);
67 }
68
69 @Override
70 public void flush() throws IOException {
71 CompressionOutputStream cout = (CompressionOutputStream) out;
72 cout.finish();
73 cout.flush();
74 cout.resetState();
75 }
76 }
77
78
79
80
81 private static ClassLoader getClassLoaderForCodec() {
82 ClassLoader cl = Thread.currentThread().getContextClassLoader();
83 if (cl == null) {
84 cl = Compression.class.getClassLoader();
85 }
86 if (cl == null) {
87 cl = ClassLoader.getSystemClassLoader();
88 }
89 if (cl == null) {
90 throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
91 }
92 return cl;
93 }
94
95
96
97
98
99
100 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
101 value="SE_TRANSIENT_FIELD_NOT_RESTORED",
102 justification="We are not serializing so doesn't apply (not sure why transient though)")
103 @InterfaceAudience.Public
104 @InterfaceStability.Evolving
105 public static enum Algorithm {
106 LZO("lzo") {
107
108 private volatile transient CompressionCodec lzoCodec;
109 private transient Object lock = new Object();
110
111 @Override
112 CompressionCodec getCodec(Configuration conf) {
113 if (lzoCodec == null) {
114 synchronized (lock) {
115 if (lzoCodec == null) {
116 lzoCodec = buildCodec(conf);
117 }
118 }
119 }
120 return lzoCodec;
121 }
122
123 private CompressionCodec buildCodec(Configuration conf) {
124 try {
125 Class<?> externalCodec =
126 getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
127 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
128 new Configuration(conf));
129 } catch (ClassNotFoundException e) {
130 throw new RuntimeException(e);
131 }
132 }
133 },
134 GZ("gz") {
135 private volatile transient GzipCodec codec;
136 private transient Object lock = new Object();
137
138 @Override
139 DefaultCodec getCodec(Configuration conf) {
140 if (codec == null) {
141 synchronized (lock) {
142 if (codec == null) {
143 codec = buildCodec(conf);
144 }
145 }
146 }
147
148 return codec;
149 }
150
151 private GzipCodec buildCodec(Configuration conf) {
152 GzipCodec gzcodec = new ReusableStreamGzipCodec();
153 gzcodec.setConf(new Configuration(conf));
154 return gzcodec;
155 }
156 },
157
158 NONE("none") {
159 @Override
160 DefaultCodec getCodec(Configuration conf) {
161 return null;
162 }
163
164 @Override
165 public synchronized InputStream createDecompressionStream(
166 InputStream downStream, Decompressor decompressor,
167 int downStreamBufferSize) throws IOException {
168 if (downStreamBufferSize > 0) {
169 return new BufferedInputStream(downStream, downStreamBufferSize);
170 }
171 return downStream;
172 }
173
174 @Override
175 public synchronized OutputStream createCompressionStream(
176 OutputStream downStream, Compressor compressor,
177 int downStreamBufferSize) throws IOException {
178 if (downStreamBufferSize > 0) {
179 return new BufferedOutputStream(downStream, downStreamBufferSize);
180 }
181
182 return downStream;
183 }
184 },
185 SNAPPY("snappy") {
186
187 private volatile transient CompressionCodec snappyCodec;
188 private transient Object lock = new Object();
189
190 @Override
191 CompressionCodec getCodec(Configuration conf) {
192 if (snappyCodec == null) {
193 synchronized (lock) {
194 if (snappyCodec == null) {
195 snappyCodec = buildCodec(conf);
196 }
197 }
198 }
199 return snappyCodec;
200 }
201
202 private CompressionCodec buildCodec(Configuration conf) {
203 try {
204 Class<?> externalCodec =
205 getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
206 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
207 } catch (ClassNotFoundException e) {
208 throw new RuntimeException(e);
209 }
210 }
211 },
212 LZ4("lz4") {
213
214 private volatile transient CompressionCodec lz4Codec;
215 private transient Object lock = new Object();
216
217 @Override
218 CompressionCodec getCodec(Configuration conf) {
219 if (lz4Codec == null) {
220 synchronized (lock) {
221 if (lz4Codec == null) {
222 lz4Codec = buildCodec(conf);
223 }
224 }
225 }
226 return lz4Codec;
227 }
228
229 private CompressionCodec buildCodec(Configuration conf) {
230 try {
231 Class<?> externalCodec =
232 getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
233 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
234 } catch (ClassNotFoundException e) {
235 throw new RuntimeException(e);
236 }
237 }
238 },
239 ZSTD("zstd") {
240
241 private volatile transient CompressionCodec zStandardCodec;
242 private transient Object lock = new Object();
243
244 @Override
245 CompressionCodec getCodec(Configuration conf) {
246 if (zStandardCodec == null) {
247 synchronized (lock) {
248 if (zStandardCodec == null) {
249 zStandardCodec = buildCodec(conf);
250 }
251 }
252 }
253 return zStandardCodec;
254 }
255
256 private CompressionCodec buildCodec(Configuration conf) {
257 try {
258 Class<?> externalCodec =
259 getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
260 return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
261 } catch (ClassNotFoundException e) {
262 throw new RuntimeException(e);
263 }
264 }
265 };
266
267 private final transient Configuration conf;
268 private final String compressName;
269
270 private static final int DATA_IBUF_SIZE = 1 * 1024;
271
272 private static final int DATA_OBUF_SIZE = 4 * 1024;
273
274 Algorithm(String name) {
275 this.conf = new Configuration();
276 this.conf.setBoolean("io.native.lib.available", true);
277 this.compressName = name;
278 }
279
280 abstract CompressionCodec getCodec(Configuration conf);
281
282 public InputStream createDecompressionStream(
283 InputStream downStream, Decompressor decompressor,
284 int downStreamBufferSize) throws IOException {
285 CompressionCodec codec = getCodec(conf);
286
287 if (downStreamBufferSize > 0) {
288 ((Configurable)codec).getConf().setInt("io.file.buffer.size",
289 downStreamBufferSize);
290 }
291 CompressionInputStream cis =
292 codec.createInputStream(downStream, decompressor);
293 BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
294 return bis2;
295
296 }
297
298 public OutputStream createCompressionStream(
299 OutputStream downStream, Compressor compressor, int downStreamBufferSize)
300 throws IOException {
301 OutputStream bos1 = null;
302 if (downStreamBufferSize > 0) {
303 bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
304 }
305 else {
306 bos1 = downStream;
307 }
308 CompressionOutputStream cos =
309 createPlainCompressionStream(bos1, compressor);
310 BufferedOutputStream bos2 =
311 new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
312 DATA_OBUF_SIZE);
313 return bos2;
314 }
315
316
317
318
319
320 public CompressionOutputStream createPlainCompressionStream(
321 OutputStream downStream, Compressor compressor) throws IOException {
322 CompressionCodec codec = getCodec(conf);
323 ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
324 return codec.createOutputStream(downStream, compressor);
325 }
326
327 public Compressor getCompressor() {
328 CompressionCodec codec = getCodec(conf);
329 if (codec != null) {
330 Compressor compressor = CodecPool.getCompressor(codec);
331 if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
332 if (compressor != null) {
333 if (compressor.finished()) {
334
335 LOG.warn("Compressor obtained from CodecPool is already finished()");
336 }
337 compressor.reset();
338 }
339 return compressor;
340 }
341 return null;
342 }
343
344 public void returnCompressor(Compressor compressor) {
345 if (compressor != null) {
346 if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool.");
347 CodecPool.returnCompressor(compressor);
348 }
349 }
350
351 public Decompressor getDecompressor() {
352 CompressionCodec codec = getCodec(conf);
353 if (codec != null) {
354 Decompressor decompressor = CodecPool.getDecompressor(codec);
355 if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
356 if (decompressor != null) {
357 if (decompressor.finished()) {
358
359 LOG.warn("Deompressor obtained from CodecPool is already finished()");
360 }
361 decompressor.reset();
362 }
363 return decompressor;
364 }
365
366 return null;
367 }
368
369 public void returnDecompressor(Decompressor decompressor) {
370 if (decompressor != null) {
371 if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool.");
372 CodecPool.returnDecompressor(decompressor);
373 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
374 if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor);
375 decompressor.end();
376 }
377 }
378 }
379
380 public String getName() {
381 return compressName;
382 }
383 }
384
385 public static Algorithm getCompressionAlgorithmByName(String compressName) {
386 Algorithm[] algos = Algorithm.class.getEnumConstants();
387
388 for (Algorithm a : algos) {
389 if (a.getName().equals(compressName)) {
390 return a;
391 }
392 }
393
394 throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
395 }
396
397
398
399
400
401
402
403 public static String[] getSupportedAlgorithms() {
404 Algorithm[] algos = Algorithm.class.getEnumConstants();
405
406 String[] ret = new String[algos.length];
407 int i = 0;
408 for (Algorithm a : algos) {
409 ret[i++] = a.getName();
410 }
411
412 return ret;
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435 public static void decompress(byte[] dest, int destOffset,
436 InputStream bufferedBoundedStream, int compressedSize,
437 int uncompressedSize, Compression.Algorithm compressAlgo)
438 throws IOException {
439
440 if (dest.length - destOffset < uncompressedSize) {
441 throw new IllegalArgumentException(
442 "Output buffer does not have enough space to hold "
443 + uncompressedSize + " decompressed bytes, available: "
444 + (dest.length - destOffset));
445 }
446
447 Decompressor decompressor = null;
448 try {
449 decompressor = compressAlgo.getDecompressor();
450 InputStream is = compressAlgo.createDecompressionStream(
451 bufferedBoundedStream, decompressor, 0);
452
453 IOUtils.readFully(is, dest, destOffset, uncompressedSize);
454 is.close();
455 } finally {
456 if (decompressor != null) {
457 compressAlgo.returnDecompressor(decompressor);
458 }
459 }
460 }
461 }