001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Optional;
027import java.util.Set;
028import java.util.TreeSet;
029import java.util.UUID;
030import java.util.concurrent.ConcurrentSkipListSet;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.function.Function;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.Coprocessor;
037import org.apache.hadoop.hbase.CoprocessorEnvironment;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ipc.RpcServer;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
043import org.apache.hadoop.hbase.util.SortedList;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.base.Strings;
049
050/**
051 * Provides the common setup framework and runtime services for coprocessor invocation from HBase
052 * services.
053 * @param <C> type of specific coprocessor this host will handle
 * @param <E> type of specific coprocessor environment this host requires
055 */
056@InterfaceAudience.Private
057public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> {
  // Configuration keys under which coprocessor class names are listed, one key per kind of host.
  // NOTE(review): presumably passed as confKey to loadSystemCoprocessors() by the concrete hosts;
  // this class itself never reads them -- confirm against the subclasses.
  public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes";
  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.regionserver.classes";
  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.user.region.classes";
  public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes";
  public static final String CLIENT_META_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.clientmeta.classes";
  public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes";
  public static final String RPC_COPROCESSOR_CONF_KEY = "hbase.coprocessor.rpc.classes";
  // Read by handleCoprocessorThrowable(): when true, a non-IOException thrown by a coprocessor
  // aborts the server; when false, the coprocessor is removed from this host instead.
  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
  // Read by loadSystemCoprocessors(): when false, no system coprocessor is loaded.
  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
  // Not read in this class. NOTE(review): presumably gates user (table) coprocessors -- confirm
  // at the call sites.
  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.user.enabled";
  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
  // Read by load(Path, String, int, Configuration, String[]): when true, a class that
  // findCoprocessor() already finds on this host is skipped instead of being loaded again.
  public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR =
    "hbase.skip.load.duplicate.table.coprocessor";
  public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false;

  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class);
  // Target of abortServer(); when null, abortServer() only logs.
  protected Abortable abortable;
  /**
   * Environments of the coprocessors loaded on this host, kept in the order defined by
   * EnvironmentPriorityComparator (priority first, then load sequence).
   */
  protected final SortedList<E> coprocEnvironments =
    new SortedList<>(new EnvironmentPriorityComparator());
  // Never assigned in this class. NOTE(review): presumably set by subclasses -- confirm.
  protected Configuration conf;
  // unique file prefix to use for local copies of jars when classloading
  protected String pathPrefix;
  // Source of the sequence number handed to createEnvironment(); incremented once per instance
  // created by checkAndLoadInstance().
  protected AtomicInteger loadSequence = new AtomicInteger();
087
088  public CoprocessorHost(Abortable abortable) {
089    this.abortable = abortable;
090    this.pathPrefix = UUID.randomUUID().toString();
091  }
092
  /**
   * Not to be confused with the per-object {@code coprocEnvironments} list, coprocessorNames is
   * static and accumulates the class names of the coprocessors instantiated through any host in
   * this JVM. It is strictly additive: names are added by checkAndLoadInstance() and are never
   * removed, since the intention is to preserve a history of all loaded coprocessors for diagnosis
   * in case of server crash (HBASE-4014).
   */
  private static Set<String> coprocessorNames = Collections.synchronizedSet(new HashSet<String>());
