/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compression-related utilities.
 * Copied from hadoop-3315 tfile.
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  /**
   * Prevent instantiation of this class.
   */
  private Compression() {
    super();
  }

  /**
   * An output stream wrapper that redefines flush() as "finish the current
   * compressed block, flush it downstream, and reset the codec state for the
   * next block".
   */
  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }
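  // A hedged sketch of what the flush contract above gives a caller
  // (variable names are illustrative, not part of this class):
  //
  //   OutputStream os = algo.createCompressionStream(rawOut, compressor, 0);
  //   os.write(blockBytes);
  //   os.flush(); // finish() + resetState() make the bytes written so far a
  //               // complete compressed block, decodable on its own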
  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value = "SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification = "We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  public enum Algorithm {
    LZO("lzo") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private final transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },
    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    BZIP2("bzip2") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec bzipCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (bzipCodec == null) {
          synchronized (lock) {
            if (bzipCodec == null) {
              bzipCodec = buildCodec(conf);
            }
          }
        }
        return bzipCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.BZip2Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    ZSTD("zstd") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf);
            }
          }
        }
        return zStandardCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);
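    /**
     * Wraps {@code downStream} in this algorithm's decompression stream and
     * buffers it with a small internal buffer (DATA_IBUF_SIZE) to absorb small
     * reads from the application. When {@code downStreamBufferSize > 0}, that
     * size is also applied to the codec as {@code io.file.buffer.size}.
     */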
    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
          DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }
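    /**
     * Borrows a {@link Compressor} for this algorithm from the shared
     * {@link CodecPool}, or returns {@code null} for {@link #NONE}. A hedged
     * sketch of the expected borrow/return pattern (names illustrative):
     * <pre>
     *   Compressor c = algo.getCompressor();
     *   try {
     *     OutputStream os = algo.createCompressionStream(out, c, 0);
     *     // ... write and flush ...
     *   } finally {
     *     algo.returnCompressor(c);
     *   }
     * </pre>
     */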
    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved compressor " + compressor + " from pool.");
        }
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the CodecPool but is still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning compressor " + compressor + " to pool.");
        }
        CodecPool.returnCompressor(compressor);
      }
    }

    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
        }
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the CodecPool but is still using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning decompressor " + decompressor + " to pool.");
        }
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Ending decompressor " + decompressor);
          }
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }
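  /**
   * Looks up an {@link Algorithm} by its configuration name, i.e. the string
   * returned by {@link Algorithm#getName()}; for example,
   * {@code getCompressionAlgorithmByName("gz")} returns {@link Algorithm#GZ}.
   *
   * @throws IllegalArgumentException if no supported algorithm uses that name
   */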
  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   *
   * @return Array of strings, each representing a supported compression
   *         algorithm.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }

  /**
   * Decompresses data from the given stream using the configured compression
   * algorithm. Throws an exception if the destination buffer does not have
   * enough space to hold the decompressed data.
   *
   * @param dest the output byte buffer
   * @param destOffset start writing position of the output buffer
   * @param bufferedBoundedStream a stream to read compressed data from, bounded
   *          to the exact amount of compressed data
   * @param compressedSize compressed data size, header not included
   * @param uncompressedSize uncompressed data size, header not included
   * @param compressAlgo compression algorithm used
   * @throws IOException if reading or decompressing the stream fails
   */
  public static void decompress(byte[] dest, int destOffset,
      InputStream bufferedBoundedStream, int compressedSize,
      int uncompressedSize, Compression.Algorithm compressAlgo)
      throws IOException {

    if (dest.length - destOffset < uncompressedSize) {
      throw new IllegalArgumentException(
          "Output buffer does not have enough space to hold "
              + uncompressedSize + " decompressed bytes, available: "
              + (dest.length - destOffset));
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      InputStream is = compressAlgo.createDecompressionStream(
          bufferedBoundedStream, decompressor, 0);
      try {
        IOUtils.readFully(is, dest, destOffset, uncompressedSize);
      } finally {
        // Close even when readFully throws, so the stream is always released.
        is.close();
      }
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }
}
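// A hedged end-to-end sketch of typical usage of this class (variable names
// are illustrative, not part of the API):
//
//   Compression.Algorithm algo = Compression.getCompressionAlgorithmByName("gz");
//   Compressor compressor = algo.getCompressor();
//   try (OutputStream os = algo.createCompressionStream(fileOut, compressor, 0)) {
//     os.write(data); // os.flush() would finish the current compressed block
//   } finally {
//     algo.returnCompressor(compressor);
//   }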