001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
021import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
022
023import java.io.IOException;
024import java.io.OutputStream;
025import java.security.Key;
026import java.util.concurrent.atomic.AtomicLong;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.codec.Codec;
033import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
034import org.apache.hadoop.hbase.io.compress.Compression;
035import org.apache.hadoop.hbase.io.crypto.Cipher;
036import org.apache.hadoop.hbase.io.crypto.Encryption;
037import org.apache.hadoop.hbase.io.crypto.Encryptor;
038import org.apache.hadoop.hbase.io.util.LRUDictionary;
039import org.apache.hadoop.hbase.security.EncryptionUtil;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.util.CommonFSUtils;
042import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
043import org.apache.hadoop.hbase.util.EncryptionTest;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
049
050import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
052
053/**
054 * Base class for Protobuf log writer.
055 */
056@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
057public abstract class AbstractProtobufLogWriter {
058
059  private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class);
060
061  protected CompressionContext compressionContext;
062  protected Configuration conf;
063  protected Codec.Encoder cellEncoder;
064  protected WALCellCodec.ByteStringCompressor compressor;
065  protected boolean trailerWritten;
066  protected WALTrailer trailer;
067  // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
068  // than this size, it is written/read respectively, with a WARN message in the log.
069  protected int trailerWarnSize;
070
071  protected AtomicLong length = new AtomicLong();
072
073  private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
074    throws IOException {
075    return WALCellCodec.create(conf, null, compressionContext);
076  }
077
078  private WALHeader buildWALHeader0(Configuration conf, WALHeader.Builder builder) {
079    if (!builder.hasWriterClsName()) {
080      builder.setWriterClsName(getWriterClassName());
081    }
082    if (!builder.hasCellCodecClsName()) {
083      builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
084    }
085    return builder.build();
086  }
087
088  protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
089    throws IOException {
090    return buildWALHeader0(conf, builder);
091  }
092
093  // should be called in sub classes's buildWALHeader method to build WALHeader for secure
094  // environment. Do not forget to override the setEncryptor method as it will be called in this
095  // method to init your encryptor.
096  protected final WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
097    throws IOException {
098    builder.setWriterClsName(getWriterClassName());
099    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
100      EncryptionTest.testKeyProvider(conf);
101      EncryptionTest.testCipherProvider(conf);
102
103      // Get an instance of our cipher
104      final String cipherName =
105        conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
106      Cipher cipher = Encryption.getCipher(conf, cipherName);
107      if (cipher == null) {
108        throw new RuntimeException("Cipher '" + cipherName + "' is not available");
109      }
110
111      // Generate a random encryption key for this WAL
112      Key key = cipher.getRandomKey();
113      builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf,
114        conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
115          conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())),
116        key)));
117
118      // Set up the encryptor
119      Encryptor encryptor = cipher.getEncryptor();
120      encryptor.setKey(key);
121      setEncryptor(encryptor);
122      if (LOG.isTraceEnabled()) {
123        LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
124      }
125    }
126    builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
127    return buildWALHeader0(conf, builder);
128  }
129
130  // override this if you need a encryptor
131  protected void setEncryptor(Encryptor encryptor) {
132  }
133
134  protected String getWriterClassName() {
135    return getClass().getSimpleName();
136  }
137
138  private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
139    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
140    if (doCompress) {
141      try {
142        final boolean useTagCompression =
143          conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
144        final boolean useValueCompression =
145          conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
146        final Compression.Algorithm valueCompressionType = useValueCompression
147          ? CompressionContext.getValueCompressionAlgorithm(conf)
148          : Compression.Algorithm.NONE;
149        if (LOG.isTraceEnabled()) {
150          LOG.trace(
151            "Initializing compression context for {}: isRecoveredEdits={}"
152              + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
153            path, CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression,
154            valueCompressionType);
155        }
156        this.compressionContext =
157          new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
158            useTagCompression, useValueCompression, valueCompressionType);
159      } catch (Exception e) {
160        throw new IOException("Failed to initiate CompressionContext", e);
161      }
162    }
163    return doCompress;
164  }
165
166  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
167    long blocksize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException {
168    try {
169      this.conf = conf;
170      boolean doCompress = initializeCompressionContext(conf, path);
171      this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
172      int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
173      short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
174        CommonFSUtils.getDefaultReplication(fs, path));
175
176      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
177
178      boolean doTagCompress =
179        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
180      boolean doValueCompress =
181        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
182      WALHeader.Builder headerBuilder = WALHeader.newBuilder().setHasCompression(doCompress)
183        .setHasTagCompression(doTagCompress).setHasValueCompression(doValueCompress);
184      if (doValueCompress) {
185        headerBuilder.setValueCompressionAlgorithm(
186          CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
187      }
188      length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
189        buildWALHeader(conf, headerBuilder)));
190
191      initAfterHeader(doCompress);
192
193      // instantiate trailer to default value.
194      trailer = WALTrailer.newBuilder().build();
195
196      if (LOG.isTraceEnabled()) {
197        LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
198          + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
199      }
200    } catch (Exception e) {
201      LOG.warn("Init output failed, path={}", path, e);
202      closeOutputIfNecessary();
203      throw e;
204    }
205  }
206
207  private void initAfterHeader0(boolean doCompress) throws IOException {
208    WALCellCodec codec = getCodec(conf, this.compressionContext);
209    this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
210    if (doCompress) {
211      this.compressor = codec.getByteStringCompressor();
212    } else {
213      this.compressor = WALCellCodec.getNoneCompressor();
214    }
215  }
216
217  protected void initAfterHeader(boolean doCompress) throws IOException {
218    initAfterHeader0(doCompress);
219  }
220
221  // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec.
222  protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor)
223    throws IOException {
224    if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) {
225      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
226      this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
227      // We do not support compression
228      this.compressionContext = null;
229      this.compressor = WALCellCodec.getNoneCompressor();
230    } else {
231      initAfterHeader0(doCompress);
232    }
233  }
234
235  void setWALTrailer(WALTrailer walTrailer) {
236    this.trailer = walTrailer;
237  }
238
239  public long getLength() {
240    return length.get();
241  }
242
243  private WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
244    return builder.build();
245  }
246
247  protected void writeWALTrailer() {
248    try {
249      int trailerSize = 0;
250      if (this.trailer == null) {
251        // use default trailer.
252        LOG.warn("WALTrailer is null. Continuing with default.");
253        this.trailer = buildWALTrailer(WALTrailer.newBuilder());
254        trailerSize = this.trailer.getSerializedSize();
255      } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
256        // continue writing after warning the user.
257        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
258          + " > " + this.trailerWarnSize);
259      }
260      length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
261      this.trailerWritten = true;
262    } catch (IOException ioe) {
263      LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
264    }
265  }
266
267  protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
268    short replication, long blockSize, StreamSlowMonitor monitor)
269    throws IOException, StreamLacksCapabilityException;
270
271  /**
272   * It is straight forward to close the output, do not need to write trailer like the Writer.close
273   */
274  protected void closeOutputIfNecessary() {
275  }
276
277  /**
278   * return the file length after written.
279   */
280  protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException;
281
282  protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic)
283    throws IOException;
284
285  protected abstract OutputStream getOutputStreamForCellEncoder();
286}