/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;

import java.io.IOException;
import java.io.OutputStream;
import java.security.Key;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
 * Base class for Protobuf log writer.
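 * <p>
 * A minimal usage sketch (an assumption for illustration: a concrete subclass such as
 * {@code ProtobufLogWriter} and an already constructed {@link StreamSlowMonitor}; the exact
 * wiring is normally done by the WAL provider rather than by user code):
 *
 * <pre>
 * AbstractProtobufLogWriter writer = new ProtobufLogWriter();
 * // Writes the PB_WAL_MAGIC preamble and the WALHeader, setting up compression/encryption.
 * writer.init(fs, path, conf, true, blockSize, monitor);
 * // Entries are appended through the concrete subclass API; the subclass is expected to
 * // call writeWALTrailer() before closing the underlying stream.
 * long bytesWritten = writer.getLength();
 * </pre>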
 */
@InterfaceAudience.Private
public abstract class AbstractProtobufLogWriter {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufLogWriter.class);

  protected CompressionContext compressionContext;
  protected Configuration conf;
  protected Encryptor encryptor;
  protected Codec.Encoder cellEncoder;
  protected WALCellCodec.ByteStringCompressor compressor;
  protected boolean trailerWritten;
  protected WALTrailer trailer;
  // Maximum size of the WAL trailer in bytes. A trailer larger than this is still written/read,
  // but a WARN message is emitted to the log.
  protected int trailerWarnSize;

  protected AtomicLong length = new AtomicLong();

  private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext)
    throws IOException {
    return WALCellCodec.create(conf, null, compressionContext);
  }

  private WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder)
    throws IOException {
    builder.setWriterClsName(getWriterClassName());
    builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf).getName());
    return builder.build();
  }

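  /**
   * Builds a WAL header for an encrypted WAL: verifies the key/cipher providers, generates a
   * random data key for this WAL file, stores it (wrapped with the configured master key) in the
   * header, and sets up the {@link Encryptor} used for subsequent cell encryption.
   */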
  private WALHeader buildSecureWALHeader(Configuration conf, WALHeader.Builder builder)
    throws IOException {
    EncryptionTest.testKeyProvider(conf);
    EncryptionTest.testCipherProvider(conf);

    // Get an instance of our cipher
    final String cipherName =
      conf.get(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
    Cipher cipher = Encryption.getCipher(conf, cipherName);
    if (cipher == null) {
      throw new RuntimeException("Cipher '" + cipherName + "' is not available");
    }

    // Generate a random encryption key for this WAL
    Key key = cipher.getRandomKey();
    builder.setEncryptionKey(UnsafeByteOperations.unsafeWrap(EncryptionUtil.wrapKey(conf,
      conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY,
        conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName())),
      key)));

    // Set up the encryptor
    Encryptor encryptor = cipher.getEncryptor();
    encryptor.setKey(key);
    this.encryptor = encryptor;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Initialized secure protobuf WAL: cipher={}", cipher.getName());
    }
    builder.setWriterClsName(getWriterClassName());
    builder.setCellCodecClsName(SecureWALCellCodec.class.getName());
    return builder.build();
  }

  private String getWriterClassName() {
    // Return a class name that hbase-1.x readers recognize, so ProtobufLogReader does not fail
    // with "IOException: Got unknown writer class: AsyncProtobufLogWriter".
    if (encryptor == null) {
      return "ProtobufLogWriter";
    } else {
      return "SecureProtobufLogWriter";
    }
  }

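  /**
   * Sets up the {@link CompressionContext} when WAL compression is enabled via
   * {@link HConstants#ENABLE_WAL_COMPRESSION}, honoring the tag and value compression settings.
   * @return true if WAL compression is enabled, false otherwise
   */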
  private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
    boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
    if (doCompress) {
      try {
        final boolean useTagCompression =
          conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
        final boolean useValueCompression =
          conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
        final Compression.Algorithm valueCompressionType = useValueCompression
          ? CompressionContext.getValueCompressionAlgorithm(conf)
          : Compression.Algorithm.NONE;
        if (LOG.isTraceEnabled()) {
          LOG.trace(
            "Initializing compression context for {}: isRecoveredEdits={}"
              + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
            path, CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression,
            valueCompressionType);
        }
        this.compressionContext =
          new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
            useTagCompression, useValueCompression, valueCompressionType);
      } catch (Exception e) {
        throw new IOException("Failed to initiate CompressionContext", e);
      }
    }
    return doCompress;
  }

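  /**
   * Initializes the writer: sets up compression, opens the underlying output (see
   * {@link #initOutput}), then writes the WAL magic and a {@link WALHeader} describing the
   * compression and encryption settings. If WAL encryption is enabled, a secure header and
   * {@link SecureWALCellCodec} are used instead. On failure the partially opened output is
   * closed via {@link #closeOutputIfNecessary()}.
   */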
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
    long blocksize, StreamSlowMonitor monitor) throws IOException, StreamLacksCapabilityException {
    try {
      this.conf = conf;
      boolean doCompress = initializeCompressionContext(conf, path);
      this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
      int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
      short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
        CommonFSUtils.getDefaultReplication(fs, path));
      boolean noLocalWrite =
        conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);

      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);

      boolean doTagCompress =
        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
      boolean doValueCompress =
        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
      WALHeader.Builder headerBuilder = WALHeader.newBuilder().setHasCompression(doCompress)
        .setHasTagCompression(doTagCompress).setHasValueCompression(doValueCompress);
      if (doValueCompress) {
        headerBuilder.setValueCompressionAlgorithm(
          CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
      }
      if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) {
        length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildSecureWALHeader(conf, headerBuilder)));
        secureInitAfterHeader(doCompress, encryptor);
      } else {
        length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
        initAfterHeader(doCompress);
      }

      // Instantiate the trailer with default values; it may be replaced later via setWALTrailer.
      trailer = WALTrailer.newBuilder().build();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
          + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
      }
    } catch (Exception e) {
      LOG.warn("Init output failed, path={}", path, e);
      closeOutputIfNecessary();
      throw e;
    }
  }

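  /**
   * Wires up the cell encoder and (optionally) the dictionary compressor once the header has been
   * written.
   */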
  private void initAfterHeader(boolean doCompress) throws IOException {
    WALCellCodec codec = getCodec(conf, this.compressionContext);
    this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
    if (doCompress) {
      this.compressor = codec.getByteStringCompressor();
    } else {
      this.compressor = WALCellCodec.getNoneCompressor();
    }
  }

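  /**
   * Same as {@link #initAfterHeader(boolean)}, but uses {@link SecureWALCellCodec} when an
   * encryptor is present; in that case dictionary compression is dropped.
   */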
  private void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) throws IOException {
    if (encryptor != null) {
      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor);
      this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
      // We do not support compression
      this.compressionContext = null;
      this.compressor = WALCellCodec.getNoneCompressor();
    } else {
      initAfterHeader(doCompress);
    }
  }

  void setWALTrailer(WALTrailer walTrailer) {
    this.trailer = walTrailer;
  }

  public long getLength() {
    return length.get();
  }

  private WALTrailer buildWALTrailer(WALTrailer.Builder builder) {
    return builder.build();
  }

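  /**
   * Writes the {@link WALTrailer} followed by the PB_WAL_COMPLETE_MAGIC marker at the end of the
   * file. An IOException here is logged and swallowed, since the trailer is optional for readers.
   */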
  protected final void writeWALTrailer() {
    try {
      int trailerSize = 0;
      if (this.trailer == null) {
        // use default trailer.
        LOG.warn("WALTrailer is null. Continuing with default.");
        this.trailer = buildWALTrailer(WALTrailer.newBuilder());
        trailerSize = this.trailer.getSerializedSize();
      } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
        // continue writing after warning the user.
        LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
          + " > " + this.trailerWarnSize);
      }
      length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
      this.trailerWritten = true;
    } catch (IOException ioe) {
      LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
    }
  }

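  /**
   * Opens the underlying output. Subclasses choose the concrete transport, e.g. a filesystem
   * output stream or an asynchronous DFS output.
   */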
  protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
    throws IOException, StreamLacksCapabilityException;

  /**
   * Closes the underlying output without writing a trailer, unlike {@code Writer#close}. Used to
   * release resources when initialization fails. The default implementation does nothing.
   */
  protected void closeOutputIfNecessary() {
  }

  /**
   * Writes the WAL magic and header.
   * @return the file length after the magic and header have been written
   */
  protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException;

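  /**
   * Writes the serialized trailer and the closing magic at the end of the file.
   * @return the file length after the trailer and magic have been written
   */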
  protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic)
    throws IOException;

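  /**
   * @return the stream the cell encoder should write encoded cells to
   */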
  protected abstract OutputStream getOutputStreamForCellEncoder();
}