001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.CacheEvictionStats; 024import org.apache.hadoop.hbase.ServerName; 025import org.apache.hadoop.hbase.client.Connection; 026import org.apache.hadoop.hbase.client.Mutation; 027import org.apache.hadoop.hbase.client.SharedConnection; 028import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 029import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 030import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 031import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 032import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 034import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 035import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 036import org.apache.hadoop.hbase.metrics.MetricRegistry; 037import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 038import org.apache.hadoop.hbase.security.User; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.protobuf.Service; 044 045import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 046 047@InterfaceAudience.Private 048public class RegionServerCoprocessorHost 049 extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 050 051 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 052 053 private RegionServerServices rsServices; 054 055 public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { 056 super(rsServices); 057 this.rsServices = rsServices; 058 this.conf = conf; 059 // Log the state of coprocessor loading here; should appear only once or 060 // twice in the daemon log, depending on HBase version, because there is 061 // only one RegionServerCoprocessorHost instance in the RS process 062 boolean coprocessorsEnabled = 063 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 064 boolean tableCoprocessorsEnabled = 065 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 066 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 067 LOG.info("Table coprocessor loading is " 068 + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 069 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 070 } 071 072 @Override 073 public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority, 074 int sequence, Configuration conf) { 075 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 076 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 077 ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 078 this.rsServices) 079 : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 080 } 081 082 @Override 083 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 084 throws InstantiationException, IllegalAccessException { 085 try { 086 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 087 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 088 .newInstance(); 089 } else { 090 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 091 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 092 return null; 093 } 094 } catch (NoSuchMethodException | InvocationTargetException e) { 095 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 096 } 097 } 098 099 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 100 RegionServerCoprocessor::getRegionServerObserver; 101 102 abstract class RegionServerObserverOperation 103 extends ObserverOperationWithoutResult<RegionServerObserver> { 104 public RegionServerObserverOperation() { 105 super(rsObserverGetter); 106 } 107 108 public RegionServerObserverOperation(User user) { 109 super(rsObserverGetter, user); 110 } 111 } 112 113 ////////////////////////////////////////////////////////////////////////////////////////////////// 114 // RegionServerObserver operations 115 ////////////////////////////////////////////////////////////////////////////////////////////////// 116 117 public void preStop(String message, User user) throws IOException { 118 // While stopping the region server all coprocessors method should be executed first then the 119 // coprocessor should be cleaned up. 120 if (coprocEnvironments.isEmpty()) { 121 return; 122 } 123 execShutdown(new RegionServerObserverOperation(user) { 124 @Override 125 public void call(RegionServerObserver observer) throws IOException { 126 observer.preStopRegionServer(this); 127 } 128 129 @Override 130 public void postEnvCall() { 131 // invoke coprocessor stop method 132 shutdown(this.getEnvironment()); 133 } 134 }); 135 } 136 137 public void preRollWALWriterRequest() throws IOException { 138 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 139 @Override 140 public void call(RegionServerObserver observer) throws IOException { 141 observer.preRollWALWriterRequest(this); 142 } 143 }); 144 } 145 146 public void postRollWALWriterRequest() throws IOException { 147 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 148 @Override 149 public void call(RegionServerObserver observer) throws IOException { 150 observer.postRollWALWriterRequest(this); 151 } 152 }); 153 } 154 155 public void preReplicateLogEntries() throws IOException { 156 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 157 @Override 158 public void call(RegionServerObserver observer) throws IOException { 159 observer.preReplicateLogEntries(this); 160 } 161 }); 162 } 163 164 public void postReplicateLogEntries() throws IOException { 165 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 166 @Override 167 public void call(RegionServerObserver observer) throws IOException { 168 observer.postReplicateLogEntries(this); 169 } 170 }); 171 } 172 173 public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 174 throws IOException { 175 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 176 @Override 177 public void call(RegionServerObserver observer) throws IOException { 178 observer.preReplicationSinkBatchMutate(this, walEntry, mutation); 179 } 180 }); 181 } 182 183 public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 184 throws IOException { 185 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 186 @Override 187 public void call(RegionServerObserver observer) throws IOException { 188 observer.postReplicationSinkBatchMutate(this, walEntry, mutation); 189 } 190 }); 191 } 192 193 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 194 throws IOException { 195 if (this.coprocEnvironments.isEmpty()) { 196 return endpoint; 197 } 198 return execOperationWithResult( 199 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter, 200 endpoint) { 201 @Override 202 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 203 return observer.postCreateReplicationEndPoint(this, getResult()); 204 } 205 }); 206 } 207 208 public void preClearCompactionQueues() throws IOException { 209 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 210 @Override 211 public void call(RegionServerObserver observer) throws IOException { 212 observer.preClearCompactionQueues(this); 213 } 214 }); 215 } 216 217 public void postClearCompactionQueues() throws IOException { 218 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 219 @Override 220 public void call(RegionServerObserver observer) throws IOException { 221 observer.postClearCompactionQueues(this); 222 } 223 }); 224 } 225 226 public void preExecuteProcedures() throws IOException { 227 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 228 @Override 229 public void call(RegionServerObserver observer) throws IOException { 230 observer.preExecuteProcedures(this); 231 } 232 }); 233 } 234 235 public void postExecuteProcedures() throws IOException { 236 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 237 @Override 238 public void call(RegionServerObserver observer) throws IOException { 239 observer.postExecuteProcedures(this); 240 } 241 }); 242 } 243 244 public void preUpdateConfiguration(Configuration preReloadConf) throws IOException { 245 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 246 @Override 247 public void call(RegionServerObserver observer) throws IOException { 248 observer.preUpdateRegionServerConfiguration(this, preReloadConf); 249 } 250 }); 251 } 252 253 public void postUpdateConfiguration(Configuration postReloadConf) throws IOException { 254 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 255 @Override 256 public void call(RegionServerObserver observer) throws IOException { 257 observer.postUpdateRegionServerConfiguration(this, postReloadConf); 258 } 259 }); 260 } 261 262 public void preClearRegionBlockCache() throws IOException { 263 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 264 @Override 265 public void call(RegionServerObserver observer) throws IOException { 266 observer.preClearRegionBlockCache(this); 267 } 268 }); 269 } 270 271 public void postClearRegionBlockCache(CacheEvictionStats stats) throws IOException { 272 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 273 @Override 274 public void call(RegionServerObserver observer) throws IOException { 275 observer.postClearRegionBlockCache(this, stats); 276 } 277 }); 278 } 279 280 /** 281 * Coprocessor environment extension providing access to region server related services. 282 */ 283 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 284 implements RegionServerCoprocessorEnvironment { 285 private final MetricRegistry metricRegistry; 286 private final RegionServerServices services; 287 288 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", 289 justification = "Intentional; FB has trouble detecting isAssignableFrom") 290 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 291 final int seq, final Configuration conf, final RegionServerServices services) { 292 super(impl, priority, seq, conf); 293 // If coprocessor exposes any services, register them. 294 for (Service service : impl.getServices()) { 295 services.registerService(service); 296 } 297 this.services = services; 298 this.metricRegistry = 299 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 300 } 301 302 @Override 303 public OnlineRegions getOnlineRegions() { 304 return this.services; 305 } 306 307 @Override 308 public ServerName getServerName() { 309 return this.services.getServerName(); 310 } 311 312 @Override 313 public Connection getConnection() { 314 return new SharedConnection(this.services.getConnection()); 315 } 316 317 @Override 318 public Connection createConnection(Configuration conf) throws IOException { 319 return this.services.createConnection(conf); 320 } 321 322 @Override 323 public MetricRegistry getMetricRegistryForRegionServer() { 324 return metricRegistry; 325 } 326 327 @Override 328 public void shutdown() { 329 super.shutdown(); 330 MetricsCoprocessor.removeRegistry(metricRegistry); 331 } 332 } 333 334 /** 335 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 336 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 337 */ 338 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 339 implements HasRegionServerServices { 340 final RegionServerServices regionServerServices; 341 342 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 343 final int priority, final int seq, final Configuration conf, 344 final RegionServerServices services) { 345 super(impl, priority, seq, conf, services); 346 this.regionServerServices = services; 347 } 348 349 /** 350 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 351 * consumption. 352 */ 353 @Override 354 public RegionServerServices getRegionServerServices() { 355 return this.regionServerServices; 356 } 357 } 358}