001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.client.Connection; 025import org.apache.hadoop.hbase.client.Mutation; 026import org.apache.hadoop.hbase.client.SharedConnection; 027import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 028import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 029import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 030import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 031import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 032import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 033import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; 034import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; 035import org.apache.hadoop.hbase.metrics.MetricRegistry; 036import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 037import org.apache.hadoop.hbase.security.User; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.protobuf.Service; 043 044import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 045 046@InterfaceAudience.Private 047public class RegionServerCoprocessorHost 048 extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> { 049 050 private static final Logger LOG = LoggerFactory.getLogger(RegionServerCoprocessorHost.class); 051 052 private RegionServerServices rsServices; 053 054 public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { 055 super(rsServices); 056 this.rsServices = rsServices; 057 this.conf = conf; 058 // Log the state of coprocessor loading here; should appear only once or 059 // twice in the daemon log, depending on HBase version, because there is 060 // only one RegionServerCoprocessorHost instance in the RS process 061 boolean coprocessorsEnabled = 062 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 063 boolean tableCoprocessorsEnabled = 064 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 065 LOG.info("System coprocessor loading is " + (coprocessorsEnabled ? "enabled" : "disabled")); 066 LOG.info("Table coprocessor loading is " 067 + ((coprocessorsEnabled && tableCoprocessorsEnabled) ? "enabled" : "disabled")); 068 loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); 069 } 070 071 @Override 072 public RegionServerEnvironment createEnvironment(RegionServerCoprocessor instance, int priority, 073 int sequence, Configuration conf) { 074 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 075 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 076 ? new RegionServerEnvironmentForCoreCoprocessors(instance, priority, sequence, conf, 077 this.rsServices) 078 : new RegionServerEnvironment(instance, priority, sequence, conf, this.rsServices); 079 } 080 081 @Override 082 public RegionServerCoprocessor checkAndGetInstance(Class<?> implClass) 083 throws InstantiationException, IllegalAccessException { 084 try { 085 if (RegionServerCoprocessor.class.isAssignableFrom(implClass)) { 086 return implClass.asSubclass(RegionServerCoprocessor.class).getDeclaredConstructor() 087 .newInstance(); 088 } else { 089 LOG.error("{} is not of type RegionServerCoprocessor. Check the configuration of {}", 090 implClass.getName(), CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY); 091 return null; 092 } 093 } catch (NoSuchMethodException | InvocationTargetException e) { 094 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 095 } 096 } 097 098 private ObserverGetter<RegionServerCoprocessor, RegionServerObserver> rsObserverGetter = 099 RegionServerCoprocessor::getRegionServerObserver; 100 101 abstract class RegionServerObserverOperation 102 extends ObserverOperationWithoutResult<RegionServerObserver> { 103 public RegionServerObserverOperation() { 104 super(rsObserverGetter); 105 } 106 107 public RegionServerObserverOperation(User user) { 108 super(rsObserverGetter, user); 109 } 110 } 111 112 ////////////////////////////////////////////////////////////////////////////////////////////////// 113 // RegionServerObserver operations 114 ////////////////////////////////////////////////////////////////////////////////////////////////// 115 116 public void preStop(String message, User user) throws IOException { 117 // While stopping the region server all coprocessors method should be executed first then the 118 // coprocessor should be cleaned up. 119 if (coprocEnvironments.isEmpty()) { 120 return; 121 } 122 execShutdown(new RegionServerObserverOperation(user) { 123 @Override 124 public void call(RegionServerObserver observer) throws IOException { 125 observer.preStopRegionServer(this); 126 } 127 128 @Override 129 public void postEnvCall() { 130 // invoke coprocessor stop method 131 shutdown(this.getEnvironment()); 132 } 133 }); 134 } 135 136 public void preRollWALWriterRequest() throws IOException { 137 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 138 @Override 139 public void call(RegionServerObserver observer) throws IOException { 140 observer.preRollWALWriterRequest(this); 141 } 142 }); 143 } 144 145 public void postRollWALWriterRequest() throws IOException { 146 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 147 @Override 148 public void call(RegionServerObserver observer) throws IOException { 149 observer.postRollWALWriterRequest(this); 150 } 151 }); 152 } 153 154 public void preReplicateLogEntries() throws IOException { 155 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 156 @Override 157 public void call(RegionServerObserver observer) throws IOException { 158 observer.preReplicateLogEntries(this); 159 } 160 }); 161 } 162 163 public void postReplicateLogEntries() throws IOException { 164 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 165 @Override 166 public void call(RegionServerObserver observer) throws IOException { 167 observer.postReplicateLogEntries(this); 168 } 169 }); 170 } 171 172 public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 173 throws IOException { 174 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 175 @Override 176 public void call(RegionServerObserver observer) throws IOException { 177 observer.preReplicationSinkBatchMutate(this, walEntry, mutation); 178 } 179 }); 180 } 181 182 public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation) 183 throws IOException { 184 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 185 @Override 186 public void call(RegionServerObserver observer) throws IOException { 187 observer.postReplicationSinkBatchMutate(this, walEntry, mutation); 188 } 189 }); 190 } 191 192 public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint) 193 throws IOException { 194 if (this.coprocEnvironments.isEmpty()) { 195 return endpoint; 196 } 197 return execOperationWithResult( 198 new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(rsObserverGetter, 199 endpoint) { 200 @Override 201 public ReplicationEndpoint call(RegionServerObserver observer) throws IOException { 202 return observer.postCreateReplicationEndPoint(this, getResult()); 203 } 204 }); 205 } 206 207 public void preClearCompactionQueues() throws IOException { 208 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 209 @Override 210 public void call(RegionServerObserver observer) throws IOException { 211 observer.preClearCompactionQueues(this); 212 } 213 }); 214 } 215 216 public void postClearCompactionQueues() throws IOException { 217 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 218 @Override 219 public void call(RegionServerObserver observer) throws IOException { 220 observer.postClearCompactionQueues(this); 221 } 222 }); 223 } 224 225 public void preExecuteProcedures() throws IOException { 226 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 227 @Override 228 public void call(RegionServerObserver observer) throws IOException { 229 observer.preExecuteProcedures(this); 230 } 231 }); 232 } 233 234 public void postExecuteProcedures() throws IOException { 235 execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() { 236 @Override 237 public void call(RegionServerObserver observer) throws IOException { 238 observer.postExecuteProcedures(this); 239 } 240 }); 241 } 242 243 /** 244 * Coprocessor environment extension providing access to region server related services. 245 */ 246 private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> 247 implements RegionServerCoprocessorEnvironment { 248 private final MetricRegistry metricRegistry; 249 private final RegionServerServices services; 250 251 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BC_UNCONFIRMED_CAST", 252 justification = "Intentional; FB has trouble detecting isAssignableFrom") 253 public RegionServerEnvironment(final RegionServerCoprocessor impl, final int priority, 254 final int seq, final Configuration conf, final RegionServerServices services) { 255 super(impl, priority, seq, conf); 256 // If coprocessor exposes any services, register them. 257 for (Service service : impl.getServices()) { 258 services.registerService(service); 259 } 260 this.services = services; 261 this.metricRegistry = 262 MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); 263 } 264 265 @Override 266 public OnlineRegions getOnlineRegions() { 267 return this.services; 268 } 269 270 @Override 271 public ServerName getServerName() { 272 return this.services.getServerName(); 273 } 274 275 @Override 276 public Connection getConnection() { 277 return new SharedConnection(this.services.getConnection()); 278 } 279 280 @Override 281 public Connection createConnection(Configuration conf) throws IOException { 282 return this.services.createConnection(conf); 283 } 284 285 @Override 286 public MetricRegistry getMetricRegistryForRegionServer() { 287 return metricRegistry; 288 } 289 290 @Override 291 public void shutdown() { 292 super.shutdown(); 293 MetricsCoprocessor.removeRegistry(metricRegistry); 294 } 295 } 296 297 /** 298 * Special version of RegionServerEnvironment that exposes RegionServerServices for Core 299 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 300 */ 301 private static class RegionServerEnvironmentForCoreCoprocessors extends RegionServerEnvironment 302 implements HasRegionServerServices { 303 final RegionServerServices regionServerServices; 304 305 public RegionServerEnvironmentForCoreCoprocessors(final RegionServerCoprocessor impl, 306 final int priority, final int seq, final Configuration conf, 307 final RegionServerServices services) { 308 super(impl, priority, seq, conf, services); 309 this.regionServerServices = services; 310 } 311 312 /** 313 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 314 * consumption. 315 */ 316 @Override 317 public RegionServerServices getRegionServerServices() { 318 return this.regionServerServices; 319 } 320 } 321}