001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.coprocessor;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Optional;
028import java.util.Set;
029import java.util.TreeSet;
030import java.util.UUID;
031import java.util.concurrent.ConcurrentSkipListSet;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.function.Function;
034
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.hbase.Abortable;
041import org.apache.hadoop.hbase.Coprocessor;
042import org.apache.hadoop.hbase.CoprocessorEnvironment;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.ipc.RpcServer;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
048import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
049import org.apache.hadoop.hbase.util.SortedList;
050
/**
 * Provides the common setup framework and runtime services for coprocessor
 * invocation from HBase services.
 * @param <C> type of specific coprocessor this host will handle
 * @param <E> type of specific coprocessor environment this host requires
 */
058@InterfaceAudience.Private
059public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> {
  // Configuration keys for per-service coprocessor class lists. None of these five keys is read
  // directly in this class; presumably subclasses pass them as the confKey argument of
  // loadSystemCoprocessors(Configuration, String) -- verify against callers.
  public static final String REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.region.classes";
  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.regionserver.classes";
  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.user.region.classes";
  public static final String MASTER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.master.classes";
  public static final String WAL_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.wal.classes";
  // Read by handleCoprocessorThrowable(): when true, a non-IOException thrown by a coprocessor
  // leads to abortServer() instead of the coprocessor being removed from the active set.
  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
  // Read by loadSystemCoprocessors(): when false, no system coprocessor is loaded.
  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
  // Not read in this class; presumably consulted by subclasses that load user (table)
  // coprocessors -- verify against callers.
  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
    "hbase.coprocessor.user.enabled";
  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
  // Read by load(Path, String, int, Configuration, String[]): when true, a class that
  // findCoprocessor(String) already finds is not loaded again and load() returns null.
  public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR =
      "hbase.skip.load.duplicate.table.coprocessor";
  public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false;

  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class);
  // Target of abortServer(); may be null, in which case an abort request is only logged.
  protected Abortable abortable;
  /** Ordered set of loaded coprocessors with lock */
  // Ordering is defined by EnvironmentPriorityComparator (priority first, then load sequence).
  protected final SortedList<E> coprocEnvironments =
      new SortedList<>(new EnvironmentPriorityComparator());
  // Never assigned in this class; presumably set by subclasses -- verify.
  protected Configuration conf;
  // unique file prefix to use for local copies of jars when classloading
  // (set to a random UUID string by the constructor)
  protected String pathPrefix;
  // Incremented once per environment created in checkAndLoadInstance(); the new value is passed
  // to createEnvironment() as the environment's load sequence.
  protected AtomicInteger loadSequence = new AtomicInteger();
090
091  public CoprocessorHost(Abortable abortable) {
092    this.abortable = abortable;
093    this.pathPrefix = UUID.randomUUID().toString();
094  }
095
  /**
   * Not to be confused with the per-host {@code coprocEnvironments} list declared above (the
   * original wording called it "_coprocessors_"): coprocessorNames is static and stores the set
   * of all coprocessors ever loaded by any thread in this JVM. It is strictly additive:
   * coprocessors are added to coprocessorNames by checkAndLoadInstance() but are never removed,
   * since the intention is to preserve a history of all loaded coprocessors for
   * diagnosis in case of server crash (HBASE-4014).
   * <p>
   * The set is wrapped with {@link Collections#synchronizedSet}, so single operations such as
   * {@code add} are synchronized; iteration (see getLoadedCoprocessors()) is done under an
   * explicit lock on the set itself.
   */
  private static Set<String> coprocessorNames =
      Collections.synchronizedSet(new HashSet<String>());
