/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hbase.io.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.DoNotPool;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compression-related utilities.
 * Copied from hadoop-3315 tfile.
 */
@InterfaceAudience.Private
public final class Compression {
  private static final Logger LOG = LoggerFactory.getLogger(Compression.class);

  /**
   * Prevents instantiation of this utility class.
   */
  private Compression() {
    super();
  }

  static class FinishOnFlushCompressionStream extends FilterOutputStream {
    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
      super(cout);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      // Write the array directly; FilterOutputStream's default implementation
      // would otherwise write one byte at a time.
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      CompressionOutputStream cout = (CompressionOutputStream) out;
      // Finish the current compressed segment so it can be decompressed on its
      // own, then reset the codec state for subsequent writes.
      cout.finish();
      cout.flush();
      cout.resetState();
    }
  }
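
  /*
   * Usage sketch for FinishOnFlushCompressionStream (illustrative only; "out",
   * "block1" and "block2" are placeholders, and the GZ codec is assumed to be
   * available at runtime). Because flush() calls finish() and then resetState(),
   * every flushed segment is a complete, independently decompressible stream:
   *
   *   Compressor compressor = Compression.Algorithm.GZ.getCompressor();
   *   CompressionOutputStream cos =
   *       Compression.Algorithm.GZ.createPlainCompressionStream(out, compressor);
   *   OutputStream os = new FinishOnFlushCompressionStream(cos);
   *   os.write(block1);
   *   os.flush(); // block1 is now a self-contained compressed segment
   *   os.write(block2);
   *   os.flush(); // and so is block2
   *   Compression.Algorithm.GZ.returnCompressor(compressor);
   */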

  /**
   * Returns the classloader to load the Codec class from.
   */
  private static ClassLoader getClassLoaderForCodec() {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    if (cl == null) {
      cl = Compression.class.getClassLoader();
    }
    if (cl == null) {
      cl = ClassLoader.getSystemClassLoader();
    }
    if (cl == null) {
      throw new RuntimeException("A ClassLoader to load the Codec could not be determined");
    }
    return cl;
  }

  /**
   * Compression algorithms. The ordinal of these cannot change or else you
   * risk breaking all existing HFiles out there. Even the ones that are
   * not compressed! (They use the NONE algorithm)
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="SE_TRANSIENT_FIELD_NOT_RESTORED",
      justification="We are not serializing so doesn't apply (not sure why transient though)")
  @InterfaceAudience.Public
  public static enum Algorithm {
    LZO("lzo") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lzoCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lzoCodec == null) {
          synchronized (lock) {
            if (lzoCodec == null) {
              lzoCodec = buildCodec(conf);
            }
          }
        }
        return lzoCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
              new Configuration(conf));
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    GZ("gz") {
      private volatile transient GzipCodec codec;
      private final transient Object lock = new Object();

      @Override
      DefaultCodec getCodec(Configuration conf) {
        if (codec == null) {
          synchronized (lock) {
            if (codec == null) {
              codec = buildCodec(conf);
            }
          }
        }
        return codec;
      }

      private GzipCodec buildCodec(Configuration conf) {
        GzipCodec gzcodec = new ReusableStreamGzipCodec();
        gzcodec.setConf(new Configuration(conf));
        return gzcodec;
      }
    },
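
    // NONE performs no compression: getCodec() returns null, and the stream factory
    // overrides below return the underlying stream itself, optionally wrapped in a
    // BufferedInputStream/BufferedOutputStream when a buffer size is requested.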
    NONE("none") {
      @Override
      DefaultCodec getCodec(Configuration conf) {
        return null;
      }

      @Override
      public synchronized InputStream createDecompressionStream(
          InputStream downStream, Decompressor decompressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedInputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }

      @Override
      public synchronized OutputStream createCompressionStream(
          OutputStream downStream, Compressor compressor,
          int downStreamBufferSize) throws IOException {
        if (downStreamBufferSize > 0) {
          return new BufferedOutputStream(downStream, downStreamBufferSize);
        }
        return downStream;
      }
    },
    SNAPPY("snappy") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec snappyCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (snappyCodec == null) {
          synchronized (lock) {
            if (snappyCodec == null) {
              snappyCodec = buildCodec(conf);
            }
          }
        }
        return snappyCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    LZ4("lz4") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec lz4Codec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (lz4Codec == null) {
          synchronized (lock) {
            if (lz4Codec == null) {
              lz4Codec = buildCodec(conf);
            }
          }
        }
        return lz4Codec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    BZIP2("bzip2") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec bzipCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (bzipCodec == null) {
          synchronized (lock) {
            if (bzipCodec == null) {
              bzipCodec = buildCodec(conf);
            }
          }
        }
        return bzipCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.BZip2Codec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    },
    ZSTD("zstd") {
      // Use base type to avoid compile-time dependencies.
      private volatile transient CompressionCodec zStandardCodec;
      private final transient Object lock = new Object();

      @Override
      CompressionCodec getCodec(Configuration conf) {
        if (zStandardCodec == null) {
          synchronized (lock) {
            if (zStandardCodec == null) {
              zStandardCodec = buildCodec(conf);
            }
          }
        }
        return zStandardCodec;
      }

      private CompressionCodec buildCodec(Configuration conf) {
        try {
          Class<?> externalCodec =
              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
        } catch (ClassNotFoundException e) {
          throw new RuntimeException(e);
        }
      }
    };

    private final Configuration conf;
    private final String compressName;
    /** data input buffer size to absorb small reads from application. */
    private static final int DATA_IBUF_SIZE = 1 * 1024;
    /** data output buffer size to absorb small writes from application. */
    private static final int DATA_OBUF_SIZE = 4 * 1024;

    Algorithm(String name) {
      this.conf = new Configuration();
      this.conf.setBoolean("io.native.lib.available", true);
      this.compressName = name;
    }

    abstract CompressionCodec getCodec(Configuration conf);
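
    /**
     * Creates a decompression stream over {@code downStream}. A positive
     * {@code downStreamBufferSize} is applied to the codec as its
     * "io.file.buffer.size"; the decompressed output is additionally wrapped in a
     * small {@link BufferedInputStream} to absorb small reads from the application.
     */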
    public InputStream createDecompressionStream(
        InputStream downStream, Decompressor decompressor,
        int downStreamBufferSize) throws IOException {
      CompressionCodec codec = getCodec(conf);
      // Set the internal buffer size to read from down stream.
      if (downStreamBufferSize > 0) {
        ((Configurable) codec).getConf().setInt("io.file.buffer.size",
            downStreamBufferSize);
      }
      CompressionInputStream cis =
          codec.createInputStream(downStream, decompressor);
      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
    }

    public OutputStream createCompressionStream(
        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
        throws IOException {
      OutputStream bos1;
      if (downStreamBufferSize > 0) {
        bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
      } else {
        bos1 = downStream;
      }
      CompressionOutputStream cos =
          createPlainCompressionStream(bos1, compressor);
      // Finish-on-flush lets callers delimit compressed blocks; the outer buffer
      // absorbs small writes from the application.
      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
          DATA_OBUF_SIZE);
    }

    /**
     * Creates a compression stream without any additional wrapping into
     * buffering streams.
     */
    public CompressionOutputStream createPlainCompressionStream(
        OutputStream downStream, Compressor compressor) throws IOException {
      CompressionCodec codec = getCodec(conf);
      ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
      return codec.createOutputStream(downStream, compressor);
    }

    public Compressor getCompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Compressor compressor = CodecPool.getCompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved compressor " + compressor + " from pool.");
        }
        if (compressor != null) {
          if (compressor.finished()) {
            // Somebody returned the compressor to the CodecPool while still using it.
            LOG.warn("Compressor obtained from CodecPool is already finished()");
          }
          compressor.reset();
        }
        return compressor;
      }
      return null;
    }

    public void returnCompressor(Compressor compressor) {
      if (compressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning compressor " + compressor + " to pool.");
        }
        CodecPool.returnCompressor(compressor);
      }
    }
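
    /*
     * Typical pooled usage (illustrative sketch; "algo" and "compressedIn" are
     * placeholders, and the pattern mirrors Compression.decompress() below): always
     * pair getDecompressor() with returnDecompressor() in a finally block so the
     * instance goes back to the CodecPool even on error.
     *
     *   Decompressor d = algo.getDecompressor();
     *   try (InputStream is = algo.createDecompressionStream(compressedIn, d, 0)) {
     *     // read decompressed bytes from is
     *   } finally {
     *     algo.returnDecompressor(d);
     *   }
     */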
    public Decompressor getDecompressor() {
      CompressionCodec codec = getCodec(conf);
      if (codec != null) {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        if (LOG.isTraceEnabled()) {
          LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
        }
        if (decompressor != null) {
          if (decompressor.finished()) {
            // Somebody returned the decompressor to the CodecPool while still using it.
            LOG.warn("Decompressor obtained from CodecPool is already finished()");
          }
          decompressor.reset();
        }
        return decompressor;
      }
      return null;
    }

    public void returnDecompressor(Decompressor decompressor) {
      if (decompressor != null) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Returning decompressor " + decompressor + " to pool.");
        }
        CodecPool.returnDecompressor(decompressor);
        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Ending decompressor " + decompressor);
          }
          decompressor.end();
        }
      }
    }

    public String getName() {
      return compressName;
    }
  }

  public static Algorithm getCompressionAlgorithmByName(String compressName) {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    for (Algorithm a : algos) {
      if (a.getName().equals(compressName)) {
        return a;
      }
    }

    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
  }

  /**
   * Get names of supported compression algorithms.
   *
   * @return Array of strings, each representing the name of a supported
   *         compression algorithm.
   */
  public static String[] getSupportedAlgorithms() {
    Algorithm[] algos = Algorithm.class.getEnumConstants();

    String[] ret = new String[algos.length];
    int i = 0;
    for (Algorithm a : algos) {
      ret[i++] = a.getName();
    }

    return ret;
  }

  /**
   * Decompresses data from the given stream using the configured compression algorithm. It will
   * throw an exception if the dest buffer does not have enough space to hold the decompressed
   * data.
   * @param dest the output buffer
   * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact
   *          amount of compressed data
   * @param uncompressedSize uncompressed data size, header not included
   * @param compressAlgo compression algorithm used
   * @throws IOException if any IO error happens
   */
  public static void decompress(ByteBuff dest, InputStream bufferedBoundedStream,
      int uncompressedSize, Compression.Algorithm compressAlgo) throws IOException {
    if (dest.remaining() < uncompressedSize) {
      throw new IllegalArgumentException("Output buffer does not have enough space to hold "
          + uncompressedSize + " decompressed bytes, available: " + dest.remaining());
    }

    Decompressor decompressor = null;
    try {
      decompressor = compressAlgo.getDecompressor();
      try (InputStream is =
          compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0)) {
        BlockIOUtils.readFullyWithHeapBuffer(is, dest, uncompressedSize);
      }
    } finally {
      if (decompressor != null) {
        compressAlgo.returnDecompressor(decompressor);
      }
    }
  }
}