/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.SortedList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Strings;

/**
 * Provides the common setup framework and runtime services for coprocessor invocation from HBase
 * services.
 * @param <C> type of specific coprocessor this host will handle
 * @param <E> type of specific coprocessor environment this host requires
 */
@InterfaceAudience.Private
public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> {
  public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes";
  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.regionserver.classes";
  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.user.region.classes";
  public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes";
  public static final String CLIENT_META_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.clientmeta.classes";
  public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes";
  public static final String RPC_COPROCESSOR_CONF_KEY = "hbase.coprocessor.rpc.classes";
  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.user.enabled";
  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
  public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR =
    "hbase.skip.load.duplicate.table.coprocessor";
  public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false;

  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class);
  protected Abortable abortable;
  /** Ordered set of loaded coprocessors with lock */
  protected final SortedList<E> coprocEnvironments =
    new SortedList<>(new EnvironmentPriorityComparator());
  protected Configuration conf;
  // unique file prefix to use for local copies of jars when classloading
  protected String pathPrefix;
  protected AtomicInteger loadSequence = new AtomicInteger();

  /**
   * Creates a host with a fresh random {@link #pathPrefix}.
   * @param abortable used by {@link #abortServer(String, Throwable)}; may be null, in which case
   *                  abort requests are only logged
   */
  public CoprocessorHost(Abortable abortable) {
    this.abortable = abortable;
    this.pathPrefix = UUID.randomUUID().toString();
  }

  /**
   * Not to be confused with the per-object {@link #coprocEnvironments} (above), coprocessorNames is
   * static and stores the set of all coprocessors ever loaded by any thread in this JVM. It is
   * strictly additive: coprocessors are added to coprocessorNames, by checkAndLoadInstance() but
   * are never removed, since the intention is to preserve a history of all loaded coprocessors for
   * diagnosis in case of server crash (HBASE-4014).
   */
  private static final Set<String> coprocessorNames =
    Collections.synchronizedSet(new HashSet<String>());

  /**
   * Returns a snapshot of the class names of every coprocessor loaded so far in this JVM.
   * @return a new, caller-owned set; later loads are not reflected in it
   */
  public static Set<String> getLoadedCoprocessors() {
    // Copying iterates the synchronized set, which must be done under its own monitor.
    synchronized (coprocessorNames) {
      return new HashSet<>(coprocessorNames);
    }
  }

  /**
   * Used to create a parameter to the HServerLoad constructor so that HServerLoad can provide
   * information about the coprocessors loaded by this regionserver. (HBASE-4070: Improve region
   * server metrics to report loaded coprocessors to master).
   * @return sorted set of the simple class names of the coprocessors loaded in this host
   */
  public Set<String> getCoprocessors() {
    Set<String> returnValue = new TreeSet<>();
    for (E e : coprocEnvironments) {
      returnValue.add(e.getInstance().getClass().getSimpleName());
    }
    return returnValue;
  }

  /**
   * Get the full class names of all loaded coprocessors. This method returns the complete class
   * names including package information, which is useful for precise coprocessor identification and
   * comparison.
   * @return sorted set of the fully qualified class names of the coprocessors loaded in this host
   */
  public Set<String> getCoprocessorClassNames() {
    Set<String> returnValue = new TreeSet<>();
    for (E e : coprocEnvironments) {
      returnValue.add(e.getInstance().getClass().getName());
    }
    return returnValue;
  }

  /**
   * Load system coprocessors once only. Read the class names from configuration. Called by
   * constructor.
   * <p>
   * Each configured entry is either a plain class name or, after HBASE-23710 and HBASE-26714, has
   * the form {@code className|priority|path} where priority and path are optional. Surrounding
   * whitespace of every token is ignored. Any failure while loading a class aborts the server via
   * {@link #abortServer(String, Throwable)}.
   * @param conf    configuration to read from and to hand to the loaded coprocessors
   * @param confKey configuration key holding the comma-separated list of coprocessors
   * @throws NumberFormatException if a non-blank priority token is not a valid integer
   */
  protected void loadSystemCoprocessors(Configuration conf, String confKey) {
    boolean coprocessorsEnabled =
      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
    if (!coprocessorsEnabled) {
      return;
    }

    // load default coprocessors from configure file
    String[] defaultCPClasses = conf.getStrings(confKey);
    if (defaultCPClasses == null || defaultCPClasses.length == 0) {
      return;
    }

    int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
    for (String classSpec : defaultCPClasses) {
      // After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
      // an optional format of className|priority|path
      String[] classNameToken = classSpec.split("\\|");
      String className = classNameToken[0].trim();

      // Trim before testing for emptiness so that "Foo | 10" and "Foo| |/path" are both accepted,
      // the same way the class name and the path tolerate surrounding whitespace.
      boolean hasPriorityOverride = false;
      int overridePriority = Coprocessor.PRIORITY_SYSTEM;
      if (classNameToken.length > 1) {
        String priorityToken = classNameToken[1].trim();
        if (!Strings.isNullOrEmpty(priorityToken)) {
          overridePriority = Integer.parseInt(priorityToken);
          hasPriorityOverride = true;
        }
      }

      Path path = null;
      if (classNameToken.length > 2) {
        String pathToken = classNameToken[2].trim();
        if (!Strings.isNullOrEmpty(pathToken)) {
          path = new Path(pathToken);
        }
      }

      if (findCoprocessor(className) != null) {
        // If already loaded will just continue
        LOG.warn("Attempted duplicate loading of {}; skipped", className);
        continue;
      }

      ClassLoader cl = this.getClass().getClassLoader();
      try {
        // override the class loader if a path for the system coprocessor is provided.
        if (path != null) {
          cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
            pathPrefix, conf);
        }
        // Note: the thread context class loader is not restored after this call.
        Thread.currentThread().setContextClassLoader(cl);
        Class<?> implClass = cl.loadClass(className);
        int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
        // Add coprocessors as we go to guard against case where a coprocessor is specified twice
        // in the configuration
        E env = checkAndLoadInstance(implClass, coprocPriority, conf);
        if (env != null) {
          this.coprocEnvironments.add(env);
          LOG.info("System coprocessor {} loaded, priority={}.", className, coprocPriority);
          // Only implicitly prioritized coprocessors consume a slot of the running counter.
          if (!hasPriorityOverride) {
            ++currentSystemPriority;
          }
        }
      } catch (Throwable t) {
        // We always abort if system coprocessors cannot be loaded
        abortServer(className, t);
      }
    }
  }

  /**
   * Load a coprocessor implementation into the host. The included class prefixes are taken from
   * {@link HConstants#CP_HTD_ATTR_INCLUSION_KEY} in {@code conf} (semicolon separated), if set.
   * @param path      path to implementation jar
   * @param className the main class name
   * @param priority  chaining priority
   * @param conf      configuration for coprocessor
   * @return the started environment, or null if loading was skipped or the class is not of the
   *         type this host handles
   * @throws java.io.IOException Exception
   */
  public E load(Path path, String className, int priority, Configuration conf) throws IOException {
    String[] includedClassPrefixes = null;
    String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
    if (prefixes != null) {
      includedClassPrefixes = prefixes.split(";");
    }
    return load(path, className, priority, conf, includedClassPrefixes);
  }

  /**
   * Load a coprocessor implementation into the host. The returned environment is started but is
   * NOT added to {@link #coprocEnvironments}; that is left to the caller.
   * @param path                  path to implementation jar; if null the class is loaded with the
   *                              class loader of this host
   * @param className             the main class name
   * @param priority              chaining priority
   * @param conf                  configuration for coprocessor
   * @param includedClassPrefixes class name prefixes to include
   * @return the started environment, or null if the coprocessor is already loaded and
   *         {@link #SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR} is enabled, or if the class is not of
   *         the type this host handles
   * @throws java.io.IOException if the class cannot be found or instantiated
   */
  public E load(Path path, String className, int priority, Configuration conf,
    String[] includedClassPrefixes) throws IOException {
    LOG.debug("Loading coprocessor class {} with path {} and priority {}", className, path,
      priority);

    boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR,
      DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR);
    if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) {
      // If already loaded will just continue
      LOG.warn("Attempted duplicate loading of {}; skipped", className);
      return null;
    }

    Class<?> implClass;
    // Stays null when the class comes from the host class loader (path == null).
    ClassLoader cl = null;
    if (path == null) {
      try {
        implClass = getClass().getClassLoader().loadClass(className);
      } catch (ClassNotFoundException e) {
        // Keep the original failure as the cause so that the root problem is not lost.
        throw new IOException("No jar path specified for " + className, e);
      }
    } else {
      cl =
        CoprocessorClassLoader.getClassLoader(path, getClass().getClassLoader(), pathPrefix, conf);
      try {
        implClass = ((CoprocessorClassLoader) cl).loadClass(className, includedClassPrefixes);
      } catch (ClassNotFoundException e) {
        throw new IOException("Cannot load external coprocessor class " + className, e);
      }
    }

    // load custom code for coprocessor
    Thread currentThread = Thread.currentThread();
    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
    try {
      // switch temporarily to the thread classloader for custom CP
      currentThread.setContextClassLoader(cl);
      return checkAndLoadInstance(implClass, priority, conf);
    } finally {
      // restore the fresh (host) classloader
      currentThread.setContextClassLoader(hostClassLoader);
    }
  }

  /**
   * Instantiates and starts the given coprocessor class and adds its environment to this host.
   * Nothing is added when {@link #checkAndLoadInstance(Class, int, Configuration)} rejects the
   * class (returns null).
   * @param implClass Implementation class
   * @param priority  priority
   * @param conf      configuration
   * @throws java.io.IOException Exception
   */
  public void load(Class<? extends C> implClass, int priority, Configuration conf)
    throws IOException {
    E env = checkAndLoadInstance(implClass, priority, conf);
    // checkAndLoadInstance() returns null for a class of the wrong type; a null entry would break
    // the priority comparator and every later iteration over the environments.
    if (env != null) {
      coprocEnvironments.add(env);
    }
  }

  /**
   * Instantiates the coprocessor, creates its environment and starts it. The class name is also
   * recorded in the JVM-wide history returned by {@link #getLoadedCoprocessors()}.
   * @param implClass Implementation class
   * @param priority  priority
   * @param conf      configuration
   * @return the started environment, or null if {@link #checkAndGetInstance(Class)} returned null
   * @throws java.io.IOException Exception
   */
  public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf)
    throws IOException {
    // create the instance
    C impl;
    try {
      impl = checkAndGetInstance(implClass);
      if (impl == null) {
        LOG.error("Cannot load coprocessor {}", implClass.getSimpleName());
        return null;
      }
    } catch (InstantiationException | IllegalAccessException e) {
      throw new IOException(e);
    }
    // create the environment
    E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
    assert env instanceof BaseEnvironment;
    ((BaseEnvironment<C>) env).startup();
    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
    // if server (master or regionserver) aborts.
    coprocessorNames.add(implClass.getName());
    return env;
  }

  /**
   * Called when a new Coprocessor class is loaded
   */
  public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);

  /**
   * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class is
   * what the corresponding host implementation expects. If it is of correct type, returns an
   * instance of the coprocessor to be loaded. If not, returns null. If an exception occurs when
   * trying to create instance of a coprocessor, it's passed up and eventually results into server
   * aborting.
   */
  public abstract C checkAndGetInstance(Class<?> implClass)
    throws InstantiationException, IllegalAccessException;

  /**
   * Shuts down the given environment. It is not removed from {@link #coprocEnvironments} here.
   * @param e the environment to stop; expected to be a {@link BaseEnvironment}
   */
  public void shutdown(E e) {
    assert e instanceof BaseEnvironment;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stop coprocessor {}", e.getInstance().getClass().getName());
    }
    ((BaseEnvironment<C>) e).shutdown();
  }

  /**
   * Tells whether the coprocessor held by the environment has the given full or simple class name.
   */
  private boolean hasClassName(E env, String className) {
    Class<?> clazz = env.getInstance().getClass();
    return clazz.getName().equals(className) || clazz.getSimpleName().equals(className);
  }

  /**
   * Find coprocessors by full class name or simple name.
   * @param className the full or simple class name
   * @return the first matching coprocessor in priority order, or null if not found
   */
  public C findCoprocessor(String className) {
    for (E env : coprocEnvironments) {
      if (hasClassName(env, className)) {
        return env.getInstance();
      }
    }
    return null;
  }

  /**
   * Find the first coprocessor that extends/implements the given class/interface.
   * @param cls the class/interface to look for
   * @return the first matching coprocessor in priority order, or null if not found
   */
  public <T extends C> T findCoprocessor(Class<T> cls) {
    for (E env : coprocEnvironments) {
      C instance = env.getInstance();
      // isInstance() is false for null, matching the null guard of findCoprocessors().
      if (cls.isInstance(instance)) {
        return cls.cast(instance);
      }
    }
    return null;
  }

  /**
   * Find list of coprocessors that extend/implement the given class/interface
   * @param cls the class/interface to look for
   * @return the list of coprocessors in priority order; empty (never null) if none is found
   */
  public <T extends C> List<T> findCoprocessors(Class<T> cls) {
    List<T> ret = new ArrayList<>();
    for (E env : coprocEnvironments) {
      C cp = env.getInstance();
      if (cls.isInstance(cp)) {
        ret.add(cls.cast(cp));
      }
    }
    return ret;
  }

  /**
   * Find a coprocessor environment by full class name or simple name.
   * @param className the class name
   * @return the first matching environment in priority order, or null if not found
   */
  public E findCoprocessorEnvironment(String className) {
    for (E env : coprocEnvironments) {
      if (hasClassName(env, className)) {
        return env;
      }
    }
    return null;
  }

  /**
   * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
   * jar files.
   * @return A set of ClassLoader instances
   */
  Set<ClassLoader> getExternalClassLoaders() {
    Set<ClassLoader> externalClassLoaders = new HashSet<>();
    final ClassLoader systemClassLoader = this.getClass().getClassLoader();
    for (E env : coprocEnvironments) {
      ClassLoader cl = env.getInstance().getClass().getClassLoader();
      if (cl != systemClassLoader) {
        // do not include system classloader
        externalClassLoaders.add(cl);
      }
    }
    return externalClassLoaders;
  }

  /**
   * Environment priority comparator. Coprocessors are chained in sorted order: ascending priority
   * value first, ascending load sequence as the tie breaker.
   */
  static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
    @Override
    public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) {
      int byPriority = Integer.compare(env1.getPriority(), env2.getPriority());
      if (byPriority != 0) {
        return byPriority;
      }
      return Integer.compare(env1.getLoadSequence(), env2.getLoadSequence());
    }
  }

  /**
   * Aborts the server on behalf of the coprocessor held by the given environment.
   * @param environment environment of the failing coprocessor
   * @param e           what the coprocessor threw
   */
  protected void abortServer(final E environment, final Throwable e) {
    abortServer(environment.getInstance().getClass().getName(), e);
  }

  /**
   * Logs the failure and asks the {@link #abortable} to abort. Without an abortable only a warning
   * is logged.
   * @param coprocessorName class name of the failing coprocessor
   * @param e               what the coprocessor threw
   */
  protected void abortServer(final String coprocessorName, final Throwable e) {
    String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
    LOG.error(message, e);
    if (abortable != null) {
      abortable.abort(message, e);
    } else {
      LOG.warn("No available Abortable, process was not aborted");
    }
  }

  /**
   * This is used by coprocessor hooks which are declared to throw IOException (or its subtypes).
   * For such hooks, we should handle throwable objects depending on the Throwable's type. Those
   * which are instances of IOException should be passed on to the client. This is in conformance
   * with the HBase idiom regarding IOException: that it represents a circumstance that should be
   * passed along to the client for its own handling. For example, a coprocessor that implements
   * access controls would throw a subclass of IOException, such as AccessDeniedException, in its
   * preGet() method to prevent an unauthorized client's performing a Get on a particular table.
   * <p>
   * Anything else either aborts the server (when {@link #ABORT_ON_ERROR_KEY} is true, the
   * default) or gets the coprocessor removed and shut down, after which a
   * {@link DoNotRetryIOException} is thrown.
   * @param env Coprocessor Environment
   * @param e   Throwable object thrown by coprocessor.
   * @exception IOException Exception
   */
  // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver},
  // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
  // exceptions depending on situation. If any changes are made to this logic, make sure to
  // update all classes' comments.
  protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
    if (e instanceof IOException) {
      throw (IOException) e;
    }
    // If we got here, e is not an IOException. A loaded coprocessor has a
    // fatal bug, and the server (master or regionserver) should remove the
    // faulty coprocessor from its set of active coprocessors. Setting
    // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
    // which may be useful in development and testing environments where
    // 'failing fast' for error analysis is desired.
    if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
      // server is configured to abort.
      abortServer(env, e);
    } else {
      // If available, pull a table name out of the environment
      if (env instanceof RegionCoprocessorEnvironment) {
        String tableName =
          ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getNameAsString();
        LOG.error("Removing coprocessor '{}' from table '{}'", env, tableName, e);
      } else {
        LOG.error("Removing coprocessor '{}' from environment", env, e);
      }

      coprocEnvironments.remove(env);
      try {
        shutdown(env);
      } catch (Exception x) {
        // Best effort: the coprocessor is already removed, so a failing shutdown is only logged.
        LOG.error("Uncaught exception when shutting down coprocessor '{}'", env, x);
      }
      throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e
        + "' and has been removed from the active coprocessor set.", e);
    }
  }

  /**
   * Used to limit legacy handling to once per Coprocessor class per classloader.
   * <p>
   * NOTE(review): the comparator orders classes by name only, so two distinct classes with the
   * same name (e.g. loaded by different class loaders) compare as equal and only one of them can
   * be held by this set. Confirm whether that matches the "per classloader" intent.
   */
  private static final Set<Class<? extends Coprocessor>> legacyWarning =
    new ConcurrentSkipListSet<>(new Comparator<Class<? extends Coprocessor>>() {
      @Override
      public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
        if (c1.equals(c2)) {
          return 0;
        }
        return c1.getName().compareTo(c2.getName());
      }
    });

  /**
   * Implementations defined function to get an observer of type {@code O} from a coprocessor of
   * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer
   * they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for each of
   * RegionObserver, EndpointObserver and BulkLoadObserver. These getters are used by
   * {@code ObserverOperation} to get appropriate observer from the coprocessor.
   */
  @FunctionalInterface
  public interface ObserverGetter<C, O> extends Function<C, Optional<O>> {
  }

  /**
   * Base of all observer operations: an observer context that knows how to obtain the observer
   * from a coprocessor and call it. When no user is given, the user of the current RPC request
   * (if any) is used.
   */
  private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
    final ObserverGetter<C, O> observerGetter;

    ObserverOperation(ObserverGetter<C, O> observerGetter) {
      this(observerGetter, null);
    }

    ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
      this(observerGetter, user, false);
    }

    ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
      this(observerGetter, null, bypassable);
    }

    ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
      super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable);
      this.observerGetter = observerGetter;
    }

    /** Invokes the hook on the observer of the currently prepared environment, if it has one. */
    abstract void callObserver() throws IOException;

    /** Hook run after the observer call for an environment; does nothing by default. */
    protected void postEnvCall() {
    }
  }

  // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
  // ObserverCaller implementations will have to have a return statement.
  // O = observer, E = environment, C = coprocessor, R=result type
  public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> {
    protected abstract void call(O observer) throws IOException;

    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
      super(observerGetter);
    }

    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
      super(observerGetter, user);
    }

    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
      boolean bypassable) {
      super(observerGetter, user, bypassable);
    }

    /**
     * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
     * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers,
     * in which case their getter for that observer yields an empty {@link Optional}. We simply
     * ignore such cases.
     */
    @Override
    void callObserver() throws IOException {
      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
      if (observer.isPresent()) {
        call(observer.get());
      }
    }
  }

  /**
   * Observer operation that carries a result. The result starts as the value given to the
   * constructor and is replaced by the return value of every observer that is actually called.
   */
  public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> {
    protected abstract R call(O observer) throws IOException;

    private R result;

    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
      this(observerGetter, result, false);
    }

    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
      boolean bypassable) {
      this(observerGetter, result, null, bypassable);
    }

    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) {
      this(observerGetter, result, user, false);
    }

    private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
      boolean bypassable) {
      super(observerGetter, user, bypassable);
      this.result = result;
    }

    protected R getResult() {
      return this.result;
    }

    /** Calls the observer, if the coprocessor provides one, and remembers what it returned. */
    @Override
    void callObserver() throws IOException {
      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
      if (observer.isPresent()) {
        result = call(observer.get());
      }
    }
  }

  //////////////////////////////////////////////////////////////////////////////////////////
  // Functions to execute observer hooks and handle results (if any)
  //////////////////////////////////////////////////////////////////////////////////////////

  /**
   * Do not call with an observerOperation that is null! Have the caller check.
   * @return the operation's result when the bypass outcome equals the operation's bypassable flag
   *         (i.e. a non-bypassable operation, or a bypassable one that was bypassed); null
   *         otherwise
   */
  protected <O, R> R execOperationWithResult(
    final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
    boolean bypass = execOperation(observerOperation);
    R result = observerOperation.getResult();
    return bypass == observerOperation.isBypassable() ? result : null;
  }

  /**
   * Runs the operation against every loaded coprocessor in priority order, stopping early once a
   * coprocessor requests a bypass.
   * @return True if we are to bypass (Can only be <code>true</code> if
   *         ObserverOperation#isBypassable()). Always false for a null operation.
   */
  protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
    throws IOException {
    boolean bypass = false;
    if (observerOperation == null) {
      return bypass;
    }
    List<E> envs = coprocEnvironments.get();
    for (E env : envs) {
      observerOperation.prepare(env);
      Thread currentThread = Thread.currentThread();
      ClassLoader cl = currentThread.getContextClassLoader();
      try {
        // Run the hook with the coprocessor's own class loader as context class loader.
        currentThread.setContextClassLoader(env.getClassLoader());
        observerOperation.callObserver();
      } catch (Throwable e) {
        handleCoprocessorThrowable(env, e);
      } finally {
        currentThread.setContextClassLoader(cl);
      }
      // Internal to shouldBypass, it checks if observerOperation#isBypassable().
      bypass |= observerOperation.shouldBypass();
      observerOperation.postEnvCall();
      if (bypass) {
        // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
        // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
        break;
      }
    }
    return bypass;
  }

  /**
   * Coprocessor classes can be configured in any order, based on that priority is set and chained
   * in a sorted order. Should be used for preStop*() hooks i.e. when master/regionserver is going
   * down. This function first calls coprocessor methods (using ObserverOperation.call()) and then
   * shutdowns the environment in postEnvCall(). <br>
   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
   * master/regionserver stop or cluster shutdown. (Refer:
   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>)
   * @return true if bypass coprocessor execution, false if not.
   */
  protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation)
    throws IOException {
    if (observerOperation == null) {
      return false;
    }
    boolean bypass = false;
    List<E> envs = coprocEnvironments.get();
    // Iterate the coprocessors and execute ObserverOperation's call()
    for (E env : envs) {
      observerOperation.prepare(env);
      Thread currentThread = Thread.currentThread();
      ClassLoader cl = currentThread.getContextClassLoader();
      try {
        currentThread.setContextClassLoader(env.getClassLoader());
        observerOperation.callObserver();
      } catch (Throwable e) {
        handleCoprocessorThrowable(env, e);
      } finally {
        currentThread.setContextClassLoader(cl);
      }
      // Unlike execOperation(), a bypass does not cut the iteration short here.
      bypass |= observerOperation.shouldBypass();
    }

    // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
    for (E env : envs) {
      observerOperation.prepare(env);
      observerOperation.postEnvCall();
    }
    return bypass;
  }
}