001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.net.InetAddress;
023import java.security.cert.X509Certificate;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
026import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
027import org.apache.hadoop.hbase.coprocessor.RpcCoprocessor;
028import org.apache.hadoop.hbase.coprocessor.RpcCoprocessorEnvironment;
029import org.apache.hadoop.hbase.coprocessor.RpcObserver;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
036
037@InterfaceAudience.Private
038public class RpcCoprocessorHost extends CoprocessorHost<RpcCoprocessor, RpcCoprocessorEnvironment> {
039
040  private static final Logger LOG = LoggerFactory.getLogger(RpcCoprocessorHost.class);
041
042  private static class RpcEnvironment extends BaseEnvironment<RpcCoprocessor>
043    implements RpcCoprocessorEnvironment {
044
045    public RpcEnvironment(RpcCoprocessor impl, int priority, int seq, Configuration conf) {
046      super(impl, priority, seq, conf);
047    }
048  }
049
050  public RpcCoprocessorHost(final Configuration conf) {
051    // RPCServer cannot be aborted, so we don't pass Abortable down here.
052    super(null);
053    this.conf = conf;
054    boolean coprocessorsEnabled =
055      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
056    LOG.trace("System coprocessor loading is {}", (coprocessorsEnabled ? "enabled" : "disabled"));
057    loadSystemCoprocessors(conf, RPC_COPROCESSOR_CONF_KEY);
058  }
059
060  @Override
061  public RpcCoprocessorEnvironment createEnvironment(RpcCoprocessor instance, int priority,
062    int sequence, Configuration conf) {
063    return new RpcEnvironment(instance, priority, sequence, conf);
064  }
065
066  @Override
067  public RpcCoprocessor checkAndGetInstance(Class<?> implClass)
068    throws InstantiationException, IllegalAccessException {
069    try {
070      if (RpcCoprocessor.class.isAssignableFrom(implClass)) {
071        return implClass.asSubclass(RpcCoprocessor.class).getDeclaredConstructor().newInstance();
072      } else {
073        LOG.error("{} is not of type RpcCoprocessor. Check the configuration of {}",
074          implClass.getName(), CoprocessorHost.RPC_COPROCESSOR_CONF_KEY);
075        return null;
076      }
077    } catch (NoSuchMethodException | InvocationTargetException e) {
078      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
079    }
080  }
081
082  private final ObserverGetter<RpcCoprocessor, RpcObserver> rpcObserverGetter =
083    RpcCoprocessor::getRpcObserver;
084
085  abstract class RpcObserverOperation extends ObserverOperationWithoutResult<RpcObserver> {
086    public RpcObserverOperation() {
087      super(rpcObserverGetter);
088    }
089
090    public RpcObserverOperation(boolean bypassable) {
091      this(null, bypassable);
092    }
093
094    public RpcObserverOperation(User user) {
095      super(rpcObserverGetter, user);
096    }
097
098    public RpcObserverOperation(User user, boolean bypassable) {
099      super(rpcObserverGetter, user, bypassable);
100    }
101  }
102
103  public void preAuthorizeConnection(RPCProtos.ConnectionHeader connectionHeader,
104    InetAddress remoteAddr) throws IOException {
105    execOperation(coprocEnvironments.isEmpty() ? null : new RpcObserverOperation() {
106      @Override
107      protected void call(RpcObserver observer) throws IOException {
108        observer.preAuthorizeConnection(this, connectionHeader, remoteAddr);
109      }
110    });
111  }
112
113  public void postAuthorizeConnection(final String userName,
114    final X509Certificate[] clientCertificates) throws IOException {
115    execOperation(coprocEnvironments.isEmpty() ? null : new RpcObserverOperation() {
116      @Override
117      protected void call(RpcObserver observer) throws IOException {
118        observer.postAuthorizeConnection(this, userName, clientCertificates);
119      }
120    });
121  }
122}