101
102  public static Set<String> getLoadedCoprocessors() {
103    synchronized (coprocessorNames) {
104      return new HashSet(coprocessorNames);
105    }
106  }
107
108  /**
109   * Used to create a parameter to the HServerLoad constructor so that HServerLoad can provide
110   * information about the coprocessors loaded by this regionserver. (HBASE-4070: Improve region
111   * server metrics to report loaded coprocessors to master).
112   */
113  public Set<String> getCoprocessors() {
114    Set<String> returnValue = new TreeSet<>();
115    for (E e : coprocEnvironments) {
116      returnValue.add(e.getInstance().getClass().getSimpleName());
117    }
118    return returnValue;
119  }
120
121  /**
122   * Get the full class names of all loaded coprocessors. This method returns the complete class
123   * names including package information, which is useful for precise coprocessor identification and
124   * comparison.
125   */
126  public Set<String> getCoprocessorClassNames() {
127    Set<String> returnValue = new TreeSet<>();
128    for (E e : coprocEnvironments) {
129      returnValue.add(e.getInstance().getClass().getName());
130    }
131    return returnValue;
132  }
133
134  /**
135   * Load system coprocessors once only. Read the class names from configuration. Called by
136   * constructor.
137   */
138  protected void loadSystemCoprocessors(Configuration conf, String confKey) {
139    boolean coprocessorsEnabled =
140      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
141    if (!coprocessorsEnabled) {
142      return;
143    }
144
145    Class<?> implClass;
146
147    // load default coprocessors from configure file
148    String[] defaultCPClasses = conf.getStrings(confKey);
149    if (defaultCPClasses == null || defaultCPClasses.length == 0) return;
150
151    int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
152    for (String className : defaultCPClasses) {
153      // After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
154      // an optional format of className|priority|path
155      String[] classNameToken = className.split("\\|");
156      boolean hasPriorityOverride = false;
157      boolean hasPath = false;
158      className = classNameToken[0];
159      int overridePriority = Coprocessor.PRIORITY_SYSTEM;
160      Path path = null;
161      if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) {
162        overridePriority = Integer.parseInt(classNameToken[1]);
163        hasPriorityOverride = true;
164      }
165      if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) {
166        path = new Path(classNameToken[2].trim());
167        hasPath = true;
168      }
169      className = className.trim();
170      if (findCoprocessor(className) != null) {
171        // If already loaded will just continue
172        LOG.warn("Attempted duplicate loading of " + className + "; skipped");
173        continue;
174      }
175      ClassLoader cl = this.getClass().getClassLoader();
176      try {
177        // override the class loader if a path for the system coprocessor is provided.
178        if (hasPath) {
179          cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
180            pathPrefix, conf);
181        }
182        Thread.currentThread().setContextClassLoader(cl);
183        implClass = cl.loadClass(className);
184        int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
185        // Add coprocessors as we go to guard against case where a coprocessor is specified twice
186        // in the configuration
187        E env = checkAndLoadInstance(implClass, coprocPriority, conf);
188        if (env != null) {
189          this.coprocEnvironments.add(env);
190          LOG.info("System coprocessor {} loaded, priority={}.", className, coprocPriority);
191          if (!hasPriorityOverride) {
192            ++currentSystemPriority;
193          }
194        }
195      } catch (Throwable t) {
196        // We always abort if system coprocessors cannot be loaded
197        abortServer(className, t);
198      }
199    }
200  }
201
202  /**
203   * Load a coprocessor implementation into the host
204   * @param path      path to implementation jar
205   * @param className the main class name
206   * @param priority  chaining priority
207   * @param conf      configuration for coprocessor
208   * @throws java.io.IOException Exception
209   */
210  public E load(Path path, String className, int priority, Configuration conf) throws IOException {
211    String[] includedClassPrefixes = null;
212    if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
213      String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
214      includedClassPrefixes = prefixes.split(";");
215    }
216    return load(path, className, priority, conf, includedClassPrefixes);
217  }
218
219  /**
220   * Load a coprocessor implementation into the host
221   * @param path                  path to implementation jar
222   * @param className             the main class name
223   * @param priority              chaining priority
224   * @param conf                  configuration for coprocessor
225   * @param includedClassPrefixes class name prefixes to include
226   * @throws java.io.IOException Exception
227   */
228  public E load(Path path, String className, int priority, Configuration conf,
229    String[] includedClassPrefixes) throws IOException {
230    Class<?> implClass;
231    LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority "
232      + priority);
233
234    boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR,
235      DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR);
236    if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) {
237      // If already loaded will just continue
238      LOG.warn("Attempted duplicate loading of {}; skipped", className);
239      return null;
240    }
241
242    ClassLoader cl = null;
243    if (path == null) {
244      try {
245        implClass = getClass().getClassLoader().loadClass(className);
246      } catch (ClassNotFoundException e) {
247        throw new IOException("No jar path specified for " + className);
248      }
249    } else {
250      cl =
251        CoprocessorClassLoader.getClassLoader(path, getClass().getClassLoader(), pathPrefix, conf);
252      try {
253        implClass = ((CoprocessorClassLoader) cl).loadClass(className, includedClassPrefixes);
254      } catch (ClassNotFoundException e) {
255        throw new IOException("Cannot load external coprocessor class " + className, e);
256      }
257    }
258
259    // load custom code for coprocessor
260    Thread currentThread = Thread.currentThread();
261    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
262    try {
263      // switch temporarily to the thread classloader for custom CP
264      currentThread.setContextClassLoader(cl);
265      E cpInstance = checkAndLoadInstance(implClass, priority, conf);
266      return cpInstance;
267    } finally {
268      // restore the fresh (host) classloader
269      currentThread.setContextClassLoader(hostClassLoader);
270    }
271  }
272
273  public void load(Class<? extends C> implClass, int priority, Configuration conf)
274    throws IOException {
275    E env = checkAndLoadInstance(implClass, priority, conf);
276    coprocEnvironments.add(env);
277  }
278
279  /**
280   * @param implClass Implementation class
281   * @param priority  priority
282   * @param conf      configuration
283   * @throws java.io.IOException Exception
284   */
285  public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf)
286    throws IOException {
287    // create the instance
288    C impl;
289    try {
290      impl = checkAndGetInstance(implClass);
291      if (impl == null) {
292        LOG.error("Cannot load coprocessor " + implClass.getSimpleName());
293        return null;
294      }
295    } catch (InstantiationException | IllegalAccessException e) {
296      throw new IOException(e);
297    }
298    // create the environment
299    E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
300    assert env instanceof BaseEnvironment;
301    ((BaseEnvironment<C>) env).startup();
302    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
303    // if server (master or regionserver) aborts.
304    coprocessorNames.add(implClass.getName());
305    return env;
306  }
307
  /**
   * Called when a new Coprocessor class is loaded, to wrap the freshly created instance in an
   * environment. checkAndLoadInstance() asserts that the returned object is a BaseEnvironment and
   * calls startup() on it.
   * @param instance the coprocessor instance returned by checkAndGetInstance()
   * @param priority chaining priority of the coprocessor
   * @param sequence load sequence number, taken from loadSequence by checkAndLoadInstance()
   * @param conf     configuration for the coprocessor
   * @return the environment for {@code instance}
   */
  public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);
