001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
021import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
022
023import java.io.IOException;
024import java.io.OutputStream;
025import java.security.Key;
026import java.security.SecureRandom;
027import java.util.concurrent.atomic.AtomicLong;
028
029import javax.crypto.spec.SecretKeySpec;
030
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.apache.hadoop.hbase.codec.Codec;
040import org.apache.hadoop.hbase.io.crypto.Cipher;
041import org.apache.hadoop.hbase.io.crypto.Encryption;
042import org.apache.hadoop.hbase.io.crypto.Encryptor;
043import org.apache.hadoop.hbase.io.util.LRUDictionary;
044import org.apache.hadoop.hbase.security.EncryptionUtil;
045import org.apache.hadoop.hbase.security.User;
046import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
049import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
050import org.apache.hadoop.hbase.util.EncryptionTest;
051import org.apache.hadoop.hbase.util.FSUtils;
052
053/**
054 * Base class for Protobuf log writer.
055 */
056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
057public abstract class AbstractProtobufLogWriter {
058
059  private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class);
060
061  protected CompressionContext compressionContext;
062  protected Configuration conf;
063  protected Codec.Encoder cellEncoder;
064  protected WALCellCodec.ByteStringCompressor compressor;
065  protected boolean trailerWritten;
066  protected WALTrailer trailer;
067  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
068  // than this size, it is written/read respectively, with a WARN message in the log.
069  protected int trailerWarnSize;
070
071  protected AtomicLong length = new AtomicLong();
072
073  private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
074      throws IOException {
075    return WALCellCodec.create(conf, null, compressionContext);
076  }
077
078  private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) {
079    if (!builder.hasWriterClsName()) {
080      builder.setWriterClsName(getWriterClassName());
081    }
082    if (!builder.hasCellCodecClsName()) {
083      builder.setCellCodecClsName(
084          WALCellCodec.getWALCellCodecClass(conf).getName());
085    }
086    return builder.build();
087  }
088
089  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
090      throws IOException {
091    return buildWALHeader0(conf, builder);
092  }
093
094  // should be called in sub classes's buildWALHeader method to build WALHeader for secure
095  // environment. Do not forget to override the setEncryptor method as it will be called in this
096  // method to init your encryptor.
097  protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
098      throws IOException {
099    builder.setWriterClsName(getWriterClassName());
100    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
101      EncryptionTest.testKeyProvider(conf);
102      EncryptionTest.testCipherProvider(conf);
103
104      // Get an instance of our cipher
105      final String cipherName =
106          conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
107      Cipher cipher = Encryption.getCipher(conf, cipherName);
108      if (cipher == null) {
109        throw new RuntimeException("Cipher '" + cipherName + "' is not available");
110      }
111
112      // Generate an encryption key for this WAL
113      SecureRandom rng = new SecureRandom();
114      byte[] keyBytes = new byte[cipher.getKeyLength()];
115      rng.nextBytes(keyBytes);
116      Key key = new SecretKeySpec(keyBytes, cipher.getName());
117      builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf,
118          conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
119              conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
120                  User.getCurrent().getShortName())),
121          key)));
122
123      // Set up the encryptor
124      Encryptor encryptor = cipher.getEncryptor();
125      encryptor.setKey(key);
126      setEncryptor(encryptor);
127      if (LOG.isTraceEnabled()) {
128        LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
129      }
130    }
131    builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
132    return buildWALHeader0(conf, builder);
133  }
134
135  // override this if you need a encryptor
136  protected void setEncryptor(Encryptor encryptor) {
137  }
138
139  protected String getWriterClassName() {
140    return getClass().getSimpleName();
141  }
142
143  private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
144    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
145    if (doCompress) {
146      try {
147        this.compressionContext = new CompressionContext(LRUDictionary.class,
148            FSUtils.isRecoveredEdits(path),
149            conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
150      } catch (Exception e) {
151        throw new IOException("Failed to initiate CompressionContext", e);
152      }
153    }
154    return doCompress;
155  }
156
157  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
158      long blocksize) throws IOException, StreamLacksCapabilityException {
159    this.conf = conf;
160    boolean doCompress = initializeCompressionContext(conf, path);
161    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
162    int bufferSize = FSUtils.getDefaultBufferSize(fs);
163    short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
164      FSUtils.getDefaultReplication(fs, path));
165
166    initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
167
168    boolean doTagCompress = doCompress
169        && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
170    length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
171      WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
172
173    initAfterHeader(doCompress);
174
175    // instantiate trailer to default value.
176    trailer = WALTrailer.newBuilder().build();
177    if (LOG.isTraceEnabled()) {
178      LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
179    }
180  }
181
182  private void initAfterHeader0(boolean doCompress) throws IOException {
183    WALCellCodec codec = getCodec(conf, this.compressionContext);
184    this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
185    if (doCompress) {
186      this.compressor = codec.getByteStringCompressor();
187    }
188  }
189
190  protected void initAfterHeader(boolean doCompress) throws IOException {
191    initAfterHeader0(doCompress);
192  }
193
194  // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec.
195  protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor)
196      throws IOException {
197    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) {
198      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
199      this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
200      // We do not support compression
201      this.compressionContext = null;
202    } else {
203      initAfterHeader0(doCompress);
204    }
205  }
206
207  void setWALTrailer(WALTrailer walTrailer) {
208    this.trailer = walTrailer;
209  }
210
211  public long getLength() {
212    return length.get();
213  }
214
215  private WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
216    return builder.build();
217  }
218
219  protected void writeWALTrailer() {
220    try {
221      int trailerSize = 0;
222      if (this.trailer == null) {
223        // use default trailer.
224        LOG.warn("WALTrailer is null. Continuing with default.");
225        this.trailer = buildWALTrailer(WALTrailer.newBuilder());
226        trailerSize = this.trailer.getSerializedSize();
227      } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
228        // continue writing after warning the user.
229        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
230            + " > " + this.trailerWarnSize);
231      }
232      length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
233      this.trailerWritten = true;
234    } catch (IOException ioe) {
235      LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
236    }
237  }
238
239  protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
240      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
241
242  /**
243   * return the file length after written.
244   */
245  protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException;
246
247  protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic)
248      throws IOException;
249
250  protected abstract OutputStream getOutputStreamForCellEncoder();
251}