001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import com.google.protobuf.Service; 021import java.io.IOException; 022import java.lang.reflect.InvocationTargetException; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.ServerName; 025import org.apache.hadoop.hbase.SharedConnection; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 028import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 029import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 030import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 031import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 032import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 034import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 035import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 036import org.apache.hadoop.hbase.coprocessor.SingletonCoprocessorService; 037import org.apache.hadoop.hbase.metrics.MetricRegistry; 038import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@InterfaceAudience.Private 045public class RegionServerCoprocessorHost 046 extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 047 048 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 049 050 private RegionServerServices rsServices; 051 052 public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { 053 super(rsServices); 054 this.rsServices = rsServices; 055 this.conf = conf; 056 // Log the state of coprocessor loading here; should appear only once or 057 // twice in the daemon log, depending on HBase version, because there is 058 // only one RegionServerCoprocessorHost instance in the RS process 059 boolean coprocessorsEnabled = 060 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 061 boolean tableCoprocessorsEnabled = 062 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 063 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 064 LOG.info("Table coprocessor loading is " 065 + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 066 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 067 } 068 069 @Override 070 public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority, 071 int sequence, Configuration conf) { 072 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 073 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 074 ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 075 this.rsServices) 076 : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 077 } 078 079 @Override 080 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 081 throws InstantiationException, IllegalAccessException { 082 try { 083 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 084 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 085 .newInstance(); 086 } else if (SingletonCoprocessorService.class.isAssignableFrom(implClass)) { 087 // For backward compatibility with old CoprocessorService impl which don't extend 088 // RegionCoprocessor. 089 SingletonCoprocessorService tmp = implClass.asSubclass(SingletonCoprocessorService.class) 090 .getDeclaredConstructor().newInstance(); 091 return new CoprocessorServiceBackwardCompatiblity.RegionServerCoprocessorService(tmp); 092 } else { 093 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 094 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 095 return null; 096 } 097 } catch (NoSuchMethodException | InvocationTargetException e) { 098 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 099 } 100 } 101 102 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 103 RegionServerCoprocessor::getRegionServerObserver; 104 105 abstract class RegionServerObserverOperation 106 extends ObserverOperationWithoutResult<RegionServerObserver> { 107 public RegionServerObserverOperation() { 108 super(rsObserverGetter); 109 } 110 111 public RegionServerObserverOperation(User user) { 112 super(rsObserverGetter, user); 113 } 114 } 115 116 ////////////////////////////////////////////////////////////////////////////////////////////////// 117 // RegionServerObserver operations 118 ////////////////////////////////////////////////////////////////////////////////////////////////// 119 120 public void preStop(String message, User user) throws IOException { 121 // While stopping the region server all coprocessors method should be executed first then the 122 // coprocessor should be cleaned up. 123 if (coprocEnvironments.isEmpty()) { 124 return; 125 } 126 execShutdown(new RegionServerObserverOperation(user) { 127 @Override 128 public void call(RegionServerObserver observer) throws IOException { 129 observer.preStopRegionServer(this); 130 } 131 132 @Override 133 public void postEnvCall() { 134 // invoke coprocessor stop method 135 shutdown(this.getEnvironment()); 136 } 137 }); 138 } 139 140 public void preRollWALWriterRequest() throws IOException { 141 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 142 @Override 143 public void call(RegionServerObserver observer) throws IOException { 144 observer.preRollWALWriterRequest(this); 145 } 146 }); 147 } 148 149 public void postRollWALWriterRequest() throws IOException { 150 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 151 @Override 152 public void call(RegionServerObserver observer) throws IOException { 153 observer.postRollWALWriterRequest(this); 154 } 155 }); 156 } 157 158 public void preReplicateLogEntries() throws IOException { 159 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 160 @Override 161 public void call(RegionServerObserver observer) throws IOException { 162 observer.preReplicateLogEntries(this); 163 } 164 }); 165 } 166 167 public void postReplicateLogEntries() throws IOException { 168 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 169 @Override 170 public void call(RegionServerObserver observer) throws IOException { 171 observer.postReplicateLogEntries(this); 172 } 173 }); 174 } 175 176 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 177 throws IOException { 178 if (this.coprocEnvironments.isEmpty()) { 179 return endpoint; 180 } 181 return execOperationWithResult( 182 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter, 183 endpoint) { 184 @Override 185 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 186 return observer.postCreateReplicationEndPoint(this, getResult()); 187 } 188 }); 189 } 190 191 public void preClearCompactionQueues() throws IOException { 192 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 193 @Override 194 public void call(RegionServerObserver observer) throws IOException { 195 observer.preClearCompactionQueues(this); 196 } 197 }); 198 } 199 200 public void postClearCompactionQueues() throws IOException { 201 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 202 @Override 203 public void call(RegionServerObserver observer) throws IOException { 204 observer.postClearCompactionQueues(this); 205 } 206 }); 207 } 208 209 public void preExecuteProcedures() throws IOException { 210 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 211 @Override 212 public void call(RegionServerObserver observer) throws IOException { 213 observer.preExecuteProcedures(this); 214 } 215 }); 216 } 217 218 public void postExecuteProcedures() throws IOException { 219 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 220 @Override 221 public void call(RegionServerObserver observer) throws IOException { 222 observer.postExecuteProcedures(this); 223 } 224 }); 225 } 226 227 /** 228 * Coprocessor environment extension providing access to region server related services. 229 */ 230 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 231 implements RegionServerCoprocessorEnvironment { 232 private final MetricRegistry metricRegistry; 233 private final RegionServerServices services; 234 235 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", 236 justification = "Intentional; FB has trouble detecting isAssignableFrom") 237 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 238 final int seq, final Configuration conf, final RegionServerServices services) { 239 super(impl, priority, seq, conf); 240 // If coprocessor exposes any services, register them. 241 for (Service service : impl.getServices()) { 242 services.registerService(service); 243 } 244 this.services = services; 245 this.metricRegistry = 246 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 247 } 248 249 @Override 250 public OnlineRegions getOnlineRegions() { 251 return this.services; 252 } 253 254 @Override 255 public ServerName getServerName() { 256 return this.services.getServerName(); 257 } 258 259 @Override 260 public Connection getConnection() { 261 return new SharedConnection(this.services.getConnection()); 262 } 263 264 @Override 265 public Connection createConnection(Configuration conf) throws IOException { 266 return this.services.createConnection(conf); 267 } 268 269 @Override 270 public MetricRegistry getMetricRegistryForRegionServer() { 271 return metricRegistry; 272 } 273 274 @Override 275 public void shutdown() { 276 super.shutdown(); 277 MetricsCoprocessor.removeRegistry(metricRegistry); 278 } 279 } 280 281 /** 282 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 283 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 284 */ 285 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 286 implements HasRegionServerServices { 287 final RegionServerServices regionServerServices; 288 289 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 290 final int priority, final int seq, final Configuration conf, 291 final RegionServerServices services) { 292 super(impl, priority, seq, conf, services); 293 this.regionServerServices = services; 294 } 295 296 /** 297 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 298 * consumption. 299 */ 300 @Override 301 public RegionServerServices getRegionServerServices() { 302 return this.regionServerServices; 303 } 304 } 305}