001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT; 021import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY; 022import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE; 023import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC; 024import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC; 025import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE; 026 027import java.io.IOException; 028import java.io.OutputStream; 029import java.security.Key; 030import java.util.concurrent.atomic.AtomicLong; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.codec.Codec; 036import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 037import org.apache.hadoop.hbase.io.compress.Compression; 038import org.apache.hadoop.hbase.io.crypto.Cipher; 039import org.apache.hadoop.hbase.io.crypto.Encryption; 040import org.apache.hadoop.hbase.io.crypto.Encryptor; 041import org.apache.hadoop.hbase.io.util.LRUDictionary; 042import org.apache.hadoop.hbase.security.EncryptionUtil; 043import org.apache.hadoop.hbase.security.User; 044import org.apache.hadoop.hbase.util.CommonFSUtils; 045import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; 046import org.apache.hadoop.hbase.util.EncryptionTest; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; 055 056/** 057 * Base class for Protobuf log writer. 058 */ 059@InterfaceAudience.Private 060public abstract class AbstractProtobufLogWriter { 061 062 private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class); 063 064 protected CompressionContext compressionContext; 065 protected Configuration conf; 066 protected Encryptor encryptor; 067 protected Codec.Encoder cellEncoder; 068 protected WALCellCodec.ByteStringCompressor compressor; 069 protected boolean trailerWritten; 070 protected WALTrailer trailer; 071 // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger 072 // than this size, it is written/read respectively, with a WARN message in the log. 073 protected int trailerWarnSize; 074 075 protected AtomicLong length = new AtomicLong(); 076 077 private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) 078 throws IOException { 079 return WALCellCodec.create(conf, null, compressionContext); 080 } 081 082 private WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) 083 throws IOException { 084 builder.setWriterClsName(getWriterClassName()); 085 builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName()); 086 return builder.build(); 087 } 088 089 private WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder) 090 throws IOException { 091 EncryptionTest.testKeyProvider(conf); 092 EncryptionTest.testCipherProvider(conf); 093 094 // Get an instance of our cipher 095 final String cipherName = 096 conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 097 Cipher cipher = Encryption.getCipher(conf, cipherName); 098 if (cipher == null) { 099 throw new RuntimeException("Cipher '" + cipherName + "' is not available"); 100 } 101 102 // Generate a random encryption key for this WAL 103 Key key = cipher.getRandomKey(); 104 builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf, 105 conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY, 106 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())), 107 key))); 108 109 // Set up the encryptor 110 Encryptor encryptor = cipher.getEncryptor(); 111 encryptor.setKey(key); 112 this.encryptor = encryptor; 113 if (LOG.isTraceEnabled()) { 114 LOG.trace("Initialized secure protobuf WAL: cipher={}", cipher.getName()); 115 } 116 builder.setWriterClsName(getWriterClassName()); 117 builder.setCellCodecClsName(SecureWALCellCodec.class.getName()); 118 return builder.build(); 119 } 120 121 private String getWriterClassName() { 122 // class name which is recognized by hbase-1.x to avoid ProtobufLogReader throwing error: 123 // IOException: Got unknown writer class: AsyncProtobufLogWriter 124 if (encryptor == null) { 125 return "ProtobufLogWriter"; 126 } else { 127 return "SecureProtobufLogWriter"; 128 } 129 } 130 131 private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { 132 boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); 133 if (doCompress) { 134 try { 135 final boolean useTagCompression = 136 conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); 137 final boolean useValueCompression = 138 conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); 139 final Compression.Algorithm valueCompressionType = useValueCompression 140 ? CompressionContext.getValueCompressionAlgorithm(conf) 141 : Compression.Algorithm.NONE; 142 if (LOG.isTraceEnabled()) { 143 LOG.trace( 144 "Initializing compression context for {}: isRecoveredEdits={}" 145 + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", 146 path, CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression, 147 valueCompressionType); 148 } 149 this.compressionContext = 150 new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), 151 useTagCompression, useValueCompression, valueCompressionType); 152 } catch (Exception e) { 153 throw new IOException("Failed to initiate CompressionContext", e); 154 } 155 } 156 return doCompress; 157 } 158 159 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, 160 long blocksize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException { 161 try { 162 this.conf = conf; 163 boolean doCompress = initializeCompressionContext(conf, path); 164 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); 165 int bufferSize = CommonFSUtils.getDefaultBufferSize(fs); 166 short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", 167 CommonFSUtils.getDefaultReplication(fs, path)); 168 boolean noLocalWrite = 169 conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT); 170 171 initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite); 172 173 boolean doTagCompress = 174 doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); 175 boolean doValueCompress = 176 doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); 177 WALHeader.Builder headerBuilder = WALHeader.newBuilder().setHasCompression(doCompress) 178 .setHasTagCompression(doTagCompress).setHasValueCompression(doValueCompress); 179 if (doValueCompress) { 180 headerBuilder.setValueCompressionAlgorithm( 181 CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); 182 } 183 if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { 184 length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildSecureWALHeader(conf, headerBuilder))); 185 secureInitAfterHeader(doCompress, encryptor); 186 } else { 187 length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder))); 188 initAfterHeader(doCompress); 189 } 190 191 // instantiate trailer to default value. 192 trailer = WALTrailer.newBuilder().build(); 193 194 if (LOG.isTraceEnabled()) { 195 LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" 196 + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); 197 } 198 } catch (Exception e) { 199 LOG.warn("Init output failed, path={}", path, e); 200 closeOutputIfNecessary(); 201 throw e; 202 } 203 } 204 205 private void initAfterHeader(boolean doCompress) throws IOException { 206 WALCellCodec codec = getCodec(conf, this.compressionContext); 207 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 208 if (doCompress) { 209 this.compressor = codec.getByteStringCompressor(); 210 } else { 211 this.compressor = WALCellCodec.getNoneCompressor(); 212 } 213 } 214 215 private void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) throws IOException { 216 if (encryptor != null) { 217 WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor); 218 this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); 219 // We do not support compression 220 this.compressionContext = null; 221 this.compressor = WALCellCodec.getNoneCompressor(); 222 } else { 223 initAfterHeader(doCompress); 224 } 225 } 226 227 void setWALTrailer(WALTrailer walTrailer) { 228 this.trailer = walTrailer; 229 } 230 231 public long getLength() { 232 return length.get(); 233 } 234 235 private WALTrailer buildWALTrailer(WALTrailer.Builder builder) { 236 return builder.build(); 237 } 238 239 protected final void writeWALTrailer() { 240 try { 241 int trailerSize = 0; 242 if (this.trailer == null) { 243 // use default trailer. 244 LOG.warn("WALTrailer is null. Continuing with default."); 245 this.trailer = buildWALTrailer(WALTrailer.newBuilder()); 246 trailerSize = this.trailer.getSerializedSize(); 247 } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { 248 // continue writing after warning the user. 249 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize 250 + " > " + this.trailerWarnSize); 251 } 252 length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC)); 253 this.trailerWritten = true; 254 } catch (IOException ioe) { 255 LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); 256 } 257 } 258 259 protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, 260 short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite) 261 throws IOException, StreamLacksCapabilityException; 262 263 /** 264 * It is straight forward to close the output, do not need to write trailer like the Writer.close 265 */ 266 protected void closeOutputIfNecessary() { 267 } 268 269 /** 270 * return the file length after written. 271 */ 272 protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException; 273 274 protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) 275 throws IOException; 276 277 protected abstract OutputStream getOutputStreamForCellEncoder(); 278}