001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.client.Connection; 025import org.apache.hadoop.hbase.client.SharedConnection; 026import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 027import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 028import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 029import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 030import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 031import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 032import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 033import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 034import org.apache.hadoop.hbase.metrics.MetricRegistry; 035import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.protobuf.Service; 042 043@InterfaceAudience.Private 044public class RegionServerCoprocessorHost 045 extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 046 047 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 048 049 private RegionServerServices rsServices; 050 051 public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { 052 super(rsServices); 053 this.rsServices = rsServices; 054 this.conf = conf; 055 // Log the state of coprocessor loading here; should appear only once or 056 // twice in the daemon log, depending on HBase version, because there is 057 // only one RegionServerCoprocessorHost instance in the RS process 058 boolean coprocessorsEnabled = 059 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 060 boolean tableCoprocessorsEnabled = 061 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 062 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 063 LOG.info("Table coprocessor loading is " 064 + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 065 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 066 } 067 068 @Override 069 public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority, 070 int sequence, Configuration conf) { 071 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 072 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 073 ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 074 this.rsServices) 075 : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 076 } 077 078 @Override 079 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 080 throws InstantiationException, IllegalAccessException { 081 try { 082 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 083 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 084 .newInstance(); 085 } else { 086 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 087 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 088 return null; 089 } 090 } catch (NoSuchMethodException | InvocationTargetException e) { 091 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 092 } 093 } 094 095 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 096 RegionServerCoprocessor::getRegionServerObserver; 097 098 abstract class RegionServerObserverOperation 099 extends ObserverOperationWithoutResult<RegionServerObserver> { 100 public RegionServerObserverOperation() { 101 super(rsObserverGetter); 102 } 103 104 public RegionServerObserverOperation(User user) { 105 super(rsObserverGetter, user); 106 } 107 } 108 109 ////////////////////////////////////////////////////////////////////////////////////////////////// 110 // RegionServerObserver operations 111 ////////////////////////////////////////////////////////////////////////////////////////////////// 112 113 public void preStop(String message, User user) throws IOException { 114 // While stopping the region server all coprocessors method should be executed first then the 115 // coprocessor should be cleaned up. 116 if (coprocEnvironments.isEmpty()) { 117 return; 118 } 119 execShutdown(new RegionServerObserverOperation(user) { 120 @Override 121 public void call(RegionServerObserver observer) throws IOException { 122 observer.preStopRegionServer(this); 123 } 124 125 @Override 126 public void postEnvCall() { 127 // invoke coprocessor stop method 128 shutdown(this.getEnvironment()); 129 } 130 }); 131 } 132 133 public void preRollWALWriterRequest() throws IOException { 134 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 135 @Override 136 public void call(RegionServerObserver observer) throws IOException { 137 observer.preRollWALWriterRequest(this); 138 } 139 }); 140 } 141 142 public void postRollWALWriterRequest() throws IOException { 143 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 144 @Override 145 public void call(RegionServerObserver observer) throws IOException { 146 observer.postRollWALWriterRequest(this); 147 } 148 }); 149 } 150 151 public void preReplicateLogEntries() throws IOException { 152 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 153 @Override 154 public void call(RegionServerObserver observer) throws IOException { 155 observer.preReplicateLogEntries(this); 156 } 157 }); 158 } 159 160 public void postReplicateLogEntries() throws IOException { 161 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 162 @Override 163 public void call(RegionServerObserver observer) throws IOException { 164 observer.postReplicateLogEntries(this); 165 } 166 }); 167 } 168 169 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 170 throws IOException { 171 if (this.coprocEnvironments.isEmpty()) { 172 return endpoint; 173 } 174 return execOperationWithResult( 175 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter, 176 endpoint) { 177 @Override 178 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 179 return observer.postCreateReplicationEndPoint(this, getResult()); 180 } 181 }); 182 } 183 184 public void preClearCompactionQueues() throws IOException { 185 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 186 @Override 187 public void call(RegionServerObserver observer) throws IOException { 188 observer.preClearCompactionQueues(this); 189 } 190 }); 191 } 192 193 public void postClearCompactionQueues() throws IOException { 194 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 195 @Override 196 public void call(RegionServerObserver observer) throws IOException { 197 observer.postClearCompactionQueues(this); 198 } 199 }); 200 } 201 202 public void preExecuteProcedures() throws IOException { 203 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 204 @Override 205 public void call(RegionServerObserver observer) throws IOException { 206 observer.preExecuteProcedures(this); 207 } 208 }); 209 } 210 211 public void postExecuteProcedures() throws IOException { 212 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 213 @Override 214 public void call(RegionServerObserver observer) throws IOException { 215 observer.postExecuteProcedures(this); 216 } 217 }); 218 } 219 220 /** 221 * Coprocessor environment extension providing access to region server related services. 222 */ 223 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 224 implements RegionServerCoprocessorEnvironment { 225 private final MetricRegistry metricRegistry; 226 private final RegionServerServices services; 227 228 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", 229 justification = "Intentional; FB has trouble detecting isAssignableFrom") 230 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 231 final int seq, final Configuration conf, final RegionServerServices services) { 232 super(impl, priority, seq, conf); 233 // If coprocessor exposes any services, register them. 234 for (Service service : impl.getServices()) { 235 services.registerService(service); 236 } 237 this.services = services; 238 this.metricRegistry = 239 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 240 } 241 242 @Override 243 public OnlineRegions getOnlineRegions() { 244 return this.services; 245 } 246 247 @Override 248 public ServerName getServerName() { 249 return this.services.getServerName(); 250 } 251 252 @Override 253 public Connection getConnection() { 254 return new SharedConnection(this.services.getConnection()); 255 } 256 257 @Override 258 public Connection createConnection(Configuration conf) throws IOException { 259 return this.services.createConnection(conf); 260 } 261 262 @Override 263 public MetricRegistry getMetricRegistryForRegionServer() { 264 return metricRegistry; 265 } 266 267 @Override 268 public void shutdown() { 269 super.shutdown(); 270 MetricsCoprocessor.removeRegistry(metricRegistry); 271 } 272 } 273 274 /** 275 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 276 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 277 */ 278 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 279 implements HasRegionServerServices { 280 final RegionServerServices regionServerServices; 281 282 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 283 final int priority, final int seq, final Configuration conf, 284 final RegionServerServices services) { 285 super(impl, priority, seq, conf, services); 286 this.regionServerServices = services; 287 } 288 289 /** 290 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 291 * consumption. 292 */ 293 @Override 294 public RegionServerServices getRegionServerServices() { 295 return this.regionServerServices; 296 } 297 } 298}