106
107  public static Set<String> getLoadedCoprocessors() {
108    synchronized (coprocessorNames) {
109      return new HashSet(coprocessorNames);
110    }
111  }
112
113  /**
114   * Used to create a parameter to the HServerLoad constructor so that
115   * HServerLoad can provide information about the coprocessors loaded by this
116   * regionserver.
117   * (HBASE-4070: Improve region server metrics to report loaded coprocessors
118   * to master).
119   */
120  public Set<String> getCoprocessors() {
121    Set<String> returnValue = new TreeSet<>();
122    for (E e: coprocEnvironments) {
123      returnValue.add(e.getInstance().getClass().getSimpleName());
124    }
125    return returnValue;
126  }
127
128  /**
129   * Load system coprocessors once only. Read the class names from configuration.
130   * Called by constructor.
131   */
132  protected void loadSystemCoprocessors(Configuration conf, String confKey) {
133    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
134      DEFAULT_COPROCESSORS_ENABLED);
135    if (!coprocessorsEnabled) {
136      return;
137    }
138
139    Class<?> implClass;
140
141    // load default coprocessors from configure file
142    String[] defaultCPClasses = conf.getStrings(confKey);
143    if (defaultCPClasses == null || defaultCPClasses.length == 0)
144      return;
145
146    int priority = Coprocessor.PRIORITY_SYSTEM;
147    for (String className : defaultCPClasses) {
148      className = className.trim();
149      if (findCoprocessor(className) != null) {
150        // If already loaded will just continue
151        LOG.warn("Attempted duplicate loading of " + className + "; skipped");
152        continue;
153      }
154      ClassLoader cl = this.getClass().getClassLoader();
155      Thread.currentThread().setContextClassLoader(cl);
156      try {
157        implClass = cl.loadClass(className);
158        // Add coprocessors as we go to guard against case where a coprocessor is specified twice
159        // in the configuration
160        E env = checkAndLoadInstance(implClass, priority, conf);
161        if (env != null) {
162          this.coprocEnvironments.add(env);
163          LOG.info("System coprocessor {} loaded, priority={}.", className, priority);
164          ++priority;
165        }
166      } catch (Throwable t) {
167        // We always abort if system coprocessors cannot be loaded
168        abortServer(className, t);
169      }
170    }
171  }
172
173  /**
174   * Load a coprocessor implementation into the host
175   * @param path path to implementation jar
176   * @param className the main class name
177   * @param priority chaining priority
178   * @param conf configuration for coprocessor
179   * @throws java.io.IOException Exception
180   */
181  public E load(Path path, String className, int priority,
182      Configuration conf) throws IOException {
183    String[] includedClassPrefixes = null;
184    if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null){
185      String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
186      includedClassPrefixes = prefixes.split(";");
187    }
188    return load(path, className, priority, conf, includedClassPrefixes);
189  }
190
191  /**
192   * Load a coprocessor implementation into the host
193   * @param path path to implementation jar
194   * @param className the main class name
195   * @param priority chaining priority
196   * @param conf configuration for coprocessor
197   * @param includedClassPrefixes class name prefixes to include
198   * @throws java.io.IOException Exception
199   */
200  public E load(Path path, String className, int priority,
201      Configuration conf, String[] includedClassPrefixes) throws IOException {
202    Class<?> implClass;
203    LOG.debug("Loading coprocessor class " + className + " with path " +
204        path + " and priority " + priority);
205
206    boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR,
207      DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR);
208    if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) {
209      // If already loaded will just continue
210      LOG.warn("Attempted duplicate loading of {}; skipped", className);
211      return null;
212    }
213
214    ClassLoader cl = null;
215    if (path == null) {
216      try {
217        implClass = getClass().getClassLoader().loadClass(className);
218      } catch (ClassNotFoundException e) {
219        throw new IOException("No jar path specified for " + className);
220      }
221    } else {
222      cl = CoprocessorClassLoader.getClassLoader(
223        path, getClass().getClassLoader(), pathPrefix, conf);
224      try {
225        implClass = ((CoprocessorClassLoader)cl).loadClass(className, includedClassPrefixes);
226      } catch (ClassNotFoundException e) {
227        throw new IOException("Cannot load external coprocessor class " + className, e);
228      }
229    }
230
231    //load custom code for coprocessor
232    Thread currentThread = Thread.currentThread();
233    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
234    try{
235      // switch temporarily to the thread classloader for custom CP
236      currentThread.setContextClassLoader(cl);
237      E cpInstance = checkAndLoadInstance(implClass, priority, conf);
238      return cpInstance;
239    } finally {
240      // restore the fresh (host) classloader
241      currentThread.setContextClassLoader(hostClassLoader);
242    }
243  }
244
245  @VisibleForTesting
246  public void load(Class<? extends C> implClass, int priority, Configuration conf)
247      throws IOException {
248    E env = checkAndLoadInstance(implClass, priority, conf);
249    coprocEnvironments.add(env);
250  }
251
252  /**
253   * @param implClass Implementation class
254   * @param priority priority
255   * @param conf configuration
256   * @throws java.io.IOException Exception
257   */
258  public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf)
259      throws IOException {
260    // create the instance
261    C impl;
262    try {
263      impl = checkAndGetInstance(implClass);
264      if (impl == null) {
265        LOG.error("Cannot load coprocessor " + implClass.getSimpleName());
266        return null;
267      }
268    } catch (InstantiationException|IllegalAccessException e) {
269      throw new IOException(e);
270    }
271    // create the environment
272    E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
273    assert env instanceof BaseEnvironment;
274    ((BaseEnvironment<C>) env).startup();
275    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
276    // if server (master or regionserver) aborts.
277    coprocessorNames.add(implClass.getName());
278    return env;
279  }
280
  /**
   * Called when a new Coprocessor class is loaded: {@code checkAndLoadInstance} invokes this
   * once {@code checkAndGetInstance} has produced a non-null instance.
   * @param instance the coprocessor instance to wrap
   * @param priority chaining priority, as passed to {@code checkAndLoadInstance}
   * @param sequence value of this host's load counter right after it was incremented for this
   *          load
   * @param conf configuration, as passed to {@code checkAndLoadInstance}
   * @return the environment for {@code instance}; {@code checkAndLoadInstance} asserts that it
   *         is a {@code BaseEnvironment} and casts it to that type to start it
   */
  public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);
