001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Comparator; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Optional; 027import java.util.Set; 028import java.util.TreeSet; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentSkipListSet; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.function.Function; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.Coprocessor; 037import org.apache.hadoop.hbase.CoprocessorEnvironment; 038import org.apache.hadoop.hbase.DoNotRetryIOException; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ipc.RpcServer; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 043import org.apache.hadoop.hbase.util.SortedList; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.base.Strings; 049 050/** 051 * Provides the common setup framework and runtime services for coprocessor invocation from HBase 052 * services. 053 * @param <C> type of specific coprocessor this host will handle 054 * @param <E> type of specific coprocessor environment this host requires. provides 055 */ 056@InterfaceAudience.Private 057public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> { 058 public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes"; 059 public static final String REGIONSERVER_COPROCESSOR_CONF_KEY = 060 "hbase.coprocessor.regionserver.classes"; 061 public static final String USER_REGION_COPROCESSOR_CONF_KEY = 062 "hbase.coprocessor.user.region.classes"; 063 public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes"; 064 public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes"; 065 public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror"; 066 public static final boolean DEFAULT_ABORT_ON_ERROR = true; 067 public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled"; 068 public static final boolean DEFAULT_COPROCESSORS_ENABLED = true; 069 public static final String USER_COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.user.enabled"; 070 public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true; 071 public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = 072 "hbase.skip.load.duplicate.table.coprocessor"; 073 public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false; 074 075 private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class); 076 protected Abortable abortable; 077 /** Ordered set of loaded coprocessors with lock */ 078 protected final SortedList<E> coprocEnvironments = 079 new SortedList<>(new EnvironmentPriorityComparator()); 080 protected Configuration conf; 081 // unique file prefix to use for local copies of jars when classloading 082 protected String pathPrefix; 083 protected AtomicInteger loadSequence = new AtomicInteger(); 084 085 public CoprocessorHost(Abortable abortable) { 086 this.abortable = abortable; 087 this.pathPrefix = UUID.randomUUID().toString(); 088 } 089 090 /** 091 * Not to be confused with the per-object _coprocessors_ (above), coprocessorNames is static and 092 * stores the set of all coprocessors ever loaded by any thread in this JVM. It is strictly 093 * additive: coprocessors are added to coprocessorNames, by checkAndLoadInstance() but are never 094 * removed, since the intention is to preserve a history of all loaded coprocessors for diagnosis 095 * in case of server crash (HBASE-4014). 096 */ 097 private static Set<String> coprocessorNames = Collections.synchronizedSet(new HashSet<String>()); 098 099 public static Set<String> getLoadedCoprocessors() { 100 synchronized (coprocessorNames) { 101 return new HashSet(coprocessorNames); 102 } 103 } 104 105 /** 106 * Used to create a parameter to the HServerLoad constructor so that HServerLoad can provide 107 * information about the coprocessors loaded by this regionserver. (HBASE-4070: Improve region 108 * server metrics to report loaded coprocessors to master). 109 */ 110 public Set<String> getCoprocessors() { 111 Set<String> returnValue = new TreeSet<>(); 112 for (E e : coprocEnvironments) { 113 returnValue.add(e.getInstance().getClass().getSimpleName()); 114 } 115 return returnValue; 116 } 117 118 /** 119 * Load system coprocessors once only. Read the class names from configuration. Called by 120 * constructor. 121 */ 122 protected void loadSystemCoprocessors(Configuration conf, String confKey) { 123 boolean coprocessorsEnabled = 124 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 125 if (!coprocessorsEnabled) { 126 return; 127 } 128 129 Class<?> implClass; 130 131 // load default coprocessors from configure file 132 String[] defaultCPClasses = conf.getStrings(confKey); 133 if (defaultCPClasses == null || defaultCPClasses.length == 0) return; 134 135 int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM; 136 for (String className : defaultCPClasses) { 137 // After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept 138 // an optional format of className|priority|path 139 String[] classNameToken = className.split("\\|"); 140 boolean hasPriorityOverride = false; 141 boolean hasPath = false; 142 className = classNameToken[0]; 143 int overridePriority = Coprocessor.PRIORITY_SYSTEM; 144 Path path = null; 145 if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) { 146 overridePriority = Integer.parseInt(classNameToken[1]); 147 hasPriorityOverride = true; 148 } 149 if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) { 150 path = new Path(classNameToken[2].trim()); 151 hasPath = true; 152 } 153 className = className.trim(); 154 if (findCoprocessor(className) != null) { 155 // If already loaded will just continue 156 LOG.warn("Attempted duplicate loading of " + className + "; skipped"); 157 continue; 158 } 159 ClassLoader cl = this.getClass().getClassLoader(); 160 try { 161 // override the class loader if a path for the system coprocessor is provided. 162 if (hasPath) { 163 cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(), 164 pathPrefix, conf); 165 } 166 Thread.currentThread().setContextClassLoader(cl); 167 implClass = cl.loadClass(className); 168 int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority; 169 // Add coprocessors as we go to guard against case where a coprocessor is specified twice 170 // in the configuration 171 E env = checkAndLoadInstance(implClass, coprocPriority, conf); 172 if (env != null) { 173 this.coprocEnvironments.add(env); 174 LOG.info("System coprocessor {} loaded, priority={}.", className, coprocPriority); 175 if (!hasPriorityOverride) { 176 ++currentSystemPriority; 177 } 178 } 179 } catch (Throwable t) { 180 // We always abort if system coprocessors cannot be loaded 181 abortServer(className, t); 182 } 183 } 184 } 185 186 /** 187 * Load a coprocessor implementation into the host 188 * @param path path to implementation jar 189 * @param className the main class name 190 * @param priority chaining priority 191 * @param conf configuration for coprocessor 192 * @throws java.io.IOException Exception 193 */ 194 public E load(Path path, String className, int priority, Configuration conf) throws IOException { 195 String[] includedClassPrefixes = null; 196 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 197 String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 198 includedClassPrefixes = prefixes.split(";"); 199 } 200 return load(path, className, priority, conf, includedClassPrefixes); 201 } 202 203 /** 204 * Load a coprocessor implementation into the host 205 * @param path path to implementation jar 206 * @param className the main class name 207 * @param priority chaining priority 208 * @param conf configuration for coprocessor 209 * @param includedClassPrefixes class name prefixes to include 210 * @throws java.io.IOException Exception 211 */ 212 public E load(Path path, String className, int priority, Configuration conf, 213 String[] includedClassPrefixes) throws IOException { 214 Class<?> implClass; 215 LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority " 216 + priority); 217 218 boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, 219 DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR); 220 if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) { 221 // If already loaded will just continue 222 LOG.warn("Attempted duplicate loading of {}; skipped", className); 223 return null; 224 } 225 226 ClassLoader cl = null; 227 if (path == null) { 228 try { 229 implClass = getClass().getClassLoader().loadClass(className); 230 } catch (ClassNotFoundException e) { 231 throw new IOException("No jar path specified for " + className); 232 } 233 } else { 234 cl = 235 CoprocessorClassLoader.getClassLoader(path, getClass().getClassLoader(), pathPrefix, conf); 236 try { 237 implClass = ((CoprocessorClassLoader) cl).loadClass(className, includedClassPrefixes); 238 } catch (ClassNotFoundException e) { 239 throw new IOException("Cannot load external coprocessor class " + className, e); 240 } 241 } 242 243 // load custom code for coprocessor 244 Thread currentThread = Thread.currentThread(); 245 ClassLoader hostClassLoader = currentThread.getContextClassLoader(); 246 try { 247 // switch temporarily to the thread classloader for custom CP 248 currentThread.setContextClassLoader(cl); 249 E cpInstance = checkAndLoadInstance(implClass, priority, conf); 250 return cpInstance; 251 } finally { 252 // restore the fresh (host) classloader 253 currentThread.setContextClassLoader(hostClassLoader); 254 } 255 } 256 257 public void load(Class<? extends C> implClass, int priority, Configuration conf) 258 throws IOException { 259 E env = checkAndLoadInstance(implClass, priority, conf); 260 coprocEnvironments.add(env); 261 } 262 263 /** 264 * @param implClass Implementation class 265 * @param priority priority 266 * @param conf configuration 267 * @throws java.io.IOException Exception 268 */ 269 public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf) 270 throws IOException { 271 // create the instance 272 C impl; 273 try { 274 impl = checkAndGetInstance(implClass); 275 if (impl == null) { 276 LOG.error("Cannot load coprocessor " + implClass.getSimpleName()); 277 return null; 278 } 279 } catch (InstantiationException | IllegalAccessException e) { 280 throw new IOException(e); 281 } 282 // create the environment 283 E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf); 284 assert env instanceof BaseEnvironment; 285 ((BaseEnvironment<C>) env).startup(); 286 // HBASE-4014: maintain list of loaded coprocessors for later crash analysis 287 // if server (master or regionserver) aborts. 288 coprocessorNames.add(implClass.getName()); 289 return env; 290 } 291 292 /** 293 * Called when a new Coprocessor class is loaded 294 */ 295 public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf); 296 297 /** 298 * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class is 299 * what the corresponding host implementation expects. If it is of correct type, returns an 300 * instance of the coprocessor to be loaded. If not, returns null. If an exception occurs when 301 * trying to create instance of a coprocessor, it's passed up and eventually results into server 302 * aborting. 303 */ 304 public abstract C checkAndGetInstance(Class<?> implClass) 305 throws InstantiationException, IllegalAccessException; 306 307 public void shutdown(E e) { 308 assert e instanceof BaseEnvironment; 309 if (LOG.isDebugEnabled()) { 310 LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName()); 311 } 312 ((BaseEnvironment<C>) e).shutdown(); 313 } 314 315 /** 316 * Find coprocessors by full class name or simple name. 317 */ 318 public C findCoprocessor(String className) { 319 for (E env : coprocEnvironments) { 320 if ( 321 env.getInstance().getClass().getName().equals(className) 322 || env.getInstance().getClass().getSimpleName().equals(className) 323 ) { 324 return env.getInstance(); 325 } 326 } 327 return null; 328 } 329 330 public <T extends C> T findCoprocessor(Class<T> cls) { 331 for (E env : coprocEnvironments) { 332 if (cls.isAssignableFrom(env.getInstance().getClass())) { 333 return (T) env.getInstance(); 334 } 335 } 336 return null; 337 } 338 339 /** 340 * Find list of coprocessors that extend/implement the given class/interface 341 * @param cls the class/interface to look for 342 * @return the list of coprocessors, or null if not found 343 */ 344 public <T extends C> List<T> findCoprocessors(Class<T> cls) { 345 ArrayList<T> ret = new ArrayList<>(); 346 347 for (E env : coprocEnvironments) { 348 C cp = env.getInstance(); 349 350 if (cp != null) { 351 if (cls.isAssignableFrom(cp.getClass())) { 352 ret.add((T) cp); 353 } 354 } 355 } 356 return ret; 357 } 358 359 /** 360 * Find a coprocessor environment by class name 361 * @param className the class name 362 * @return the coprocessor, or null if not found 363 */ 364 public E findCoprocessorEnvironment(String className) { 365 for (E env : coprocEnvironments) { 366 if ( 367 env.getInstance().getClass().getName().equals(className) 368 || env.getInstance().getClass().getSimpleName().equals(className) 369 ) { 370 return env; 371 } 372 } 373 return null; 374 } 375 376 /** 377 * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external 378 * jar files. 379 * @return A set of ClassLoader instances 380 */ 381 Set<ClassLoader> getExternalClassLoaders() { 382 Set<ClassLoader> externalClassLoaders = new HashSet<>(); 383 final ClassLoader systemClassLoader = this.getClass().getClassLoader(); 384 for (E env : coprocEnvironments) { 385 ClassLoader cl = env.getInstance().getClass().getClassLoader(); 386 if (cl != systemClassLoader) { 387 // do not include system classloader 388 externalClassLoaders.add(cl); 389 } 390 } 391 return externalClassLoaders; 392 } 393 394 /** 395 * Environment priority comparator. Coprocessors are chained in sorted order. 396 */ 397 static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> { 398 @Override 399 public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) { 400 if (env1.getPriority() < env2.getPriority()) { 401 return -1; 402 } else if (env1.getPriority() > env2.getPriority()) { 403 return 1; 404 } 405 if (env1.getLoadSequence() < env2.getLoadSequence()) { 406 return -1; 407 } else if (env1.getLoadSequence() > env2.getLoadSequence()) { 408 return 1; 409 } 410 return 0; 411 } 412 } 413 414 protected void abortServer(final E environment, final Throwable e) { 415 abortServer(environment.getInstance().getClass().getName(), e); 416 } 417 418 protected void abortServer(final String coprocessorName, final Throwable e) { 419 String message = "The coprocessor " + coprocessorName + " threw " + e.toString(); 420 LOG.error(message, e); 421 if (abortable != null) { 422 abortable.abort(message, e); 423 } else { 424 LOG.warn("No available Abortable, process was not aborted"); 425 } 426 } 427 428 /** 429 * This is used by coprocessor hooks which are declared to throw IOException (or its subtypes). 430 * For such hooks, we should handle throwable objects depending on the Throwable's type. Those 431 * which are instances of IOException should be passed on to the client. This is in conformance 432 * with the HBase idiom regarding IOException: that it represents a circumstance that should be 433 * passed along to the client for its own handling. For example, a coprocessor that implements 434 * access controls would throw a subclass of IOException, such as AccessDeniedException, in its 435 * preGet() method to prevent an unauthorized client's performing a Get on a particular table. 436 * @param env Coprocessor Environment 437 * @param e Throwable object thrown by coprocessor. 438 * @exception IOException Exception 439 */ 440 // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver}, 441 // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate 442 // exceptions depending on situation. If any changes are made to this logic, make sure to 443 // update all classes' comments. 444 protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException { 445 if (e instanceof IOException) { 446 throw (IOException) e; 447 } 448 // If we got here, e is not an IOException. A loaded coprocessor has a 449 // fatal bug, and the server (master or regionserver) should remove the 450 // faulty coprocessor from its set of active coprocessors. Setting 451 // 'hbase.coprocessor.abortonerror' to true will cause abortServer(), 452 // which may be useful in development and testing environments where 453 // 'failing fast' for error analysis is desired. 454 if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 455 // server is configured to abort. 456 abortServer(env, e); 457 } else { 458 // If available, pull a table name out of the environment 459 if (env instanceof RegionCoprocessorEnvironment) { 460 String tableName = 461 ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getNameAsString(); 462 LOG.error("Removing coprocessor '" + env.toString() + "' from table '" + tableName + "'", 463 e); 464 } else { 465 LOG.error("Removing coprocessor '" + env.toString() + "' from " + "environment", e); 466 } 467 468 coprocEnvironments.remove(env); 469 try { 470 shutdown(env); 471 } catch (Exception x) { 472 LOG.error("Uncaught exception when shutting down coprocessor '" + env.toString() + "'", x); 473 } 474 throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e 475 + "' and has been removed from the active " + "coprocessor set.", e); 476 } 477 } 478 479 /** 480 * Used to limit legacy handling to once per Coprocessor class per classloader. 481 */ 482 private static final Set<Class<? extends Coprocessor>> legacyWarning = 483 new ConcurrentSkipListSet<>(new Comparator<Class<? extends Coprocessor>>() { 484 @Override 485 public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) { 486 if (c1.equals(c2)) { 487 return 0; 488 } 489 return c1.getName().compareTo(c2.getName()); 490 } 491 }); 492 493 /** 494 * Implementations defined function to get an observer of type {@code O} from a coprocessor of 495 * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer 496 * they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for each of 497 * RegionObserver, EndpointObserver and BulkLoadObserver. These getters are used by 498 * {@code ObserverOperation} to get appropriate observer from the coprocessor. 499 */ 500 @FunctionalInterface 501 public interface ObserverGetter<C, O> extends Function<C, Optional<O>> { 502 } 503 504 private abstract class ObserverOperation<O> extends ObserverContextImpl<E> { 505 ObserverGetter<C, O> observerGetter; 506 507 ObserverOperation(ObserverGetter<C, O> observerGetter) { 508 this(observerGetter, null); 509 } 510 511 ObserverOperation(ObserverGetter<C, O> observerGetter, User user) { 512 this(observerGetter, user, false); 513 } 514 515 ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) { 516 this(observerGetter, null, bypassable); 517 } 518 519 ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) { 520 super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable); 521 this.observerGetter = observerGetter; 522 } 523 524 abstract void callObserver() throws IOException; 525 526 protected void postEnvCall() { 527 } 528 } 529 530 // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all 531 // ObserverCaller implementations will have to have a return statement. 532 // O = observer, E = environment, C = coprocessor, R=result type 533 public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> { 534 protected abstract void call(O observer) throws IOException; 535 536 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) { 537 super(observerGetter); 538 } 539 540 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) { 541 super(observerGetter, user); 542 } 543 544 public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user, 545 boolean bypassable) { 546 super(observerGetter, user, bypassable); 547 } 548 549 /** 550 * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor} 551 * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers, 552 * in which case they will return null for that observer's getter. We simply ignore such cases. 553 */ 554 @Override 555 void callObserver() throws IOException { 556 Optional<O> observer = observerGetter.apply(getEnvironment().getInstance()); 557 if (observer.isPresent()) { 558 call(observer.get()); 559 } 560 } 561 } 562 563 public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> { 564 protected abstract R call(O observer) throws IOException; 565 566 private R result; 567 568 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) { 569 this(observerGetter, result, false); 570 } 571 572 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, 573 boolean bypassable) { 574 this(observerGetter, result, null, bypassable); 575 } 576 577 public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) { 578 this(observerGetter, result, user, false); 579 } 580 581 private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user, 582 boolean bypassable) { 583 super(observerGetter, user, bypassable); 584 this.result = result; 585 } 586 587 protected R getResult() { 588 return this.result; 589 } 590 591 @Override 592 void callObserver() throws IOException { 593 Optional<O> observer = observerGetter.apply(getEnvironment().getInstance()); 594 if (observer.isPresent()) { 595 result = call(observer.get()); 596 } 597 } 598 } 599 600 ////////////////////////////////////////////////////////////////////////////////////////// 601 // Functions to execute observer hooks and handle results (if any) 602 ////////////////////////////////////////////////////////////////////////////////////////// 603 604 /** 605 * Do not call with an observerOperation that is null! Have the caller check. 606 */ 607 protected <O, R> R execOperationWithResult( 608 final ObserverOperationWithResult<O, R> observerOperation) throws IOException { 609 boolean bypass = execOperation(observerOperation); 610 R result = observerOperation.getResult(); 611 return bypass == observerOperation.isBypassable() ? result : null; 612 } 613 614 /** 615 * @return True if we are to bypass (Can only be <code>true</code> if 616 * ObserverOperation#isBypassable(). 617 */ 618 protected <O> boolean execOperation(final ObserverOperation<O> observerOperation) 619 throws IOException { 620 boolean bypass = false; 621 if (observerOperation == null) { 622 return bypass; 623 } 624 List<E> envs = coprocEnvironments.get(); 625 for (E env : envs) { 626 observerOperation.prepare(env); 627 Thread currentThread = Thread.currentThread(); 628 ClassLoader cl = currentThread.getContextClassLoader(); 629 try { 630 currentThread.setContextClassLoader(env.getClassLoader()); 631 observerOperation.callObserver(); 632 } catch (Throwable e) { 633 handleCoprocessorThrowable(env, e); 634 } finally { 635 currentThread.setContextClassLoader(cl); 636 } 637 // Internal to shouldBypass, it checks if obeserverOperation#isBypassable(). 638 bypass |= observerOperation.shouldBypass(); 639 observerOperation.postEnvCall(); 640 if (bypass) { 641 // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response. 642 // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'. 643 break; 644 } 645 } 646 return bypass; 647 } 648 649 /** 650 * Coprocessor classes can be configured in any order, based on that priority is set and chained 651 * in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is going down. 652 * This function first calls coprocessor methods (using ObserverOperation.call()) and then 653 * shutdowns the environment in postEnvCall(). <br> 654 * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors 655 * may remain shutdown if any exception occurs during next coprocessor execution which prevent 656 * master/regionserver stop or cluster shutdown. (Refer: 657 * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a> 658 * @return true if bypaas coprocessor execution, false if not. 659 */ 660 protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation) 661 throws IOException { 662 if (observerOperation == null) return false; 663 boolean bypass = false; 664 List<E> envs = coprocEnvironments.get(); 665 // Iterate the coprocessors and execute ObserverOperation's call() 666 for (E env : envs) { 667 observerOperation.prepare(env); 668 Thread currentThread = Thread.currentThread(); 669 ClassLoader cl = currentThread.getContextClassLoader(); 670 try { 671 currentThread.setContextClassLoader(env.getClassLoader()); 672 observerOperation.callObserver(); 673 } catch (Throwable e) { 674 handleCoprocessorThrowable(env, e); 675 } finally { 676 currentThread.setContextClassLoader(cl); 677 } 678 bypass |= observerOperation.shouldBypass(); 679 } 680 681 // Iterate the coprocessors and execute ObserverOperation's postEnvCall() 682 for (E env : envs) { 683 observerOperation.prepare(env); 684 observerOperation.postEnvCall(); 685 } 686 return bypass; 687 } 688}