001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; 021import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; 022 023import java.io.IOException; 024import java.io.OutputStream; 025import java.security.Key; 026import java.security.SecureRandom; 027import java.util.concurrent.atomic.AtomicLong; 028 029import javax.crypto.spec.SecretKeySpec; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HBaseInterfaceAudience; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.apache.hadoop.hbase.codec.Codec; 040import org.apache.hadoop.hbase.io.crypto.Cipher; 041import org.apache.hadoop.hbase.io.crypto.Encryption; 042import org.apache.hadoop.hbase.io.crypto.Encryptor; 043import org.apache.hadoop.hbase.io.util.LRUDictionary; 044import org.apache.hadoop.hbase.security.EncryptionUtil; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 049import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 050import org.apache.hadoop.hbase.util.EncryptionTest; 051import org.apache.hadoop.hbase.util.FSUtils; 052 053/** 054 * Base class for Protobuf log writer. 055 */ 056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 057public abstract class AbstractProtobufLogWriter { 058 059 private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class); 060 061 protected CompressionContext compressionContext; 062 protected Configuration conf; 063 protected Codec.Encoder cellEncoder; 064 protected WALCellCodec.ByteStringCompressor compressor; 065 protected boolean trailerWritten; 066 protected WALTrailer trailer; 067 // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger 068 // than this size, it is written/read respectively, with a WARN message in the log. 069 protected int trailerWarnSize; 070 071 protected AtomicLong length = new AtomicLong(); 072 073 private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) 074 throws IOException { 075 return WALCellCodec.create(conf, null, compressionContext); 076 } 077 078 private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) { 079 if (!builder.hasWriterClsName()) { 080 builder.setWriterClsName(getWriterClassName()); 081 } 082 if (!builder.hasCellCodecClsName()) { 083 builder.setCellCodecClsName( 084 WALCellCodec.getWALCellCodecClass(conf).getName()); 085 } 086 return builder.build(); 087 } 088 089 protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) 090 throws IOException { 091 return buildWALHeader0(conf, builder); 092 } 093 094 // should be called in sub classes's buildWALHeader method to build WALHeader for secure 095 // environment. Do not forget to override the setEncryptor method as it will be called in this 096 // method to init your encryptor. 097 protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder) 098 throws IOException { 099 builder.setWriterClsName(getWriterClassName()); 100 if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { 101 EncryptionTest.testKeyProvider(conf); 102 EncryptionTest.testCipherProvider(conf); 103 104 // Get an instance of our cipher 105 final String cipherName = 106 conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 107 Cipher cipher = Encryption.getCipher(conf, cipherName); 108 if (cipher == null) { 109 throw new RuntimeException("Cipher '" + cipherName + "' is not available"); 110 } 111 112 // Generate an encryption key for this WAL 113 SecureRandom rng = new SecureRandom(); 114 byte[] keyBytes = new byte[cipher.getKeyLength()]; 115 rng.nextBytes(keyBytes); 116 Key key = new SecretKeySpec(keyBytes, cipher.getName()); 117 builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf, 118 conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY, 119 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, 120 User.getCurrent().getShortName())), 121 key))); 122 123 // Set up the encryptor 124 Encryptor encryptor = cipher.getEncryptor(); 125 encryptor.setKey(key); 126 setEncryptor(encryptor); 127 if (LOG.isTraceEnabled()) { 128 LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName()); 129 } 130 } 131 builder.setCellCodecClsName(SecureWALCellCodec.class.getName()); 132 return buildWALHeader0(conf, builder); 133 } 134 135 // override this if you need a encryptor 136 protected void setEncryptor(Encryptor encryptor) { 137 } 138 139 protected String getWriterClassName() { 140 return getClass().getSimpleName(); 141 } 142 143 private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { 144 boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); 145 if (doCompress) { 146 try { 147 this.compressionContext = new CompressionContext(LRUDictionary.class, 148 FSUtils.isRecoveredEdits(path), 149 conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); 150 } catch (Exception e) { 151 throw new IOException("Failed to initiate CompressionContext", e); 152 } 153 } 154 return doCompress; 155 } 156 157 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 158 long blocksize) throws IOException, StreamLacksCapabilityException { 159 this.conf = conf; 160 boolean doCompress = initializeCompressionContext(conf, path); 161 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); 162 int bufferSize = FSUtils.getDefaultBufferSize(fs); 163 short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", 164 FSUtils.getDefaultReplication(fs, path)); 165 166 initOutput(fs, path, overwritable, bufferSize, replication, blocksize); 167 168 boolean doTagCompress = doCompress 169 && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); 170 length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, 171 WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); 172 173 initAfterHeader(doCompress); 174 175 // instantiate trailer to default value. 176 trailer = WALTrailer.newBuilder().build(); 177 if (LOG.isTraceEnabled()) { 178 LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); 179 } 180 } 181 182 private void initAfterHeader0(boolean doCompress) throws IOException { 183 WALCellCodec codec = getCodec(conf, this.compressionContext); 184 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 185 if (doCompress) { 186 this.compressor = codec.getByteStringCompressor(); 187 } 188 } 189 190 protected void initAfterHeader(boolean doCompress) throws IOException { 191 initAfterHeader0(doCompress); 192 } 193 194 // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec. 195 protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) 196 throws IOException { 197 if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) { 198 WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor); 199 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 200 // We do not support compression 201 this.compressionContext = null; 202 } else { 203 initAfterHeader0(doCompress); 204 } 205 } 206 207 void setWALTrailer(WALTrailer walTrailer) { 208 this.trailer = walTrailer; 209 } 210 211 public long getLength() { 212 return length.get(); 213 } 214 215 private WALTrailer buildWALTrailer(WALTrailer.Builder builder) { 216 return builder.build(); 217 } 218 219 protected void writeWALTrailer() { 220 try { 221 int trailerSize = 0; 222 if (this.trailer == null) { 223 // use default trailer. 224 LOG.warn("WALTrailer is null. Continuing with default."); 225 this.trailer = buildWALTrailer(WALTrailer.newBuilder()); 226 trailerSize = this.trailer.getSerializedSize(); 227 } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { 228 // continue writing after warning the user. 229 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize 230 + " > " + this.trailerWarnSize); 231 } 232 length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC)); 233 this.trailerWritten = true; 234 } catch (IOException ioe) { 235 LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); 236 } 237 } 238 239 protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 240 short replication, long blockSize) throws IOException, StreamLacksCapabilityException; 241 242 /** 243 * return the file length after written. 244 */ 245 protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException; 246 247 protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) 248 throws IOException; 249 250 protected abstract OutputStream getOutputStreamForCellEncoder(); 251}