285
  /**
   * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class
   * is what the corresponding host implementation expects. If it is of correct type, returns an
   * instance of the coprocessor to be loaded. If not, returns null.
   * If an exception occurs when trying to create instance of a coprocessor, it's passed up and
   * eventually results into server aborting.
   * @param implClass candidate coprocessor implementation class
   * @return a coprocessor instance, or null if {@code implClass} is not of the expected type
   *         (in which case {@code checkAndLoadInstance} logs an error and returns null)
   * @throws InstantiationException if the instance cannot be created; wrapped in an
   *           IOException by {@code checkAndLoadInstance}
   * @throws IllegalAccessException if the class or its constructor is not accessible; wrapped
   *           in an IOException by {@code checkAndLoadInstance}
   */
  public abstract C checkAndGetInstance(Class<?> implClass)
      throws InstantiationException, IllegalAccessException;
295
296  public void shutdown(E e) {
297    assert e instanceof BaseEnvironment;
298    if (LOG.isDebugEnabled()) {
299      LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
300    }
301    ((BaseEnvironment<C>) e).shutdown();
302  }
303
304  /**
305   * Find coprocessors by full class name or simple name.
306   */
307  public C findCoprocessor(String className) {
308    for (E env: coprocEnvironments) {
309      if (env.getInstance().getClass().getName().equals(className) ||
310          env.getInstance().getClass().getSimpleName().equals(className)) {
311        return env.getInstance();
312      }
313    }
314    return null;
315  }
316
317  @VisibleForTesting
318  public <T extends C> T findCoprocessor(Class<T> cls) {
319    for (E env: coprocEnvironments) {
320      if (cls.isAssignableFrom(env.getInstance().getClass())) {
321        return (T) env.getInstance();
322      }
323    }
324    return null;
325  }
326
327  /**
328   * Find list of coprocessors that extend/implement the given class/interface
329   * @param cls the class/interface to look for
330   * @return the list of coprocessors, or null if not found
331   */
332  public <T extends C> List<T> findCoprocessors(Class<T> cls) {
333    ArrayList<T> ret = new ArrayList<>();
334
335    for (E env: coprocEnvironments) {
336      C cp = env.getInstance();
337
338      if(cp != null) {
339        if (cls.isAssignableFrom(cp.getClass())) {
340          ret.add((T)cp);
341        }
342      }
343    }
344    return ret;
345  }
346
347  /**
348   * Find a coprocessor environment by class name
349   * @param className the class name
350   * @return the coprocessor, or null if not found
351   */
352  @VisibleForTesting
353  public E findCoprocessorEnvironment(String className) {
354    for (E env: coprocEnvironments) {
355      if (env.getInstance().getClass().getName().equals(className) ||
356          env.getInstance().getClass().getSimpleName().equals(className)) {
357        return env;
358      }
359    }
360    return null;
361  }
362
363  /**
364   * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
365   * jar files.
366   * @return A set of ClassLoader instances
367   */
368  Set<ClassLoader> getExternalClassLoaders() {
369    Set<ClassLoader> externalClassLoaders = new HashSet<>();
370    final ClassLoader systemClassLoader = this.getClass().getClassLoader();
371    for (E env : coprocEnvironments) {
372      ClassLoader cl = env.getInstance().getClass().getClassLoader();
373      if (cl != systemClassLoader){
374        //do not include system classloader
375        externalClassLoaders.add(cl);
376      }
377    }
378    return externalClassLoaders;
379  }
380
381  /**
382   * Environment priority comparator.
383   * Coprocessors are chained in sorted order.
384   */
385  static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
386    @Override
387    public int compare(final CoprocessorEnvironment env1,
388        final CoprocessorEnvironment env2) {
389      if (env1.getPriority() < env2.getPriority()) {
390        return -1;
391      } else if (env1.getPriority() > env2.getPriority()) {
392        return 1;
393      }
394      if (env1.getLoadSequence() < env2.getLoadSequence()) {
395        return -1;
396      } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
397        return 1;
398      }
399      return 0;
400    }
401  }
402
403  protected void abortServer(final E environment, final Throwable e) {
404    abortServer(environment.getInstance().getClass().getName(), e);
405  }
406
407  protected void abortServer(final String coprocessorName, final Throwable e) {
408    String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
409    LOG.error(message, e);
410    if (abortable != null) {
411      abortable.abort(message, e);
412    } else {
413      LOG.warn("No available Abortable, process was not aborted");
414    }
415  }
416
417  /**
418   * This is used by coprocessor hooks which are declared to throw IOException
419   * (or its subtypes). For such hooks, we should handle throwable objects
420   * depending on the Throwable's type. Those which are instances of
421   * IOException should be passed on to the client. This is in conformance with
422   * the HBase idiom regarding IOException: that it represents a circumstance
423   * that should be passed along to the client for its own handling. For
424   * example, a coprocessor that implements access controls would throw a
425   * subclass of IOException, such as AccessDeniedException, in its preGet()
426   * method to prevent an unauthorized client's performing a Get on a particular
427   * table.
428   * @param env Coprocessor Environment
429   * @param e Throwable object thrown by coprocessor.
430   * @exception IOException Exception
431   */
432  // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver},
433  // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
434  // exceptions depending on situation. If any changes are made to this logic, make sure to
435  // update all classes' comments.
436  protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
437    if (e instanceof IOException) {
438      throw (IOException)e;
439    }
440    // If we got here, e is not an IOException. A loaded coprocessor has a
441    // fatal bug, and the server (master or regionserver) should remove the
442    // faulty coprocessor from its set of active coprocessors. Setting
443    // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
444    // which may be useful in development and testing environments where
445    // 'failing fast' for error analysis is desired.
446    if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
447      // server is configured to abort.
448      abortServer(env, e);
449    } else {
450      // If available, pull a table name out of the environment
451      if(env instanceof RegionCoprocessorEnvironment) {
452        String tableName = ((RegionCoprocessorEnvironment)env).getRegionInfo().getTable().getNameAsString();
453        LOG.error("Removing coprocessor '" + env.toString() + "' from table '"+ tableName + "'", e);
454      } else {
455        LOG.error("Removing coprocessor '" + env.toString() + "' from " +
456                "environment",e);
457      }
458
459      coprocEnvironments.remove(env);
460      try {
461        shutdown(env);
462      } catch (Exception x) {
463        LOG.error("Uncaught exception when shutting down coprocessor '"
464            + env.toString() + "'", x);
465      }
466      throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
467          "' threw: '" + e + "' and has been removed from the active " +
468          "coprocessor set.", e);
469    }
470  }
471
  /**
   * Used to limit legacy handling to once per Coprocessor class per classloader.
   * <p>
   * Not referenced anywhere else in the visible source.
   * <p>
   * NOTE(review): the comparator falls back to comparing class names, so two distinct classes
   * with the same name (for example the same class loaded by two class loaders) compare as 0.
   * A ConcurrentSkipListSet treats elements that compare as 0 as the same element, so the
   * "per classloader" part of the sentence above looks doubtful -- confirm the intent.
   */
  private static final Set<Class<? extends Coprocessor>> legacyWarning =
      new ConcurrentSkipListSet<>(
          new Comparator<Class<? extends Coprocessor>>() {
            @Override
            public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
              // Identical class objects are equal; everything else is ordered by class name.
              if (c1.equals(c2)) {
                return 0;
              }
              return c1.getName().compareTo(c2.getName());
            }
          });