312
  /**
   * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class is
   * what the corresponding host implementation expects. If it is of correct type, returns an
   * instance of the coprocessor to be loaded. If not, returns null. If an exception occurs when
   * trying to create an instance of a coprocessor, checkAndLoadInstance() wraps it in an
   * IOException; loadSystemCoprocessors() then aborts the server.
   * @param implClass candidate coprocessor class
   * @return a new coprocessor instance, or null if {@code implClass} is not of the expected type
   * @throws InstantiationException if the class cannot be instantiated
   * @throws IllegalAccessException if the class or its constructor is not accessible
   */
  public abstract C checkAndGetInstance(Class<?> implClass)
    throws InstantiationException, IllegalAccessException;
322
323  public void shutdown(E e) {
324    assert e instanceof BaseEnvironment;
325    if (LOG.isDebugEnabled()) {
326      LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
327    }
328    ((BaseEnvironment<C>) e).shutdown();
329  }
330
331  /**
332   * Find coprocessors by full class name or simple name.
333   */
334  public C findCoprocessor(String className) {
335    for (E env : coprocEnvironments) {
336      if (
337        env.getInstance().getClass().getName().equals(className)
338          || env.getInstance().getClass().getSimpleName().equals(className)
339      ) {
340        return env.getInstance();
341      }
342    }
343    return null;
344  }
345
346  public <T extends C> T findCoprocessor(Class<T> cls) {
347    for (E env : coprocEnvironments) {
348      if (cls.isAssignableFrom(env.getInstance().getClass())) {
349        return (T) env.getInstance();
350      }
351    }
352    return null;
353  }
354
355  /**
356   * Find list of coprocessors that extend/implement the given class/interface
357   * @param cls the class/interface to look for
358   * @return the list of coprocessors, or null if not found
359   */
360  public <T extends C> List<T> findCoprocessors(Class<T> cls) {
361    ArrayList<T> ret = new ArrayList<>();
362
363    for (E env : coprocEnvironments) {
364      C cp = env.getInstance();
365
366      if (cp != null) {
367        if (cls.isAssignableFrom(cp.getClass())) {
368          ret.add((T) cp);
369        }
370      }
371    }
372    return ret;
373  }
374
375  /**
376   * Find a coprocessor environment by class name
377   * @param className the class name
378   * @return the coprocessor, or null if not found
379   */
380  public E findCoprocessorEnvironment(String className) {
381    for (E env : coprocEnvironments) {
382      if (
383        env.getInstance().getClass().getName().equals(className)
384          || env.getInstance().getClass().getSimpleName().equals(className)
385      ) {
386        return env;
387      }
388    }
389    return null;
390  }
391
392  /**
393   * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
394   * jar files.
395   * @return A set of ClassLoader instances
396   */
397  Set<ClassLoader> getExternalClassLoaders() {
398    Set<ClassLoader> externalClassLoaders = new HashSet<>();
399    final ClassLoader systemClassLoader = this.getClass().getClassLoader();
400    for (E env : coprocEnvironments) {
401      ClassLoader cl = env.getInstance().getClass().getClassLoader();
402      if (cl != systemClassLoader) {
403        // do not include system classloader
404        externalClassLoaders.add(cl);
405      }
406    }
407    return externalClassLoaders;
408  }
409
410  /**
411   * Environment priority comparator. Coprocessors are chained in sorted order.
412   */
413  static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
414    @Override
415    public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) {
416      if (env1.getPriority() < env2.getPriority()) {
417        return -1;
418      } else if (env1.getPriority() > env2.getPriority()) {
419        return 1;
420      }
421      if (env1.getLoadSequence() < env2.getLoadSequence()) {
422        return -1;
423      } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
424        return 1;
425      }
426      return 0;
427    }
428  }
429
430  protected void abortServer(final E environment, final Throwable e) {
431    abortServer(environment.getInstance().getClass().getName(), e);
432  }
433
434  protected void abortServer(final String coprocessorName, final Throwable e) {
435    String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
436    LOG.error(message, e);
437    if (abortable != null) {
438      abortable.abort(message, e);
439    } else {
440      LOG.warn("No available Abortable, process was not aborted");
441    }
442  }
443
444  /**
445   * This is used by coprocessor hooks which are declared to throw IOException (or its subtypes).
446   * For such hooks, we should handle throwable objects depending on the Throwable's type. Those
447   * which are instances of IOException should be passed on to the client. This is in conformance
448   * with the HBase idiom regarding IOException: that it represents a circumstance that should be
449   * passed along to the client for its own handling. For example, a coprocessor that implements
450   * access controls would throw a subclass of IOException, such as AccessDeniedException, in its
451   * preGet() method to prevent an unauthorized client's performing a Get on a particular table.
452   * @param env Coprocessor Environment
453   * @param e   Throwable object thrown by coprocessor.
454   * @exception IOException Exception
455   */
456  // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver},
457  // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
458  // exceptions depending on situation. If any changes are made to this logic, make sure to
459  // update all classes' comments.
460  protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
461    if (e instanceof IOException) {
462      throw (IOException) e;
463    }
464    // If we got here, e is not an IOException. A loaded coprocessor has a
465    // fatal bug, and the server (master or regionserver) should remove the
466    // faulty coprocessor from its set of active coprocessors. Setting
467    // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
468    // which may be useful in development and testing environments where
469    // 'failing fast' for error analysis is desired.
470    if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
471      // server is configured to abort.
472      abortServer(env, e);
473    } else {
474      // If available, pull a table name out of the environment
475      if (env instanceof RegionCoprocessorEnvironment) {
476        String tableName =
477          ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getNameAsString();
478        LOG.error("Removing coprocessor '" + env.toString() + "' from table '" + tableName + "'",
479          e);
480      } else {
481        LOG.error("Removing coprocessor '" + env.toString() + "' from " + "environment", e);
482      }
483
484      coprocEnvironments.remove(env);
485      try {
486        shutdown(env);
487      } catch (Exception x) {
488        LOG.error("Uncaught exception when shutting down coprocessor '" + env.toString() + "'", x);
489      }
490      throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e
491        + "' and has been removed from the active " + "coprocessor set.", e);
492    }
493  }
494
495  /**
496   * Used to limit legacy handling to once per Coprocessor class per classloader.
497   */
498  private static final Set<Class<? extends Coprocessor>> legacyWarning =
499    new ConcurrentSkipListSet<>(new Comparator<Class<? extends Coprocessor>>() {
500      @Override
501      public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
502        if (c1.equals(c2)) {
503          return 0;
504        }
505        return c1.getName().compareTo(c2.getName());
506      }
507    });
508
  /**
   * Implementation-defined function to get an observer of type {@code O} from a coprocessor of
   * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer
   * they can handle. For example, RegionCoprocessorHost will use 3 getters, one for each of
   * RegionObserver, EndpointObserver and BulkLoadObserver. These getters are used by
   * {@code ObserverOperation} to get the appropriate observer from the coprocessor. An empty
   * Optional means the coprocessor does not provide that observer; {@code callObserver()} then
   * skips the coprocessor.
   * @param <C> coprocessor type the getter is applied to
   * @param <O> observer type the getter returns
   */
  @FunctionalInterface
  public interface ObserverGetter<C, O> extends Function<C, Optional<O>> {
  }
