001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.coprocessor;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Optional;
028import java.util.Set;
029import java.util.TreeSet;
030import java.util.UUID;
031import java.util.concurrent.ConcurrentSkipListSet;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.function.Function;
034
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Abortable;
041import org.apache.hadoop.hbase.Coprocessor;
042import org.apache.hadoop.hbase.CoprocessorEnvironment;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ipc.RpcServer;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
048import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
049import org.apache.hadoop.hbase.util.SortedList;
050
051/**
052 * Provides the common setup framework and runtime services for coprocessor
053 * invocation from HBase services.
054 * @param <C> type of specific coprocessor this host will handle
055 * @param <E> type of specific coprocessor environment this host requires.
056 * provides
057 */
058@InterfaceAudience.Private
059public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> {
060  public static final String REGION_COPROCESSOR_CONF_KEY =
061      "hbase.coprocessor.region.classes";
062  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
063      "hbase.coprocessor.regionserver.classes";
064  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
065      "hbase.coprocessor.user.region.classes";
066  public static final String MASTER_COPROCESSOR_CONF_KEY =
067      "hbase.coprocessor.master.classes";
068  public static final String WAL_COPROCESSOR_CONF_KEY =
069    "hbase.coprocessor.wal.classes";
070  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
071  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
072  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
073  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
074  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
075    "hbase.coprocessor.user.enabled";
076  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
077
078  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class);
079  protected Abortable abortable;
080  /** Ordered set of loaded coprocessors with lock */
081  protected final SortedList<E> coprocEnvironments =
082      new SortedList<>(new EnvironmentPriorityComparator());
083  protected Configuration conf;
084  // unique file prefix to use for local copies of jars when classloading
085  protected String pathPrefix;
086  protected AtomicInteger loadSequence = new AtomicInteger();
087
088  public CoprocessorHost(Abortable abortable) {
089    this.abortable = abortable;
090    this.pathPrefix = UUID.randomUUID().toString();
091  }
092
093  /**
094   * Not to be confused with the per-object _coprocessors_ (above),
095   * coprocessorNames is static and stores the set of all coprocessors ever
096   * loaded by any thread in this JVM. It is strictly additive: coprocessors are
097   * added to coprocessorNames, by checkAndLoadInstance() but are never removed, since
098   * the intention is to preserve a history of all loaded coprocessors for
099   * diagnosis in case of server crash (HBASE-4014).
100   */
101  private static Set<String> coprocessorNames =
102      Collections.synchronizedSet(new HashSet<String>());
103
104  public static Set<String> getLoadedCoprocessors() {
105    synchronized (coprocessorNames) {
106      return new HashSet(coprocessorNames);
107    }
108  }
109
110  /**
111   * Used to create a parameter to the HServerLoad constructor so that
112   * HServerLoad can provide information about the coprocessors loaded by this
113   * regionserver.
114   * (HBASE-4070: Improve region server metrics to report loaded coprocessors
115   * to master).
116   */
117  public Set<String> getCoprocessors() {
118    Set<String> returnValue = new TreeSet<>();
119    for (E e: coprocEnvironments) {
120      returnValue.add(e.getInstance().getClass().getSimpleName());
121    }
122    return returnValue;
123  }
124
125  /**
126   * Load system coprocessors once only. Read the class names from configuration.
127   * Called by constructor.
128   */
129  protected void loadSystemCoprocessors(Configuration conf, String confKey) {
130    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
131      DEFAULT_COPROCESSORS_ENABLED);
132    if (!coprocessorsEnabled) {
133      return;
134    }
135
136    Class<?> implClass;
137
138    // load default coprocessors from configure file
139    String[] defaultCPClasses = conf.getStrings(confKey);
140    if (defaultCPClasses == null || defaultCPClasses.length == 0)
141      return;
142
143    int priority = Coprocessor.PRIORITY_SYSTEM;
144    for (String className : defaultCPClasses) {
145      className = className.trim();
146      if (findCoprocessor(className) != null) {
147        // If already loaded will just continue
148        LOG.warn("Attempted duplicate loading of " + className + "; skipped");
149        continue;
150      }
151      ClassLoader cl = this.getClass().getClassLoader();
152      Thread.currentThread().setContextClassLoader(cl);
153      try {
154        implClass = cl.loadClass(className);
155        // Add coprocessors as we go to guard against case where a coprocessor is specified twice
156        // in the configuration
157        E env = checkAndLoadInstance(implClass, priority, conf);
158        if (env != null) {
159          this.coprocEnvironments.add(env);
160          LOG.info("System coprocessor {} loaded, priority={}.", className, priority);
161          ++priority;
162        }
163      } catch (Throwable t) {
164        // We always abort if system coprocessors cannot be loaded
165        abortServer(className, t);
166      }
167    }
168  }
169
170  /**
171   * Load a coprocessor implementation into the host
172   * @param path path to implementation jar
173   * @param className the main class name
174   * @param priority chaining priority
175   * @param conf configuration for coprocessor
176   * @throws java.io.IOException Exception
177   */
178  public E load(Path path, String className, int priority,
179      Configuration conf) throws IOException {
180    String[] includedClassPrefixes = null;
181    if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null){
182      String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
183      includedClassPrefixes = prefixes.split(";");
184    }
185    return load(path, className, priority, conf, includedClassPrefixes);
186  }
187
188  /**
189   * Load a coprocessor implementation into the host
190   * @param path path to implementation jar
191   * @param className the main class name
192   * @param priority chaining priority
193   * @param conf configuration for coprocessor
194   * @param includedClassPrefixes class name prefixes to include
195   * @throws java.io.IOException Exception
196   */
197  public E load(Path path, String className, int priority,
198      Configuration conf, String[] includedClassPrefixes) throws IOException {
199    Class<?> implClass;
200    LOG.debug("Loading coprocessor class " + className + " with path " +
201        path + " and priority " + priority);
202
203    ClassLoader cl = null;
204    if (path == null) {
205      try {
206        implClass = getClass().getClassLoader().loadClass(className);
207      } catch (ClassNotFoundException e) {
208        throw new IOException("No jar path specified for " + className);
209      }
210    } else {
211      cl = CoprocessorClassLoader.getClassLoader(
212        path, getClass().getClassLoader(), pathPrefix, conf);
213      try {
214        implClass = ((CoprocessorClassLoader)cl).loadClass(className, includedClassPrefixes);
215      } catch (ClassNotFoundException e) {
216        throw new IOException("Cannot load external coprocessor class " + className, e);
217      }
218    }
219
220    //load custom code for coprocessor
221    Thread currentThread = Thread.currentThread();
222    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
223    try{
224      // switch temporarily to the thread classloader for custom CP
225      currentThread.setContextClassLoader(cl);
226      E cpInstance = checkAndLoadInstance(implClass, priority, conf);
227      return cpInstance;
228    } finally {
229      // restore the fresh (host) classloader
230      currentThread.setContextClassLoader(hostClassLoader);
231    }
232  }
233
234  @VisibleForTesting
235  public void load(Class<? extends C> implClass, int priority, Configuration conf)
236      throws IOException {
237    E env = checkAndLoadInstance(implClass, priority, conf);
238    coprocEnvironments.add(env);
239  }
240
241  /**
242   * @param implClass Implementation class
243   * @param priority priority
244   * @param conf configuration
245   * @throws java.io.IOException Exception
246   */
247  public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf)
248      throws IOException {
249    // create the instance
250    C impl;
251    try {
252      impl = checkAndGetInstance(implClass);
253      if (impl == null) {
254        LOG.error("Cannot load coprocessor " + implClass.getSimpleName());
255        return null;
256      }
257    } catch (InstantiationException|IllegalAccessException e) {
258      throw new IOException(e);
259    }
260    // create the environment
261    E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
262    assert env instanceof BaseEnvironment;
263    ((BaseEnvironment<C>) env).startup();
264    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
265    // if server (master or regionserver) aborts.
266    coprocessorNames.add(implClass.getName());
267    return env;
268  }
269
270  /**
271   * Called when a new Coprocessor class is loaded
272   */
273  public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);
274
275  /**
276   * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class
277   * is what the corresponding host implementation expects. If it is of correct type, returns an
278   * instance of the coprocessor to be loaded. If not, returns null.
279   * If an exception occurs when trying to create instance of a coprocessor, it's passed up and
280   * eventually results into server aborting.
281   */
282  public abstract C checkAndGetInstance(Class<?> implClass)
283      throws InstantiationException, IllegalAccessException;
284
285  public void shutdown(E e) {
286    assert e instanceof BaseEnvironment;
287    if (LOG.isDebugEnabled()) {
288      LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
289    }
290    ((BaseEnvironment<C>) e).shutdown();
291  }
292
293  /**
294   * Find coprocessors by full class name or simple name.
295   */
296  public C findCoprocessor(String className) {
297    for (E env: coprocEnvironments) {
298      if (env.getInstance().getClass().getName().equals(className) ||
299          env.getInstance().getClass().getSimpleName().equals(className)) {
300        return env.getInstance();
301      }
302    }
303    return null;
304  }
305
306  @VisibleForTesting
307  public <T extends C> T findCoprocessor(Class<T> cls) {
308    for (E env: coprocEnvironments) {
309      if (cls.isAssignableFrom(env.getInstance().getClass())) {
310        return (T) env.getInstance();
311      }
312    }
313    return null;
314  }
315
316  /**
317   * Find list of coprocessors that extend/implement the given class/interface
318   * @param cls the class/interface to look for
319   * @return the list of coprocessors, or null if not found
320   */
321  public <T extends C> List<T> findCoprocessors(Class<T> cls) {
322    ArrayList<T> ret = new ArrayList<>();
323
324    for (E env: coprocEnvironments) {
325      C cp = env.getInstance();
326
327      if(cp != null) {
328        if (cls.isAssignableFrom(cp.getClass())) {
329          ret.add((T)cp);
330        }
331      }
332    }
333    return ret;
334  }
335
336  /**
337   * Find a coprocessor environment by class name
338   * @param className the class name
339   * @return the coprocessor, or null if not found
340   */
341  @VisibleForTesting
342  public E findCoprocessorEnvironment(String className) {
343    for (E env: coprocEnvironments) {
344      if (env.getInstance().getClass().getName().equals(className) ||
345          env.getInstance().getClass().getSimpleName().equals(className)) {
346        return env;
347      }
348    }
349    return null;
350  }
351
352  /**
353   * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
354   * jar files.
355   * @return A set of ClassLoader instances
356   */
357  Set<ClassLoader> getExternalClassLoaders() {
358    Set<ClassLoader> externalClassLoaders = new HashSet<>();
359    final ClassLoader systemClassLoader = this.getClass().getClassLoader();
360    for (E env : coprocEnvironments) {
361      ClassLoader cl = env.getInstance().getClass().getClassLoader();
362      if (cl != systemClassLoader){
363        //do not include system classloader
364        externalClassLoaders.add(cl);
365      }
366    }
367    return externalClassLoaders;
368  }
369
370  /**
371   * Environment priority comparator.
372   * Coprocessors are chained in sorted order.
373   */
374  static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
375    @Override
376    public int compare(final CoprocessorEnvironment env1,
377        final CoprocessorEnvironment env2) {
378      if (env1.getPriority() < env2.getPriority()) {
379        return -1;
380      } else if (env1.getPriority() > env2.getPriority()) {
381        return 1;
382      }
383      if (env1.getLoadSequence() < env2.getLoadSequence()) {
384        return -1;
385      } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
386        return 1;
387      }
388      return 0;
389    }
390  }
391
392  protected void abortServer(final E environment, final Throwable e) {
393    abortServer(environment.getInstance().getClass().getName(), e);
394  }
395
396  protected void abortServer(final String coprocessorName, final Throwable e) {
397    String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
398    LOG.error(message, e);
399    if (abortable != null) {
400      abortable.abort(message, e);
401    } else {
402      LOG.warn("No available Abortable, process was not aborted");
403    }
404  }
405
406  /**
407   * This is used by coprocessor hooks which are declared to throw IOException
408   * (or its subtypes). For such hooks, we should handle throwable objects
409   * depending on the Throwable's type. Those which are instances of
410   * IOException should be passed on to the client. This is in conformance with
411   * the HBase idiom regarding IOException: that it represents a circumstance
412   * that should be passed along to the client for its own handling. For
413   * example, a coprocessor that implements access controls would throw a
414   * subclass of IOException, such as AccessDeniedException, in its preGet()
415   * method to prevent an unauthorized client's performing a Get on a particular
416   * table.
417   * @param env Coprocessor Environment
418   * @param e Throwable object thrown by coprocessor.
419   * @exception IOException Exception
420   */
421  // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver},
422  // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
423  // exceptions depending on situation. If any changes are made to this logic, make sure to
424  // update all classes' comments.
425  protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
426    if (e instanceof IOException) {
427      throw (IOException)e;
428    }
429    // If we got here, e is not an IOException. A loaded coprocessor has a
430    // fatal bug, and the server (master or regionserver) should remove the
431    // faulty coprocessor from its set of active coprocessors. Setting
432    // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
433    // which may be useful in development and testing environments where
434    // 'failing fast' for error analysis is desired.
435    if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
436      // server is configured to abort.
437      abortServer(env, e);
438    } else {
439      // If available, pull a table name out of the environment
440      if(env instanceof RegionCoprocessorEnvironment) {
441        String tableName = ((RegionCoprocessorEnvironment)env).getRegionInfo().getTable().getNameAsString();
442        LOG.error("Removing coprocessor '" + env.toString() + "' from table '"+ tableName + "'", e);
443      } else {
444        LOG.error("Removing coprocessor '" + env.toString() + "' from " +
445                "environment",e);
446      }
447
448      coprocEnvironments.remove(env);
449      try {
450        shutdown(env);
451      } catch (Exception x) {
452        LOG.error("Uncaught exception when shutting down coprocessor '"
453            + env.toString() + "'", x);
454      }
455      throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
456          "' threw: '" + e + "' and has been removed from the active " +
457          "coprocessor set.", e);
458    }
459  }
460
461  /**
462   * Used to limit legacy handling to once per Coprocessor class per classloader.
463   */
464  private static final Set<Class<? extends Coprocessor>> legacyWarning =
465      new ConcurrentSkipListSet<>(
466          new Comparator<Class<? extends Coprocessor>>() {
467            @Override
468            public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
469              if (c1.equals(c2)) {
470                return 0;
471              }
472              return c1.getName().compareTo(c2.getName());
473            }
474          });
475
476  /**
477   * Implementations defined function to get an observer of type {@code O} from a coprocessor of
478   * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each
479   * observer they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for
480   * each of RegionObserver, EndpointObserver and BulkLoadObserver.
481   * These getters are used by {@code ObserverOperation} to get appropriate observer from the
482   * coprocessor.
483   */
484  @FunctionalInterface
485  public interface ObserverGetter<C, O> extends Function<C, Optional<O>> {}
486
487  private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
488    ObserverGetter<C, O> observerGetter;
489
490    ObserverOperation(ObserverGetter<C, O> observerGetter) {
491      this(observerGetter, null);
492    }
493
494    ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
495      this(observerGetter, user, false);
496    }
497
498    ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
499      this(observerGetter, null, bypassable);
500    }
501
502    ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
503      super(user != null? user: RpcServer.getRequestUser().orElse(null), bypassable);
504      this.observerGetter = observerGetter;
505    }
506
507    abstract void callObserver() throws IOException;
508    protected void postEnvCall() {}
509  }
510
511  // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
512  // ObserverCaller implementations will have to have a return statement.
513  // O = observer, E = environment, C = coprocessor, R=result type
514  public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> {
515    protected abstract void call(O observer) throws IOException;
516
517    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
518      super(observerGetter);
519    }
520
521    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
522      super(observerGetter, user);
523    }
524
525    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
526        boolean bypassable) {
527      super(observerGetter, user, bypassable);
528    }
529
530    /**
531     * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
532     * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all
533     * observers, in which case they will return null for that observer's getter.
534     * We simply ignore such cases.
535     */
536    @Override
537    void callObserver() throws IOException {
538      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
539      if (observer.isPresent()) {
540        call(observer.get());
541      }
542    }
543  }
544
545  public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> {
546    protected abstract R call(O observer) throws IOException;
547
548    private R result;
549
550    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
551      this(observerGetter, result, false);
552    }
553
554    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
555        boolean bypassable) {
556      this(observerGetter, result, null, bypassable);
557    }
558
559    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
560        User user) {
561      this(observerGetter, result, user, false);
562    }
563
564    private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
565        boolean bypassable) {
566      super(observerGetter, user, bypassable);
567      this.result = result;
568    }
569
570    protected R getResult() {
571      return this.result;
572    }
573
574    @Override
575    void callObserver() throws IOException {
576      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
577      if (observer.isPresent()) {
578        result = call(observer.get());
579      }
580    }
581  }
582
583  //////////////////////////////////////////////////////////////////////////////////////////
584  // Functions to execute observer hooks and handle results (if any)
585  //////////////////////////////////////////////////////////////////////////////////////////
586
587  /**
588   * Do not call with an observerOperation that is null! Have the caller check.
589   */
590  protected <O, R> R execOperationWithResult(
591      final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
592    boolean bypass = execOperation(observerOperation);
593    R result = observerOperation.getResult();
594    return bypass == observerOperation.isBypassable()? result: null;
595  }
596
597  /**
598   * @return True if we are to bypass (Can only be <code>true</code> if
599   * ObserverOperation#isBypassable().
600   */
601  protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
602      throws IOException {
603    boolean bypass = false;
604    if (observerOperation == null) {
605      return bypass;
606    }
607    List<E> envs = coprocEnvironments.get();
608    for (E env : envs) {
609      observerOperation.prepare(env);
610      Thread currentThread = Thread.currentThread();
611      ClassLoader cl = currentThread.getContextClassLoader();
612      try {
613        currentThread.setContextClassLoader(env.getClassLoader());
614        observerOperation.callObserver();
615      } catch (Throwable e) {
616        handleCoprocessorThrowable(env, e);
617      } finally {
618        currentThread.setContextClassLoader(cl);
619      }
620      // Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
621      bypass |= observerOperation.shouldBypass();
622      observerOperation.postEnvCall();
623      if (bypass) {
624        // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
625        // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
626        break;
627      }
628    }
629    return bypass;
630  }
631
632  /**
633   * Coprocessor classes can be configured in any order, based on that priority is set and
634   * chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is
635   * going down. This function first calls coprocessor methods (using ObserverOperation.call())
636   * and then shutdowns the environment in postEnvCall(). <br>
637   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
638   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
639   * master/regionserver stop or cluster shutdown. (Refer:
640   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
641   * @return true if bypaas coprocessor execution, false if not.
642   * @throws IOException
643   */
644  protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation)
645      throws IOException {
646    if (observerOperation == null) return false;
647    boolean bypass = false;
648    List<E> envs = coprocEnvironments.get();
649    // Iterate the coprocessors and execute ObserverOperation's call()
650    for (E env : envs) {
651      observerOperation.prepare(env);
652      Thread currentThread = Thread.currentThread();
653      ClassLoader cl = currentThread.getContextClassLoader();
654      try {
655        currentThread.setContextClassLoader(env.getClassLoader());
656        observerOperation.callObserver();
657      } catch (Throwable e) {
658        handleCoprocessorThrowable(env, e);
659      } finally {
660        currentThread.setContextClassLoader(cl);
661      }
662      bypass |= observerOperation.shouldBypass();
663    }
664
665    // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
666    for (E env : envs) {
667      observerOperation.prepare(env);
668      observerOperation.postEnvCall();
669    }
670    return bypass;
671  }
672}