001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.net.InetAddress; 023import java.security.cert.X509Certificate; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 026import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 027import org.apache.hadoop.hbase.coprocessor.RpcCoprocessor; 028import org.apache.hadoop.hbase.coprocessor.RpcCoprocessorEnvironment; 029import org.apache.hadoop.hbase.coprocessor.RpcObserver; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 036 037@InterfaceAudience.Private 038public class RpcCoprocessorHost extends CoprocessorHost<RpcCoprocessor, RpcCoprocessorEnvironment> { 039 040 private static final Logger LOG = LoggerFactory.getLogger(RpcCoprocessorHost.class); 041 042 private static class RpcEnvironment extends BaseEnvironment<RpcCoprocessor> 043 implements RpcCoprocessorEnvironment { 044 045 public RpcEnvironment(RpcCoprocessor impl, int priority, int seq, Configuration conf) { 046 super(impl, priority, seq, conf); 047 } 048 } 049 050 public RpcCoprocessorHost(final Configuration conf) { 051 // RPCServer cannot be aborted, so we don't pass Abortable down here. 052 super(null); 053 this.conf = conf; 054 boolean coprocessorsEnabled = 055 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 056 LOG.trace("System coprocessor loading is {}", (coprocessorsEnabled ? "enabled" : "disabled")); 057 loadSystemCoprocessors(conf, RPC_COPROCESSOR_CONF_KEY); 058 } 059 060 @Override 061 public RpcCoprocessorEnvironment createEnvironment(RpcCoprocessor instance, int priority, 062 int sequence, Configuration conf) { 063 return new RpcEnvironment(instance, priority, sequence, conf); 064 } 065 066 @Override 067 public RpcCoprocessor checkAndGetInstance(Class<?> implClass) 068 throws InstantiationException, IllegalAccessException { 069 try { 070 if (RpcCoprocessor.class.isAssignableFrom(implClass)) { 071 return implClass.asSubclass(RpcCoprocessor.class).getDeclaredConstructor().newInstance(); 072 } else { 073 LOG.error("{} is not of type RpcCoprocessor. Check the configuration of {}", 074 implClass.getName(), CoprocessorHost.RPC_COPROCESSOR_CONF_KEY); 075 return null; 076 } 077 } catch (NoSuchMethodException | InvocationTargetException e) { 078 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 079 } 080 } 081 082 private final ObserverGetter<RpcCoprocessor, RpcObserver> rpcObserverGetter = 083 RpcCoprocessor::getRpcObserver; 084 085 abstract class RpcObserverOperation extends ObserverOperationWithoutResult<RpcObserver> { 086 public RpcObserverOperation() { 087 super(rpcObserverGetter); 088 } 089 090 public RpcObserverOperation(boolean bypassable) { 091 this(null, bypassable); 092 } 093 094 public RpcObserverOperation(User user) { 095 super(rpcObserverGetter, user); 096 } 097 098 public RpcObserverOperation(User user, boolean bypassable) { 099 super(rpcObserverGetter, user, bypassable); 100 } 101 } 102 103 public void preAuthorizeConnection(RPCProtos.ConnectionHeader connectionHeader, 104 InetAddress remoteAddr) throws IOException { 105 execOperation(coprocEnvironments.isEmpty() ? null : new RpcObserverOperation() { 106 @Override 107 protected void call(RpcObserver observer) throws IOException { 108 observer.preAuthorizeConnection(this, connectionHeader, remoteAddr); 109 } 110 }); 111 } 112 113 public void postAuthorizeConnection(final String userName, 114 final X509Certificate[] clientCertificates) throws IOException { 115 execOperation(coprocEnvironments.isEmpty() ? null : new RpcObserverOperation() { 116 @Override 117 protected void call(RpcObserver observer) throws IOException { 118 observer.postAuthorizeConnection(this, userName, clientCertificates); 119 } 120 }); 121 } 122}