001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress; 019 020import java.io.BufferedInputStream; 021import java.io.BufferedOutputStream; 022import java.io.FilterOutputStream; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import org.apache.hadoop.conf.Configurable; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.io.compress.CodecPool; 030import org.apache.hadoop.io.compress.CompressionCodec; 031import org.apache.hadoop.io.compress.CompressionInputStream; 032import org.apache.hadoop.io.compress.CompressionOutputStream; 033import org.apache.hadoop.io.compress.Compressor; 034import org.apache.hadoop.io.compress.Decompressor; 035import org.apache.hadoop.io.compress.DoNotPool; 036import org.apache.hadoop.util.ReflectionUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Compression related stuff. Copied from hadoop-3315 tfile. 043 */ 044@InterfaceAudience.Private 045public final class Compression { 046 private static final Logger LOG = LoggerFactory.getLogger(Compression.class); 047 048 // LZO 049 050 public static final String LZO_CODEC_CLASS_KEY = "hbase.io.compress.lzo.codec"; 051 public static final String LZO_CODEC_CLASS_DEFAULT = "com.hadoop.compression.lzo.LzoCodec"; 052 053 // GZ 054 055 public static final String GZ_CODEC_CLASS_KEY = "hbase.io.compress.gz.codec"; 056 // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to 057 // reuse compression streams, but still requires the Hadoop native codec. 058 public static final String GZ_CODEC_CLASS_DEFAULT = 059 "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec"; 060 061 // SNAPPY 062 063 public static final String SNAPPY_CODEC_CLASS_KEY = "hbase.io.compress.snappy.codec"; 064 public static final String SNAPPY_CODEC_CLASS_DEFAULT = 065 "org.apache.hadoop.io.compress.SnappyCodec"; 066 067 // LZ4 068 069 public static final String LZ4_CODEC_CLASS_KEY = "hbase.io.compress.lz4.codec"; 070 public static final String LZ4_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.Lz4Codec"; 071 072 // ZSTD 073 074 public static final String ZSTD_CODEC_CLASS_KEY = "hbase.io.compress.zstd.codec"; 075 public static final String ZSTD_CODEC_CLASS_DEFAULT = 076 "org.apache.hadoop.io.compress.ZStandardCodec"; 077 078 // BZIP2 079 080 public static final String BZIP2_CODEC_CLASS_KEY = "hbase.io.compress.bzip2.codec"; 081 public static final String BZIP2_CODEC_CLASS_DEFAULT = "org.apache.hadoop.io.compress.BZip2Codec"; 082 083 // LZMA 084 085 public static final String LZMA_CODEC_CLASS_KEY = "hbase.io.compress.lzma.codec"; 086 public static final String LZMA_CODEC_CLASS_DEFAULT = 087 "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec"; 088 089 // Brotli 090 091 public static final String BROTLI_CODEC_CLASS_KEY = "hbase.io.compress.brotli.codec"; 092 public static final String BROTLI_CODEC_CLASS_DEFAULT = 093 "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec"; 094 095 /** 096 * Prevent the instantiation of class. 097 */ 098 private Compression() { 099 super(); 100 } 101 102 static class FinishOnFlushCompressionStream extends FilterOutputStream { 103 public FinishOnFlushCompressionStream(CompressionOutputStream cout) { 104 super(cout); 105 } 106 107 @Override 108 public void write(byte b[], int off, int len) throws IOException { 109 out.write(b, off, len); 110 } 111 112 @Override 113 public void flush() throws IOException { 114 CompressionOutputStream cout = (CompressionOutputStream) out; 115 cout.finish(); 116 cout.flush(); 117 cout.resetState(); 118 } 119 } 120 121 /** 122 * Returns the classloader to load the Codec class from. 123 */ 124 private static ClassLoader getClassLoaderForCodec() { 125 ClassLoader cl = Thread.currentThread().getContextClassLoader(); 126 if (cl == null) { 127 cl = Compression.class.getClassLoader(); 128 } 129 if (cl == null) { 130 cl = ClassLoader.getSystemClassLoader(); 131 } 132 if (cl == null) { 133 throw new RuntimeException("A ClassLoader to load the Codec could not be determined"); 134 } 135 return cl; 136 } 137 138 /** 139 * Compression algorithms. The ordinal of these cannot change or else you risk breaking all 140 * existing HFiles out there. Even the ones that are not compressed! (They use the NONE algorithm) 141 */ 142 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "SE_TRANSIENT_FIELD_NOT_RESTORED", 143 justification = "We are not serializing so doesn't apply (not sure why transient though)") 144 @SuppressWarnings("ImmutableEnumChecker") 145 @InterfaceAudience.Public 146 public static enum Algorithm { 147 // LZO is GPL and requires extra install to setup. See 148 // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5 149 LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) { 150 // Use base type to avoid compile-time dependencies. 151 private volatile transient CompressionCodec lzoCodec; 152 private final transient Object lock = new Object(); 153 154 @Override 155 CompressionCodec getCodec(Configuration conf) { 156 if (lzoCodec == null) { 157 synchronized (lock) { 158 if (lzoCodec == null) { 159 lzoCodec = buildCodec(conf, this); 160 } 161 } 162 } 163 return lzoCodec; 164 } 165 166 @Override 167 public CompressionCodec reload(Configuration conf) { 168 synchronized (lock) { 169 lzoCodec = buildCodec(conf, this); 170 LOG.warn("Reloaded configuration for {}", name()); 171 return lzoCodec; 172 } 173 } 174 }, 175 176 GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) { 177 private volatile transient CompressionCodec gzCodec; 178 private final transient Object lock = new Object(); 179 180 @Override 181 CompressionCodec getCodec(Configuration conf) { 182 if (gzCodec == null) { 183 synchronized (lock) { 184 if (gzCodec == null) { 185 gzCodec = buildCodec(conf, this); 186 } 187 } 188 } 189 return gzCodec; 190 } 191 192 @Override 193 public CompressionCodec reload(Configuration conf) { 194 synchronized (lock) { 195 gzCodec = buildCodec(conf, this); 196 LOG.warn("Reloaded configuration for {}", name()); 197 return gzCodec; 198 } 199 } 200 }, 201 202 NONE("none", "", "") { 203 @Override 204 CompressionCodec getCodec(Configuration conf) { 205 return null; 206 } 207 208 @Override 209 public CompressionCodec reload(Configuration conf) { 210 return null; 211 } 212 213 @Override 214 public synchronized InputStream createDecompressionStream(InputStream downStream, 215 Decompressor decompressor, int downStreamBufferSize) throws IOException { 216 if (downStreamBufferSize > 0) { 217 return new BufferedInputStream(downStream, downStreamBufferSize); 218 } 219 return downStream; 220 } 221 222 @Override 223 public synchronized OutputStream createCompressionStream(OutputStream downStream, 224 Compressor compressor, int downStreamBufferSize) throws IOException { 225 if (downStreamBufferSize > 0) { 226 return new BufferedOutputStream(downStream, downStreamBufferSize); 227 } 228 229 return downStream; 230 } 231 }, 232 SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) { 233 // Use base type to avoid compile-time dependencies. 234 private volatile transient CompressionCodec snappyCodec; 235 private final transient Object lock = new Object(); 236 237 @Override 238 CompressionCodec getCodec(Configuration conf) { 239 if (snappyCodec == null) { 240 synchronized (lock) { 241 if (snappyCodec == null) { 242 snappyCodec = buildCodec(conf, this); 243 } 244 } 245 } 246 return snappyCodec; 247 } 248 249 @Override 250 public CompressionCodec reload(Configuration conf) { 251 synchronized (lock) { 252 snappyCodec = buildCodec(conf, this); 253 LOG.warn("Reloaded configuration for {}", name()); 254 return snappyCodec; 255 } 256 } 257 }, 258 LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) { 259 // Use base type to avoid compile-time dependencies. 260 private volatile transient CompressionCodec lz4Codec; 261 private final transient Object lock = new Object(); 262 263 @Override 264 CompressionCodec getCodec(Configuration conf) { 265 if (lz4Codec == null) { 266 synchronized (lock) { 267 if (lz4Codec == null) { 268 lz4Codec = buildCodec(conf, this); 269 } 270 } 271 } 272 return lz4Codec; 273 } 274 275 @Override 276 public CompressionCodec reload(Configuration conf) { 277 synchronized (lock) { 278 lz4Codec = buildCodec(conf, this); 279 LOG.warn("Reloaded configuration for {}", name()); 280 return lz4Codec; 281 } 282 } 283 }, 284 BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) { 285 // Use base type to avoid compile-time dependencies. 286 private volatile transient CompressionCodec bzipCodec; 287 private final transient Object lock = new Object(); 288 289 @Override 290 CompressionCodec getCodec(Configuration conf) { 291 if (bzipCodec == null) { 292 synchronized (lock) { 293 if (bzipCodec == null) { 294 bzipCodec = buildCodec(conf, this); 295 } 296 } 297 } 298 return bzipCodec; 299 } 300 301 @Override 302 public CompressionCodec reload(Configuration conf) { 303 synchronized (lock) { 304 bzipCodec = buildCodec(conf, this); 305 LOG.warn("Reloaded configuration for {}", name()); 306 return bzipCodec; 307 } 308 } 309 }, 310 ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) { 311 // Use base type to avoid compile-time dependencies. 312 private volatile transient CompressionCodec zStandardCodec; 313 private final transient Object lock = new Object(); 314 315 @Override 316 CompressionCodec getCodec(Configuration conf) { 317 if (zStandardCodec == null) { 318 synchronized (lock) { 319 if (zStandardCodec == null) { 320 zStandardCodec = buildCodec(conf, this); 321 } 322 } 323 } 324 return zStandardCodec; 325 } 326 327 @Override 328 public CompressionCodec reload(Configuration conf) { 329 synchronized (lock) { 330 zStandardCodec = buildCodec(conf, this); 331 LOG.warn("Reloaded configuration for {}", name()); 332 return zStandardCodec; 333 } 334 } 335 }, 336 LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) { 337 // Use base type to avoid compile-time dependencies. 338 private volatile transient CompressionCodec lzmaCodec; 339 private final transient Object lock = new Object(); 340 341 @Override 342 CompressionCodec getCodec(Configuration conf) { 343 if (lzmaCodec == null) { 344 synchronized (lock) { 345 if (lzmaCodec == null) { 346 lzmaCodec = buildCodec(conf, this); 347 } 348 } 349 } 350 return lzmaCodec; 351 } 352 353 @Override 354 public CompressionCodec reload(Configuration conf) { 355 synchronized (lock) { 356 lzmaCodec = buildCodec(conf, this); 357 LOG.warn("Reloaded configuration for {}", name()); 358 return lzmaCodec; 359 } 360 } 361 }, 362 363 BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) { 364 // Use base type to avoid compile-time dependencies. 365 private volatile transient CompressionCodec brotliCodec; 366 private final transient Object lock = new Object(); 367 368 @Override 369 CompressionCodec getCodec(Configuration conf) { 370 if (brotliCodec == null) { 371 synchronized (lock) { 372 if (brotliCodec == null) { 373 brotliCodec = buildCodec(conf, this); 374 } 375 } 376 } 377 return brotliCodec; 378 } 379 380 @Override 381 public CompressionCodec reload(Configuration conf) { 382 synchronized (lock) { 383 brotliCodec = buildCodec(conf, this); 384 LOG.warn("Reloaded configuration for {}", name()); 385 return brotliCodec; 386 } 387 } 388 }; 389 390 private final Configuration conf; 391 private final String compressName; 392 private final String confKey; 393 private final String confDefault; 394 /** data input buffer size to absorb small reads from application. */ 395 private static final int DATA_IBUF_SIZE = 1 * 1024; 396 /** data output buffer size to absorb small writes from application. */ 397 private static final int DATA_OBUF_SIZE = 4 * 1024; 398 399 Algorithm(String name, String confKey, String confDefault) { 400 this.conf = HBaseConfiguration.create(); 401 this.conf.setBoolean("io.native.lib.available", true); 402 this.compressName = name; 403 this.confKey = confKey; 404 this.confDefault = confDefault; 405 } 406 407 abstract CompressionCodec getCodec(Configuration conf); 408 409 /** 410 * Reload configuration for the given algorithm. 411 * <p> 412 * NOTE: Experts only. This can only be done safely during process startup, before the 413 * algorithm's codecs are in use. If the codec implementation is changed, the new implementation 414 * may not be fully compatible with what was loaded at static initialization time, leading to 415 * potential data corruption. Mostly used by unit tests. 416 * @param conf configuration 417 */ 418 public abstract CompressionCodec reload(Configuration conf); 419 420 public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, 421 int downStreamBufferSize) throws IOException { 422 CompressionCodec codec = getCodec(conf); 423 // Set the internal buffer size to read from down stream. 424 if (downStreamBufferSize > 0) { 425 ((Configurable) codec).getConf().setInt("io.file.buffer.size", downStreamBufferSize); 426 } 427 CompressionInputStream cis = codec.createInputStream(downStream, decompressor); 428 BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); 429 return bis2; 430 431 } 432 433 public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, 434 int downStreamBufferSize) throws IOException { 435 OutputStream bos1 = null; 436 if (downStreamBufferSize > 0) { 437 bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); 438 } else { 439 bos1 = downStream; 440 } 441 CompressionOutputStream cos = createPlainCompressionStream(bos1, compressor); 442 BufferedOutputStream bos2 = 443 new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE); 444 return bos2; 445 } 446 447 /** 448 * Creates a compression stream without any additional wrapping into buffering streams. 449 */ 450 public CompressionOutputStream createPlainCompressionStream(OutputStream downStream, 451 Compressor compressor) throws IOException { 452 CompressionCodec codec = getCodec(conf); 453 ((Configurable) codec).getConf().setInt("io.file.buffer.size", 32 * 1024); 454 return codec.createOutputStream(downStream, compressor); 455 } 456 457 public Compressor getCompressor() { 458 CompressionCodec codec = getCodec(conf); 459 if (codec != null) { 460 Compressor compressor = CodecPool.getCompressor(codec); 461 if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool."); 462 if (compressor != null) { 463 if (compressor.finished()) { 464 // Somebody returns the compressor to CodecPool but is still using it. 465 LOG.warn("Compressor obtained from CodecPool is already finished()"); 466 } 467 compressor.reset(); 468 } 469 return compressor; 470 } 471 return null; 472 } 473 474 public void returnCompressor(Compressor compressor) { 475 if (compressor != null) { 476 if (LOG.isTraceEnabled()) LOG.trace("Returning compressor " + compressor + " to pool."); 477 CodecPool.returnCompressor(compressor); 478 } 479 } 480 481 public Decompressor getDecompressor() { 482 CompressionCodec codec = getCodec(conf); 483 if (codec != null) { 484 Decompressor decompressor = CodecPool.getDecompressor(codec); 485 if (LOG.isTraceEnabled()) 486 LOG.trace("Retrieved decompressor " + decompressor + " from pool."); 487 if (decompressor != null) { 488 if (decompressor.finished()) { 489 // Somebody returns the decompressor to CodecPool but is still using it. 490 LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor); 491 } 492 decompressor.reset(); 493 } 494 return decompressor; 495 } 496 497 return null; 498 } 499 500 public void returnDecompressor(Decompressor decompressor) { 501 if (decompressor != null) { 502 if (LOG.isTraceEnabled()) LOG.trace("Returning decompressor " + decompressor + " to pool."); 503 CodecPool.returnDecompressor(decompressor); 504 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 505 if (LOG.isTraceEnabled()) LOG.trace("Ending decompressor " + decompressor); 506 decompressor.end(); 507 } 508 } 509 } 510 511 public String getName() { 512 return compressName; 513 } 514 } 515 516 public static Algorithm getCompressionAlgorithmByName(String compressName) { 517 Algorithm[] algos = Algorithm.class.getEnumConstants(); 518 519 for (Algorithm a : algos) { 520 if (a.getName().equals(compressName)) { 521 return a; 522 } 523 } 524 525 throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName); 526 } 527 528 /** 529 * Get names of supported compression algorithms. 530 * @return Array of strings, each represents a supported compression algorithm. Currently, the 531 * following compression algorithms are supported. 532 */ 533 public static String[] getSupportedAlgorithms() { 534 Algorithm[] algos = Algorithm.class.getEnumConstants(); 535 536 String[] ret = new String[algos.length]; 537 int i = 0; 538 for (Algorithm a : algos) { 539 ret[i++] = a.getName(); 540 } 541 542 return ret; 543 } 544 545 /** 546 * Load a codec implementation for an algorithm using the supplied configuration. 547 * @param conf the configuration to use 548 * @param algo the algorithm to implement 549 */ 550 private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) { 551 try { 552 String codecClassName = conf.get(algo.confKey, algo.confDefault); 553 if (codecClassName == null) { 554 throw new RuntimeException("No codec configured for " + algo.confKey); 555 } 556 Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName); 557 CompressionCodec codec = 558 (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration(conf)); 559 LOG.info("Loaded codec {} for compression algorithm {}", codec.getClass().getCanonicalName(), 560 algo.name()); 561 return codec; 562 } catch (ClassNotFoundException e) { 563 throw new RuntimeException(e); 564 } 565 } 566 567 public static void main(String[] args) throws Exception { 568 Configuration conf = HBaseConfiguration.create(); 569 java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>(); 570 for (Algorithm algo : Algorithm.class.getEnumConstants()) { 571 try { 572 implMap.put(algo.name(), algo.getCodec(conf)); 573 } catch (Exception e) { 574 // Ignore failures to load codec native implementations while building the report. 575 // We are to report what is configured. 576 } 577 } 578 for (Algorithm algo : Algorithm.class.getEnumConstants()) { 579 System.out.println(algo.name() + ":"); 580 System.out.println(" name: " + algo.getName()); 581 System.out.println(" confKey: " + algo.confKey); 582 System.out.println(" confDefault: " + algo.confDefault); 583 CompressionCodec codec = implMap.get(algo.name()); 584 System.out.println( 585 " implClass: " + (codec != null ? codec.getClass().getCanonicalName() : "<none>")); 586 } 587 } 588 589}