519
520  private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
521    ObserverGetter<C, O> observerGetter;
522
523    ObserverOperation(ObserverGetter<C, O> observerGetter) {
524      this(observerGetter, null);
525    }
526
527    ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
528      this(observerGetter, user, false);
529    }
530
531    ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
532      this(observerGetter, null, bypassable);
533    }
534
535    ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
536      super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable);
537      this.observerGetter = observerGetter;
538    }
539
540    abstract void callObserver() throws IOException;
541
542    protected void postEnvCall() {
543    }
544  }
545
546  // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
547  // ObserverCaller implementations will have to have a return statement.
548  // O = observer, E = environment, C = coprocessor, R=result type
549  public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> {
550    protected abstract void call(O observer) throws IOException;
551
552    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
553      super(observerGetter);
554    }
555
556    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
557      super(observerGetter, user);
558    }
559
560    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
561      boolean bypassable) {
562      super(observerGetter, user, bypassable);
563    }
564
565    /**
566     * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
567     * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers,
568     * in which case they will return null for that observer's getter. We simply ignore such cases.
569     */
570    @Override
571    void callObserver() throws IOException {
572      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
573      if (observer.isPresent()) {
574        call(observer.get());
575      }
576    }
577  }
578
579  public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> {
580    protected abstract R call(O observer) throws IOException;
581
582    private R result;
583
584    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
585      this(observerGetter, result, false);
586    }
587
588    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
589      boolean bypassable) {
590      this(observerGetter, result, null, bypassable);
591    }
592
593    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) {
594      this(observerGetter, result, user, false);
595    }
596
597    private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
598      boolean bypassable) {
599      super(observerGetter, user, bypassable);
600      this.result = result;
601    }
602
603    protected R getResult() {
604      return this.result;
605    }
606
607    @Override
608    void callObserver() throws IOException {
609      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
610      if (observer.isPresent()) {
611        result = call(observer.get());
612      }
613    }
614  }
615
616  //////////////////////////////////////////////////////////////////////////////////////////
617  // Functions to execute observer hooks and handle results (if any)
618  //////////////////////////////////////////////////////////////////////////////////////////
619
620  /**
621   * Do not call with an observerOperation that is null! Have the caller check.
622   */
623  protected <O, R> R execOperationWithResult(
624    final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
625    boolean bypass = execOperation(observerOperation);
626    R result = observerOperation.getResult();
627    return bypass == observerOperation.isBypassable() ? result : null;
628  }
629
630  /**
631   * @return True if we are to bypass (Can only be <code>true</code> if
632   *         ObserverOperation#isBypassable().
633   */
634  protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
635    throws IOException {
636    boolean bypass = false;
637    if (observerOperation == null) {
638      return bypass;
639    }
640    List<E> envs = coprocEnvironments.get();
641    for (E env : envs) {
642      observerOperation.prepare(env);
643      Thread currentThread = Thread.currentThread();
644      ClassLoader cl = currentThread.getContextClassLoader();
645      try {
646        currentThread.setContextClassLoader(env.getClassLoader());
647        observerOperation.callObserver();
648      } catch (Throwable e) {
649        handleCoprocessorThrowable(env, e);
650      } finally {
651        currentThread.setContextClassLoader(cl);
652      }
653      // Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
654      bypass |= observerOperation.shouldBypass();
655      observerOperation.postEnvCall();
656      if (bypass) {
657        // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
658        // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
659        break;
660      }
661    }
662    return bypass;
663  }
664
665  /**
666   * Coprocessor classes can be configured in any order, based on that priority is set and chained
667   * in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is going down.
668   * This function first calls coprocessor methods (using ObserverOperation.call()) and then
669   * shutdowns the environment in postEnvCall(). <br>
670   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
671   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
672   * master/regionserver stop or cluster shutdown. (Refer:
673   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
674   * @return true if bypaas coprocessor execution, false if not.
675   */
676  protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation)
677    throws IOException {
678    if (observerOperation == null) return false;
679    boolean bypass = false;
680    List<E> envs = coprocEnvironments.get();
681    // Iterate the coprocessors and execute ObserverOperation's call()
682    for (E env : envs) {
683      observerOperation.prepare(env);
684      Thread currentThread = Thread.currentThread();
685      ClassLoader cl = currentThread.getContextClassLoader();
686      try {
687        currentThread.setContextClassLoader(env.getClassLoader());
688        observerOperation.callObserver();
689      } catch (Throwable e) {
690        handleCoprocessorThrowable(env, e);
691      } finally {
692        currentThread.setContextClassLoader(cl);
693      }
694      bypass |= observerOperation.shouldBypass();
695    }
696
697    // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
698    for (E env : envs) {
699      observerOperation.prepare(env);
700      observerOperation.postEnvCall();
701    }
702    return bypass;
703  }
704}