486
  /**
   * Implementation-defined function to get an observer of type {@code O} from a coprocessor of
   * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each
   * observer they can handle. For example, RegionCoprocessorHost will use 3 getters, one for
   * each of RegionObserver, EndpointObserver and BulkLoadObserver.
   * These getters are used by {@code ObserverOperation} to get the appropriate observer from the
   * coprocessor. A getter returns an empty {@link Optional} when the coprocessor does not
   * provide that kind of observer; the operations in this class then skip the coprocessor.
   * @param <C> coprocessor type the getter is applied to
   * @param <O> observer type the getter yields
   */
  @FunctionalInterface
  public interface ObserverGetter<C, O> extends Function<C, Optional<O>> {}
497
498  private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
499    ObserverGetter<C, O> observerGetter;
500
501    ObserverOperation(ObserverGetter<C, O> observerGetter) {
502      this(observerGetter, null);
503    }
504
505    ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
506      this(observerGetter, user, false);
507    }
508
509    ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
510      this(observerGetter, null, bypassable);
511    }
512
513    ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
514      super(user != null? user: RpcServer.getRequestUser().orElse(null), bypassable);
515      this.observerGetter = observerGetter;
516    }
517
518    abstract void callObserver() throws IOException;
519    protected void postEnvCall() {}
520  }
521
522  // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
523  // ObserverCaller implementations will have to have a return statement.
524  // O = observer, E = environment, C = coprocessor, R=result type
525  public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> {
526    protected abstract void call(O observer) throws IOException;
527
528    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
529      super(observerGetter);
530    }
531
532    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
533      super(observerGetter, user);
534    }
535
536    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
537        boolean bypassable) {
538      super(observerGetter, user, bypassable);
539    }
540
541    /**
542     * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
543     * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all
544     * observers, in which case they will return null for that observer's getter.
545     * We simply ignore such cases.
546     */
547    @Override
548    void callObserver() throws IOException {
549      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
550      if (observer.isPresent()) {
551        call(observer.get());
552      }
553    }
554  }
555
556  public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> {
557    protected abstract R call(O observer) throws IOException;
558
559    private R result;
560
561    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
562      this(observerGetter, result, false);
563    }
564
565    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
566        boolean bypassable) {
567      this(observerGetter, result, null, bypassable);
568    }
569
570    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
571        User user) {
572      this(observerGetter, result, user, false);
573    }
574
575    private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
576        boolean bypassable) {
577      super(observerGetter, user, bypassable);
578      this.result = result;
579    }
580
581    protected R getResult() {
582      return this.result;
583    }
584
585    @Override
586    void callObserver() throws IOException {
587      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
588      if (observer.isPresent()) {
589        result = call(observer.get());
590      }
591    }
592  }
593
594  //////////////////////////////////////////////////////////////////////////////////////////
595  // Functions to execute observer hooks and handle results (if any)
596  //////////////////////////////////////////////////////////////////////////////////////////
597
598  /**
599   * Do not call with an observerOperation that is null! Have the caller check.
600   */
601  protected <O, R> R execOperationWithResult(
602      final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
603    boolean bypass = execOperation(observerOperation);
604    R result = observerOperation.getResult();
605    return bypass == observerOperation.isBypassable()? result: null;
606  }
607
608  /**
609   * @return True if we are to bypass (Can only be <code>true</code> if
610   * ObserverOperation#isBypassable().
611   */
612  protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
613      throws IOException {
614    boolean bypass = false;
615    if (observerOperation == null) {
616      return bypass;
617    }
618    List<E> envs = coprocEnvironments.get();
619    for (E env : envs) {
620      observerOperation.prepare(env);
621      Thread currentThread = Thread.currentThread();
622      ClassLoader cl = currentThread.getContextClassLoader();
623      try {
624        currentThread.setContextClassLoader(env.getClassLoader());
625        observerOperation.callObserver();
626      } catch (Throwable e) {
627        handleCoprocessorThrowable(env, e);
628      } finally {
629        currentThread.setContextClassLoader(cl);
630      }
631      // Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
632      bypass |= observerOperation.shouldBypass();
633      observerOperation.postEnvCall();
634      if (bypass) {
635        // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
636        // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
637        break;
638      }
639    }
640    return bypass;
641  }
642
643  /**
644   * Coprocessor classes can be configured in any order, based on that priority is set and
645   * chained in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is
646   * going down. This function first calls coprocessor methods (using ObserverOperation.call())
647   * and then shutdowns the environment in postEnvCall(). <br>
648   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
649   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
650   * master/regionserver stop or cluster shutdown. (Refer:
651   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
652   * @return true if bypaas coprocessor execution, false if not.
653   * @throws IOException
654   */
655  protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation)
656      throws IOException {
657    if (observerOperation == null) return false;
658    boolean bypass = false;
659    List<E> envs = coprocEnvironments.get();
660    // Iterate the coprocessors and execute ObserverOperation's call()
661    for (E env : envs) {
662      observerOperation.prepare(env);
663      Thread currentThread = Thread.currentThread();
664      ClassLoader cl = currentThread.getContextClassLoader();
665      try {
666        currentThread.setContextClassLoader(env.getClassLoader());
667        observerOperation.callObserver();
668      } catch (Throwable e) {
669        handleCoprocessorThrowable(env, e);
670      } finally {
671        currentThread.setContextClassLoader(cl);
672      }
673      bypass |= observerOperation.shouldBypass();
674    }
675
676    // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
677    for (E env : envs) {
678      observerOperation.prepare(env);
679      observerOperation.postEnvCall();
680    }
681    return bypass;
682  }
683}