001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.reflect.InvocationTargetException; 023 024import com.google.protobuf.Service; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.SharedConnection; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 031import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 032import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 033import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 034import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 035import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 037import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 038import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 039import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; 040import org.apache.hadoop.hbase.metrics.MetricRegistry; 041import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047@InterfaceAudience.Private 048public class RegionServerCoprocessorHost extends 049 CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 050 051 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 052 053 private RegionServerServices rsServices; 054 055 public RegionServerCoprocessorHost(RegionServerServices rsServices, 056 Configuration conf) { 057 super(rsServices); 058 this.rsServices = rsServices; 059 this.conf = conf; 060 // Log the state of coprocessor loading here; should appear only once or 061 // twice in the daemon log, depending on HBase version, because there is 062 // only one RegionServerCoprocessorHost instance in the RS process 063 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 064 DEFAULT_COPROCESSORS_ENABLED); 065 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 066 DEFAULT_USER_COPROCESSORS_ENABLED); 067 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 068 LOG.info("Table coprocessor loading is " + 069 ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 070 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 071 } 072 073 @Override 074 public RegionServerEnvironment createEnvironment( 075 RegionServerCoprocessor instance, int priority, int sequence, Configuration conf) { 076 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 077 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 078 new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 079 this.rsServices): 080 new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 081 } 082 083 @Override 084 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 085 throws InstantiationException, IllegalAccessException { 086 try { 087 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 088 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 089 .newInstance(); 090 } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) { 091 // For backward compatibility with old CoprocessorService impl which don't extend 092 // RegionCoprocessor. 093 SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class) 094 .getDeclaredConstructor().newInstance(); 095 return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp); 096 } else { 097 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 098 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 099 return null; 100 } 101 } catch (NoSuchMethodException | InvocationTargetException e) { 102 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 103 } 104 } 105 106 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 107 RegionServerCoprocessor::getRegionServerObserver; 108 109 abstract class RegionServerObserverOperation extends 110 ObserverOperationWithoutResult<RegionServerObserver> { 111 public RegionServerObserverOperation() { 112 super(rsObserverGetter); 113 } 114 115 public RegionServerObserverOperation(User user) { 116 super(rsObserverGetter, user); 117 } 118 } 119 120 ////////////////////////////////////////////////////////////////////////////////////////////////// 121 // RegionServerObserver operations 122 ////////////////////////////////////////////////////////////////////////////////////////////////// 123 124 public void preStop(String message, User user) throws IOException { 125 // While stopping the region server all coprocessors method should be executed first then the 126 // coprocessor should be cleaned up. 127 if (coprocEnvironments.isEmpty()) { 128 return; 129 } 130 execShutdown(new RegionServerObserverOperation(user) { 131 @Override 132 public void call(RegionServerObserver observer) throws IOException { 133 observer.preStopRegionServer(this); 134 } 135 136 @Override 137 public void postEnvCall() { 138 // invoke coprocessor stop method 139 shutdown(this.getEnvironment()); 140 } 141 }); 142 } 143 144 public void preRollWALWriterRequest() throws IOException { 145 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 146 @Override 147 public void call(RegionServerObserver observer) throws IOException { 148 observer.preRollWALWriterRequest(this); 149 } 150 }); 151 } 152 153 public void postRollWALWriterRequest() throws IOException { 154 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 155 @Override 156 public void call(RegionServerObserver observer) throws IOException { 157 observer.postRollWALWriterRequest(this); 158 } 159 }); 160 } 161 162 public void preReplicateLogEntries() 163 throws IOException { 164 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 165 @Override 166 public void call(RegionServerObserver observer) throws IOException { 167 observer.preReplicateLogEntries(this); 168 } 169 }); 170 } 171 172 public void postReplicateLogEntries() 173 throws IOException { 174 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 175 @Override 176 public void call(RegionServerObserver observer) throws IOException { 177 observer.postReplicateLogEntries(this); 178 } 179 }); 180 } 181 182 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 183 throws IOException { 184 if (this.coprocEnvironments.isEmpty()) { 185 return endpoint; 186 } 187 return execOperationWithResult( 188 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>( 189 rsObserverGetter, endpoint) { 190 @Override 191 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 192 return observer.postCreateReplicationEndPoint(this, getResult()); 193 } 194 }); 195 } 196 197 public void preClearCompactionQueues() throws IOException { 198 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 199 @Override 200 public void call(RegionServerObserver observer) throws IOException { 201 observer.preClearCompactionQueues(this); 202 } 203 }); 204 } 205 206 public void postClearCompactionQueues() throws IOException { 207 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 208 @Override 209 public void call(RegionServerObserver observer) throws IOException { 210 observer.postClearCompactionQueues(this); 211 } 212 }); 213 } 214 215 public void preExecuteProcedures() throws IOException { 216 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 217 @Override 218 public void call(RegionServerObserver observer) throws IOException { 219 observer.preExecuteProcedures(this); 220 } 221 }); 222 } 223 224 public void postExecuteProcedures() throws IOException { 225 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 226 @Override 227 public void call(RegionServerObserver observer) throws IOException { 228 observer.postExecuteProcedures(this); 229 } 230 }); 231 } 232 233 /** 234 * Coprocessor environment extension providing access to region server 235 * related services. 236 */ 237 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 238 implements RegionServerCoprocessorEnvironment { 239 private final MetricRegistry metricRegistry; 240 private final RegionServerServices services; 241 242 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST", 243 justification="Intentional; FB has trouble detecting isAssignableFrom") 244 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 245 final int seq, final Configuration conf, final RegionServerServices services) { 246 super(impl, priority, seq, conf); 247 // If coprocessor exposes any services, register them. 248 for (Service service : impl.getServices()) { 249 services.registerService(service); 250 } 251 this.services = services; 252 this.metricRegistry = 253 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 254 } 255 256 @Override 257 public OnlineRegions getOnlineRegions() { 258 return this.services; 259 } 260 261 @Override 262 public ServerName getServerName() { 263 return this.services.getServerName(); 264 } 265 266 @Override 267 public Connection getConnection() { 268 return new SharedConnection(this.services.getConnection()); 269 } 270 271 @Override 272 public Connection createConnection(Configuration conf) throws IOException { 273 return this.services.createConnection(conf); 274 } 275 276 @Override 277 public MetricRegistry getMetricRegistryForRegionServer() { 278 return metricRegistry; 279 } 280 281 @Override 282 public void shutdown() { 283 super.shutdown(); 284 MetricsCoprocessor.removeRegistry(metricRegistry); 285 } 286 } 287 288 /** 289 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 290 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 291 */ 292 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 293 implements HasRegionServerServices { 294 final RegionServerServices regionServerServices; 295 296 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 297 final int priority, final int seq, final Configuration conf, 298 final RegionServerServices services) { 299 super(impl, priority, seq, conf, services); 300 this.regionServerServices = services; 301 } 302 303 /** 304 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 305 * consumption. 306 */ 307 @Override 308 public RegionServerServices getRegionServerServices() { 309 return this.regionServerServices; 310 } 311 } 312}