001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver.wal;
020
021import java.io.IOException;
022import java.security.Key;
023import java.security.KeyException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.io.crypto.Cipher;
034import org.apache.hadoop.hbase.io.crypto.Decryptor;
035import org.apache.hadoop.hbase.io.crypto.Encryption;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
037import org.apache.hadoop.hbase.security.EncryptionUtil;
038import org.apache.hadoop.hbase.security.User;
039import org.apache.hadoop.hbase.util.EncryptionTest;
040
041@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
042public class SecureProtobufLogReader extends ProtobufLogReader {
043
044  private static final Logger LOG = LoggerFactory.getLogger(SecureProtobufLogReader.class);
045
046  private Decryptor decryptor = null;
047  private static List<String> writerClsNames = new ArrayList<>();
048  static {
049    writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
050    writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
051    writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
052    writerClsNames.add(SecureAsyncProtobufLogWriter.class.getSimpleName());
053  }
054
055  @Override
056  public List<String> getWriterClsNames() {
057    return writerClsNames;
058  }
059
060  @Override
061  protected WALHdrContext readHeader(WALHeader.Builder builder, FSDataInputStream stream)
062      throws IOException {
063    WALHdrContext hdrCtxt = super.readHeader(builder, stream);
064    WALHdrResult result = hdrCtxt.getResult();
065    // We need to unconditionally handle the case where the WAL has a key in
066    // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
067    // no longer set in the site configuration.
068    if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) {
069      // Serialized header data has been merged into the builder from the
070      // stream.
071
072      EncryptionTest.testKeyProvider(conf);
073      EncryptionTest.testCipherProvider(conf);
074
075      // Retrieve a usable key
076
077      byte[] keyBytes = builder.getEncryptionKey().toByteArray();
078      Key key = null;
079      String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
080      // First try the WAL key, if one is configured
081      if (walKeyName != null) {
082        try {
083          key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes);
084        } catch (KeyException e) {
085          if (LOG.isDebugEnabled()) {
086            LOG.debug("Unable to unwrap key with WAL key '" + walKeyName + "'");
087          }
088          key = null;
089        }
090      }
091      if (key == null) {
092        String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
093          User.getCurrent().getShortName());
094        try {
095          // Then, try the cluster master key
096          key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes);
097        } catch (KeyException e) {
098          // If the current master key fails to unwrap, try the alternate, if
099          // one is configured
100          if (LOG.isDebugEnabled()) {
101            LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
102          }
103          String alternateKeyName =
104            conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
105          if (alternateKeyName != null) {
106            try {
107              key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes);
108            } catch (KeyException ex) {
109              throw new IOException(ex);
110            }
111          } else {
112            throw new IOException(e);
113          }
114        }
115      }
116
117      // Use the algorithm the key wants
118
119      Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
120      if (cipher == null) {
121        throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
122      }
123
124      // Set up the decryptor for this WAL
125
126      decryptor = cipher.getDecryptor();
127      decryptor.setKey(key);
128
129      if (LOG.isTraceEnabled()) {
130        LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
131      }
132    }
133
134    return hdrCtxt;
135  }
136
137  @Override
138  protected void initAfterCompression(String cellCodecClsName) throws IOException {
139    if (decryptor != null && cellCodecClsName.equals(SecureWALCellCodec.class.getName())) {
140      WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
141      this.cellDecoder = codec.getDecoder(this.inputStream);
142      // We do not support compression with WAL encryption
143      this.compressionContext = null;
144      this.hasCompression = false;
145    } else {
146      super.initAfterCompression(cellCodecClsName);
147    }
148  }
149
150}