001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; 021import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; 022 023import java.io.IOException; 024import java.io.OutputStream; 025import java.security.Key; 026import java.util.concurrent.atomic.AtomicLong; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseInterfaceAudience; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.codec.Codec; 033import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 034import org.apache.hadoop.hbase.io.compress.Compression; 035import org.apache.hadoop.hbase.io.crypto.Cipher; 036import org.apache.hadoop.hbase.io.crypto.Encryption; 037import org.apache.hadoop.hbase.io.crypto.Encryptor; 038import org.apache.hadoop.hbase.io.util.LRUDictionary; 039import org.apache.hadoop.hbase.security.EncryptionUtil; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.util.CommonFSUtils; 042import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 043import org.apache.hadoop.hbase.util.EncryptionTest; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 049 050import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 052 053/** 054 * Base class for Protobuf log writer. 055 */ 056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 057public abstract class AbstractProtobufLogWriter { 058 059 private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class); 060 061 protected CompressionContext compressionContext; 062 protected Configuration conf; 063 protected Codec.Encoder cellEncoder; 064 protected WALCellCodec.ByteStringCompressor compressor; 065 protected boolean trailerWritten; 066 protected WALTrailer trailer; 067 // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger 068 // than this size, it is written/read respectively, with a WARN message in the log. 069 protected int trailerWarnSize; 070 071 protected AtomicLong length = new AtomicLong(); 072 073 private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) 074 throws IOException { 075 return WALCellCodec.create(conf, null, compressionContext); 076 } 077 078 private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) { 079 if (!builder.hasWriterClsName()) { 080 builder.setWriterClsName(getWriterClassName()); 081 } 082 if (!builder.hasCellCodecClsName()) { 083 builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName()); 084 } 085 return builder.build(); 086 } 087 088 protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) 089 throws IOException { 090 return buildWALHeader0(conf, builder); 091 } 092 093 // should be called in sub classes's buildWALHeader method to build WALHeader for secure 094 // environment. Do not forget to override the setEncryptor method as it will be called in this 095 // method to init your encryptor. 096 protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder) 097 throws IOException { 098 builder.setWriterClsName(getWriterClassName()); 099 if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { 100 EncryptionTest.testKeyProvider(conf); 101 EncryptionTest.testCipherProvider(conf); 102 103 // Get an instance of our cipher 104 final String cipherName = 105 conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 106 Cipher cipher = Encryption.getCipher(conf, cipherName); 107 if (cipher == null) { 108 throw new RuntimeException("Cipher '" + cipherName + "' is not available"); 109 } 110 111 // Generate a random encryption key for this WAL 112 Key key = cipher.getRandomKey(); 113 builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf, 114 conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY, 115 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())), 116 key))); 117 118 // Set up the encryptor 119 Encryptor encryptor = cipher.getEncryptor(); 120 encryptor.setKey(key); 121 setEncryptor(encryptor); 122 if (LOG.isTraceEnabled()) { 123 LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName()); 124 } 125 } 126 builder.setCellCodecClsName(SecureWALCellCodec.class.getName()); 127 return buildWALHeader0(conf, builder); 128 } 129 130 // override this if you need a encryptor 131 protected void setEncryptor(Encryptor encryptor) { 132 } 133 134 protected String getWriterClassName() { 135 return getClass().getSimpleName(); 136 } 137 138 private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { 139 boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); 140 if (doCompress) { 141 try { 142 final boolean useTagCompression = 143 conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); 144 final boolean useValueCompression = 145 conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); 146 final Compression.Algorithm valueCompressionType = useValueCompression 147 ? CompressionContext.getValueCompressionAlgorithm(conf) 148 : Compression.Algorithm.NONE; 149 if (LOG.isTraceEnabled()) { 150 LOG.trace( 151 "Initializing compression context for {}: isRecoveredEdits={}" 152 + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", 153 path, CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression, 154 valueCompressionType); 155 } 156 this.compressionContext = 157 new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), 158 useTagCompression, useValueCompression, valueCompressionType); 159 } catch (Exception e) { 160 throw new IOException("Failed to initiate CompressionContext", e); 161 } 162 } 163 return doCompress; 164 } 165 166 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 167 long blocksize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException { 168 try { 169 this.conf = conf; 170 boolean doCompress = initializeCompressionContext(conf, path); 171 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); 172 int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); 173 short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", 174 CommonFSUtils.getDefaultReplication(fs, path)); 175 176 initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); 177 178 boolean doTagCompress = 179 doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); 180 boolean doValueCompress = 181 doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); 182 WALHeader.Builder headerBuilder = WALHeader.newBuilder().setHasCompression(doCompress) 183 .setHasTagCompression(doTagCompress).setHasValueCompression(doValueCompress); 184 if (doValueCompress) { 185 headerBuilder.setValueCompressionAlgorithm( 186 CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); 187 } 188 length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, 189 buildWALHeader(conf, headerBuilder))); 190 191 initAfterHeader(doCompress); 192 193 // instantiate trailer to default value. 194 trailer = WALTrailer.newBuilder().build(); 195 196 if (LOG.isTraceEnabled()) { 197 LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" 198 + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); 199 } 200 } catch (Exception e) { 201 LOG.warn("Init output failed, path={}", path, e); 202 closeOutputIfNecessary(); 203 throw e; 204 } 205 } 206 207 private void initAfterHeader0(boolean doCompress) throws IOException { 208 WALCellCodec codec = getCodec(conf, this.compressionContext); 209 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 210 if (doCompress) { 211 this.compressor = codec.getByteStringCompressor(); 212 } else { 213 this.compressor = WALCellCodec.getNoneCompressor(); 214 } 215 } 216 217 protected void initAfterHeader(boolean doCompress) throws IOException { 218 initAfterHeader0(doCompress); 219 } 220 221 // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec. 222 protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) 223 throws IOException { 224 if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) { 225 WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor); 226 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 227 // We do not support compression 228 this.compressionContext = null; 229 this.compressor = WALCellCodec.getNoneCompressor(); 230 } else { 231 initAfterHeader0(doCompress); 232 } 233 } 234 235 void setWALTrailer(WALTrailer walTrailer) { 236 this.trailer = walTrailer; 237 } 238 239 public long getLength() { 240 return length.get(); 241 } 242 243 private WALTrailer buildWALTrailer(WALTrailer.Builder builder) { 244 return builder.build(); 245 } 246 247 protected void writeWALTrailer() { 248 try { 249 int trailerSize = 0; 250 if (this.trailer == null) { 251 // use default trailer. 252 LOG.warn("WALTrailer is null. Continuing with default."); 253 this.trailer = buildWALTrailer(WALTrailer.newBuilder()); 254 trailerSize = this.trailer.getSerializedSize(); 255 } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { 256 // continue writing after warning the user. 257 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize 258 + " > " + this.trailerWarnSize); 259 } 260 length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC)); 261 this.trailerWritten = true; 262 } catch (IOException ioe) { 263 LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); 264 } 265 } 266 267 protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 268 short replication, long blockSize, StreamSlowMonitor monitor) 269 throws IOException, StreamLacksCapabilityException; 270 271 /** 272 * It is straight forward to close the output, do not need to write trailer like the Writer.close 273 */ 274 protected void closeOutputIfNecessary() { 275 } 276 277 /** 278 * return the file length after written. 279 */ 280 protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException; 281 282 protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) 283 throws IOException; 284 285 protected abstract OutputStream getOutputStreamForCellEncoder(); 286}