001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Comparator; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Optional; 027import java.util.Set; 028import java.util.TreeSet; 029import java.util.UUID; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.function.Function; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.Coprocessor; 036import org.apache.hadoop.hbase.CoprocessorEnvironment; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.ipc.RpcServer; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 042import org.apache.hadoop.hbase.util.SortedList; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.base.Strings; 048 049/** 050 * Provides the common setup framework and runtime services for coprocessor invocation from HBase 051 * services. 052 * @param <C> type of specific coprocessor this host will handle 053 * @param <E> type of specific coprocessor environment this host requires. provides 054 */ 055@InterfaceAudience.Private 056public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> { 057 public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes"; 058 public static final String REGIONSERVER_COPROCESSOR_CONF_KEY = 059 "hbase.coprocessor.regionserver.classes"; 060 public static final String USER_REGION_COPROCESSOR_CONF_KEY = 061 "hbase.coprocessor.user.region.classes"; 062 public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes"; 063 public static final String CLIENT_META_COPROCESSOR_CONF_KEY = 064 "hbase.coprocessor.clientmeta.classes"; 065 public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes"; 066 public static final String RPC_COPROCESSOR_CONF_KEY = "hbase.coprocessor.rpc.classes"; 067 public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror"; 068 public static final boolean DEFAULT_ABORT_ON_ERROR = true; 069 public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled"; 070 public static final boolean DEFAULT_COPROCESSORS_ENABLED = true; 071 public static final String USER_COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.user.enabled"; 072 public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true; 073 public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = 074 "hbase.skip.load.duplicate.table.coprocessor"; 075 public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false; 076 077 private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class); 078 protected Abortable abortable; 079 /** Ordered set of loaded coprocessors with lock */ 080 protected final SortedList<E> coprocEnvironments = 081 new SortedList<>(new EnvironmentPriorityComparator()); 082 protected Configuration conf; 083 // unique file prefix to use for local copies of jars when classloading 084 protected String pathPrefix; 085 protected AtomicInteger loadSequence = new AtomicInteger(); 086 087 public CoprocessorHost(Abortable abortable) { 088 this.abortable = abortable; 089 this.pathPrefix = UUID.randomUUID().toString(); 090 } 091 092 /** 093 * Not to be confused with the per-object _coprocessors_ (above), coprocessorNames is static and 094 * stores the set of all coprocessors ever loaded by any thread in this JVM. It is strictly 095 * additive: coprocessors are added to coprocessorNames, by checkAndLoadInstance() but are never 096 * removed, since the intention is to preserve a history of all loaded coprocessors for diagnosis 097 * in case of server crash (HBASE-4014). 098 */ 099 private static Set<String> coprocessorNames = Collections.synchronizedSet(new HashSet<String>()); 100 101 public static Set<String> getLoadedCoprocessors() { 102 synchronized (coprocessorNames) { 103 return new HashSet(coprocessorNames); 104 } 105 } 106 107 /** 108 * Used to create a parameter to the HServerLoad constructor so that HServerLoad can provide 109 * information about the coprocessors loaded by this regionserver. (HBASE-4070: Improve region 110 * server metrics to report loaded coprocessors to master). 111 */ 112 public Set<String> getCoprocessors() { 113 Set<String> returnValue = new TreeSet<>(); 114 for (E e : coprocEnvironments) { 115 returnValue.add(e.getInstance().getClass().getSimpleName()); 116 } 117 return returnValue; 118 } 119 120 /** 121 * Get the full class names of all loaded coprocessors. This method returns the complete class 122 * names including package information, which is useful for precise coprocessor identification and 123 * comparison. 124 */ 125 public Set<String> getCoprocessorClassNames() { 126 Set<String> returnValue = new TreeSet<>(); 127 for (E e : coprocEnvironments) { 128 returnValue.add(e.getInstance().getClass().getName()); 129 } 130 return returnValue; 131 } 132 133 /** 134 * Load system coprocessors once only. Read the class names from configuration. Called by 135 * constructor. 136 */ 137 protected void loadSystemCoprocessors(Configuration conf, String confKey) { 138 boolean coprocessorsEnabled = 139 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 140 if (!coprocessorsEnabled) { 141 return; 142 } 143 144 Class<?> implClass; 145 146 // load default coprocessors from configure file 147 String[] defaultCPClasses = conf.getStrings(confKey); 148 if (defaultCPClasses == null || defaultCPClasses.length == 0) return; 149 150 int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM; 151 for (String className : defaultCPClasses) { 152 // After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept 153 // an optional format of className|priority|path 154 String[] classNameToken = className.split("\\|"); 155 boolean hasPriorityOverride = false; 156 boolean hasPath = false; 157 className = classNameToken[0]; 158 int overridePriority = Coprocessor.PRIORITY_SYSTEM; 159 Path path = null; 160 if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) { 161 overridePriority = Integer.parseInt(classNameToken[1]); 162 hasPriorityOverride = true; 163 } 164 if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) { 165 path = new Path(classNameToken[2].trim()); 166 hasPath = true; 167 } 168 className = className.trim(); 169 if (findCoprocessor(className) != null) { 170 // If already loaded will just continue 171 LOG.warn("Attempted duplicate loading of " + className + "; skipped"); 172 continue; 173 } 174 ClassLoader cl = this.getClass().getClassLoader(); 175 try { 176 // override the class loader if a path for the system coprocessor is provided. 177 if (hasPath) { 178 cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(), 179 pathPrefix, conf); 180 } 181 Thread.currentThread().setContextClassLoader(cl); 182 implClass = cl.loadClass(className); 183 int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority; 184 // Add coprocessors as we go to guard against case where a coprocessor is specified twice 185 // in the configuration 186 E env = checkAndLoadInstance(implClass, coprocPriority, conf); 187 if (env != null) { 188 this.coprocEnvironments.add(env); 189 LOG.info("System coprocessor {} loaded, priority={}.", className, coprocPriority); 190 if (!hasPriorityOverride) { 191 ++currentSystemPriority; 192 } 193 } 194 } catch (Throwable t) { 195 // We always abort if system coprocessors cannot be loaded 196 abortServer(className, t); 197 } 198 } 199 } 200 201 /** 202 * Load a coprocessor implementation into the host 203 * @param path path to implementation jar 204 * @param className the main class name 205 * @param priority chaining priority 206 * @param conf configuration for coprocessor 207 * @throws java.io.IOException Exception 208 */ 209 public E load(Path path, String className, int priority, Configuration conf) throws IOException { 210 String[] includedClassPrefixes = null; 211 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 212 String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 213 includedClassPrefixes = prefixes.split(";"); 214 } 215 return load(path, className, priority, conf, includedClassPrefixes); 216 } 217 218 /** 219 * Load a coprocessor implementation into the host 220 * @param path path to implementation jar 221 * @param className the main class name 222 * @param priority chaining priority 223 * @param conf configuration for coprocessor 224 * @param includedClassPrefixes class name prefixes to include 225 * @throws java.io.IOException Exception 226 */ 227 public E load(Path path, String className, int priority, Configuration conf, 228 String[] includedClassPrefixes) throws IOException { 229 Class<?> implClass; 230 LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority " 231 + priority); 232 233 boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, 234 DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR); 235 if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) { 236 // If already loaded will just continue 237 LOG.warn("Attempted duplicate loading of {}; skipped", className); 238 return null; 239 } 240 241 ClassLoader cl = null; 242 if (path == null) { 243 try { 244 implClass = getClass().getClassLoader().loadClass(className); 245 } catch (ClassNotFoundException e) { 246 throw new IOException("No jar path specified for " + className); 247 } 248 } else { 249 cl = 250 CoprocessorClassLoader.getClassLoader(path, getClass().getClassLoader(), pathPrefix, conf); 251 try { 252 implClass = ((CoprocessorClassLoader) cl).loadClass(className, includedClassPrefixes); 253 } catch (ClassNotFoundException e) { 254 throw new IOException("Cannot load external coprocessor class " + className, e); 255 } 256 } 257 258 // load custom code for coprocessor 259 Thread currentThread = Thread.currentThread(); 260 ClassLoader hostClassLoader = currentThread.getContextClassLoader(); 261 try { 262 // switch temporarily to the thread classloader for custom CP 263 currentThread.setContextClassLoader(cl); 264 E cpInstance = checkAndLoadInstance(implClass, priority, conf); 265 return cpInstance; 266 } finally { 267 // restore the fresh (host) classloader 268 currentThread.setContextClassLoader(hostClassLoader); 269 } 270 } 271 272 public void load(Class<? extends C> implClass, int priority, Configuration conf) 273 throws IOException { 274 E env = checkAndLoadInstance(implClass, priority, conf); 275 coprocEnvironments.add(env); 276 } 277 278 /** 279 * @param implClass Implementation class 280 * @param priority priority 281 * @param conf configuration 282 * @throws java.io.IOException Exception 283 */ 284 public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf) 285 throws IOException { 286 // create the instance 287 C impl; 288 try { 289 impl = checkAndGetInstance(implClass); 290 if (impl == null) { 291 LOG.error("Cannot load coprocessor " + implClass.getSimpleName()); 292 return null; 293 } 294 } catch (InstantiationException | IllegalAccessException e) { 295 throw new IOException(e); 296 } 297 // create the environment 298 E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf); 299 assert env instanceof BaseEnvironment; 300 ((BaseEnvironment<C>) env).startup(); 301 // HBASE-4014: maintain list of loaded coprocessors for later crash analysis 302 // if server (master or regionserver) aborts. 303 coprocessorNames.add(implClass.getName()); 304 return env; 305 } 306 307 /** 308 * Called when a new Coprocessor class is loaded 309 */ 310 public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf); 311 312 /** 313 * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class is 314 * what the corresponding host implementation expects. If it is of correct type, returns an 315 * instance of the coprocessor to be loaded. If not, returns null. If an exception occurs when 316 * trying to create instance of a coprocessor, it's passed up and eventually results into server 317 * aborting. 318 */ 319 public abstract C checkAndGetInstance(Class<?> implClass) 320 throws InstantiationException, IllegalAccessException; 321 322 public void shutdown(E e) { 323 assert e instanceof BaseEnvironment; 324 if (LOG.isDebugEnabled()) { 325 LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName()); 326 } 327 ((BaseEnvironment<C>) e).shutdown(); 328 } 329 330 /** 331 * Find coprocessors by full class name or simple name. 332 */ 333 public C findCoprocessor(String className) { 334 for (E env : coprocEnvironments) { 335 if ( 336 env.getInstance().getClass().getName().equals(className) 337 || env.getInstance().getClass().getSimpleName().equals(className) 338 ) { 339 return env.getInstance(); 340 } 341 } 342 return null; 343 } 344 345 public <T extends C> T findCoprocessor(Class<T> cls) { 346 for (E env : coprocEnvironments) { 347 if (cls.isAssignableFrom(env.getInstance().getClass())) { 348 return (T) env.getInstance(); 349 } 350 } 351 return null; 352 } 353 354 /** 355 * Find list of coprocessors that extend/implement the given class/interface 356 * @param cls the class/interface to look for 357 * @return the list of coprocessors, or null if not found 358 */ 359 public <T extends C> List<T> findCoprocessors(Class<T> cls) { 360 ArrayList<T> ret = new ArrayList<>(); 361 362 for (E env : coprocEnvironments) { 363 C cp = env.getInstance(); 364 365 if (cp != null) { 366 if (cls.isAssignableFrom(cp.getClass())) { 367 ret.add((T) cp); 368 } 369 } 370 } 371 return ret; 372 } 373 374 /** 375 * Find a coprocessor environment by class name 376 * @param className the class name 377 * @return the coprocessor, or null if not found 378 */ 379 public E findCoprocessorEnvironment(String className) { 380 for (E env : coprocEnvironments) { 381 if ( 382 env.getInstance().getClass().getName().equals(className) 383 || env.getInstance().getClass().getSimpleName().equals(className) 384 ) { 385 return env; 386 } 387 } 388 return null; 389 } 390 391 /** 392 * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external 393 * jar files. 394 * @return A set of ClassLoader instances 395 */ 396 Set<ClassLoader> getExternalClassLoaders() { 397 Set<ClassLoader> externalClassLoaders = new HashSet<>(); 398 final ClassLoader systemClassLoader = this.getClass().getClassLoader(); 399 for (E env : coprocEnvironments) { 400 ClassLoader cl = env.getInstance().getClass().getClassLoader(); 401 if (cl != systemClassLoader) { 402 // do not include system classloader 403 externalClassLoaders.add(cl); 404 } 405 } 406 return externalClassLoaders; 407 } 408 409 /** 410 * Environment priority comparator. Coprocessors are chained in sorted order. 411 */ 412 static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> { 413 @Override 414 public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) { 415 if (env1.getPriority() < env2.getPriority()) { 416 return -1; 417 } else if (env1.getPriority() > env2.getPriority()) { 418 return 1; 419 } 420 if (env1.getLoadSequence() < env2.getLoadSequence()) { 421 return -1; 422 } else if (env1.getLoadSequence() > env2.getLoadSequence()) { 423 return 1; 424 } 425 return 0; 426 } 427 } 428 429 protected void abortServer(final E environment, final Throwable e) { 430 abortServer(environment.getInstance().getClass().getName(), e); 431 } 432 433 protected void abortServer(final String coprocessorName, final Throwable e) { 434 String message = "The coprocessor " + coprocessorName + " threw " + e.toString(); 435 LOG.error(message, e); 436 if (abortable != null) { 437 abortable.abort(message, e); 438 } else { 439 LOG.warn("No available Abortable, process was not aborted"); 440 } 441 } 442 443 /** 444 * This is used by coprocessor hooks which are declared to throw IOException (or its subtypes). 445 * For such hooks, we should handle throwable objects depending on the Throwable's type. Those 446 * which are instances of IOException should be passed on to the client. This is in conformance 447 * with the HBase idiom regarding IOException: that it represents a circumstance that should be 448 * passed along to the client for its own handling. For example, a coprocessor that implements 449 * access controls would throw a subclass of IOException, such as AccessDeniedException, in its 450 * preGet() method to prevent an unauthorized client's performing a Get on a particular table. 451 * @param env Coprocessor Environment 452 * @param e Throwable object thrown by coprocessor. 453 * @exception IOException Exception 454 */ 455 // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver}, 456 // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate 457 // exceptions depending on situation. If any changes are made to this logic, make sure to 458 // update all classes' comments. 459 protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException { 460 if (e instanceof IOException) { 461 throw (IOException) e; 462 } 463 // If we got here, e is not an IOException. A loaded coprocessor has a 464 // fatal bug, and the server (master or regionserver) should remove the 465 // faulty coprocessor from its set of active coprocessors. Setting 466 // 'hbase.coprocessor.abortonerror' to true will cause abortServer(), 467 // which may be useful in development and testing environments where 468 // 'failing fast' for error analysis is desired. 469 if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 470 // server is configured to abort. 471 abortServer(env, e); 472 } else { 473 // If available, pull a table name out of the environment 474 if (env instanceof RegionCoprocessorEnvironment) { 475 String tableName = 476 ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getNameAsString(); 477 LOG.error("Removing coprocessor '" + env.toString() + "' from table '" + tableName + "'", 478 e); 479 } else { 480 LOG.error("Removing coprocessor '" + env.toString() + "' from " + "environment", e); 481 } 482 483 coprocEnvironments.remove(env); 484 try { 485 shutdown(env); 486 } catch (Exception x) { 487 LOG.error("Uncaught exception when shutting down coprocessor '" + env.toString() + "'", x); 488 } 489 throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e 490 + "' and has been removed from the active " + "coprocessor set.", e); 491 } 492 } 493 494 /** 495 * Implementations defined function to get an observer of type {@code O} from a coprocessor of 496 * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer 497 * they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for each of 498 * RegionObserver, EndpointObserver and BulkLoadObserver. These getters are used by 499 * {@code ObserverOperation} to get appropriate observer from the coprocessor. 500 */ 501 @FunctionalInterface 502 public interface ObserverGetter<C, O> extends Function<C, Optional<O>> { 503 } 504 505 private abstract class ObserverOperation<O> extends ObserverContextImpl<E> { 506 ObserverGetter<C, O> observerGetter; 507 508 ObserverOperation(ObserverGetter<C, O> observerGetter) { 509 this(observerGetter, (ObserverRpcCallContext) null); 510 } 511 512 ObserverOperation(ObserverGetter<C, O> observerGetter, User user) { 513 this(observerGetter, createRpcCallContext(user), false); 514 } 515 516 ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext) { 517 this(observerGetter, rpcCallContext, false); 518 } 519 520 ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) { 521 this(observerGetter, (ObserverRpcCallContext) null, bypassable); 522 } 523 524 ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) { 525 this(observerGetter, createRpcCallContext(user), bypassable); 526 } 527 528 ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext, 529 boolean bypassable) { 530 super(rpcCallContext != null ? rpcCallContext : createRpcCallContext(), bypassable); 531 this.observerGetter = observerGetter; 532 } 533 534 private static ObserverRpcCallContext createRpcCallContext() { 535 return RpcServer.getRequestUser() 536 .map(user -> new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes())) 537 .orElse(null); 538 } 539 540 private static ObserverRpcCallContext createRpcCallContext(User user) { 541 if (user == null) { 542 return null; 543 } else { 544 return new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes()); 545 } 546 } 547 548 abstract void callObserver() throws IOException; 549 550 protected void postEnvCall() { 551 } 552 } 553 554 // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all 555 // ObserverCaller implementations will have to have a return statement. 556 // O = observer, E = environment, C = coprocessor, R=result type 557 public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> { 558 protected abstract void call(O observer) throws IOException; 559 560 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) { 561 super(observerGetter); 562 } 563 564 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) { 565 super(observerGetter, user); 566 } 567 568 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, 569 ObserverRpcCallContext rpcCallContext) { 570 super(observerGetter, rpcCallContext); 571 } 572 573 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user, 574 boolean bypassable) { 575 super(observerGetter, user, bypassable); 576 } 577 578 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, 579 ObserverRpcCallContext rpcCallContext, boolean bypassable) { 580 super(observerGetter, rpcCallContext, bypassable); 581 } 582 583 /** 584 * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor} 585 * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers, 586 * in which case they will return null for that observer's getter. We simply ignore such cases. 587 */ 588 @Override 589 void callObserver() throws IOException { 590 Optional<O> observer = observerGetter.apply(getEnvironment().getInstance()); 591 if (observer.isPresent()) { 592 call(observer.get()); 593 } 594 } 595 } 596 597 public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> { 598 protected abstract R call(O observer) throws IOException; 599 600 private R result; 601 602 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) { 603 this(observerGetter, result, false); 604 } 605 606 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, 607 boolean bypassable) { 608 this(observerGetter, result, (ObserverRpcCallContext) null, bypassable); 609 } 610 611 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) { 612 this(observerGetter, result, user, false); 613 } 614 615 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, 616 ObserverRpcCallContext rpcCallContext) { 617 this(observerGetter, result, rpcCallContext, false); 618 } 619 620 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user, 621 boolean bypassable) { 622 super(observerGetter, user, bypassable); 623 this.result = result; 624 } 625 626 private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, 627 ObserverRpcCallContext rpcCallContext, boolean bypassable) { 628 super(observerGetter, rpcCallContext, bypassable); 629 this.result = result; 630 } 631 632 protected R getResult() { 633 return this.result; 634 } 635 636 @Override 637 void callObserver() throws IOException { 638 Optional<O> observer = observerGetter.apply(getEnvironment().getInstance()); 639 if (observer.isPresent()) { 640 result = call(observer.get()); 641 } 642 } 643 } 644 645 ////////////////////////////////////////////////////////////////////////////////////////// 646 // Functions to execute observer hooks and handle results (if any) 647 ////////////////////////////////////////////////////////////////////////////////////////// 648 649 /** 650 * Do not call with an observerOperation that is null! Have the caller check. 651 */ 652 protected <O, R> R execOperationWithResult( 653 final ObserverOperationWithResult<O, R> observerOperation) throws IOException { 654 boolean bypass = execOperation(observerOperation); 655 R result = observerOperation.getResult(); 656 return bypass == observerOperation.isBypassable() ? result : null; 657 } 658 659 /** 660 * @return True if we are to bypass (Can only be <code>true</code> if 661 * ObserverOperation#isBypassable(). 662 */ 663 protected <O> boolean execOperation(final ObserverOperation<O> observerOperation) 664 throws IOException { 665 boolean bypass = false; 666 if (observerOperation == null) { 667 return bypass; 668 } 669 List<E> envs = coprocEnvironments.get(); 670 for (E env : envs) { 671 observerOperation.prepare(env); 672 Thread currentThread = Thread.currentThread(); 673 ClassLoader cl = currentThread.getContextClassLoader(); 674 try { 675 currentThread.setContextClassLoader(env.getClassLoader()); 676 observerOperation.callObserver(); 677 } catch (Throwable e) { 678 handleCoprocessorThrowable(env, e); 679 } finally { 680 currentThread.setContextClassLoader(cl); 681 } 682 // Internal to shouldBypass, it checks if obeserverOperation#isBypassable(). 683 bypass |= observerOperation.shouldBypass(); 684 observerOperation.postEnvCall(); 685 if (bypass) { 686 // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response. 687 // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'. 688 break; 689 } 690 } 691 return bypass; 692 } 693 694 /** 695 * Coprocessor classes can be configured in any order, based on that priority is set and chained 696 * in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is going down. 697 * This function first calls coprocessor methods (using ObserverOperation.call()) and then 698 * shutdowns the environment in postEnvCall(). <br> 699 * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors 700 * may remain shutdown if any exception occurs during next coprocessor execution which prevent 701 * master/regionserver stop or cluster shutdown. (Refer: 702 * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a> 703 * @return true if bypaas coprocessor execution, false if not. 704 */ 705 protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation) 706 throws IOException { 707 if (observerOperation == null) return false; 708 boolean bypass = false; 709 List<E> envs = coprocEnvironments.get(); 710 // Iterate the coprocessors and execute ObserverOperation's call() 711 for (E env : envs) { 712 observerOperation.prepare(env); 713 Thread currentThread = Thread.currentThread(); 714 ClassLoader cl = currentThread.getContextClassLoader(); 715 try { 716 currentThread.setContextClassLoader(env.getClassLoader()); 717 observerOperation.callObserver(); 718 } catch (Throwable e) { 719 handleCoprocessorThrowable(env, e); 720 } finally { 721 currentThread.setContextClassLoader(cl); 722 } 723 bypass |= observerOperation.shouldBypass(); 724 } 725 726 // Iterate the coprocessors and execute ObserverOperation's postEnvCall() 727 for (E env : envs) { 728 observerOperation.prepare(env); 729 observerOperation.postEnvCall(); 730 } 731 return bypass; 732 } 733}