001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Optional;
027import java.util.Set;
028import java.util.TreeSet;
029import java.util.UUID;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.function.Function;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.Coprocessor;
036import org.apache.hadoop.hbase.CoprocessorEnvironment;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.ipc.RpcServer;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
042import org.apache.hadoop.hbase.util.SortedList;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.base.Strings;
048
049/**
050 * Provides the common setup framework and runtime services for coprocessor invocation from HBase
051 * services.
052 * @param <C> type of specific coprocessor this host will handle
053 * @param <E> type of specific coprocessor environment this host requires. provides
054 */
055@InterfaceAudience.Private
056public abstract class CoprocessorHost<C extends Coprocessor, E extends CoprocessorEnvironment<C>> {
057  public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes";
058  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
059    "hbase.coprocessor.regionserver.classes";
060  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
061    "hbase.coprocessor.user.region.classes";
062  public static final String MASTER_COPROCESSOR_CONF_KEY = "hbase.coprocessor.master.classes";
063  public static final String CLIENT_META_COPROCESSOR_CONF_KEY =
064    "hbase.coprocessor.clientmeta.classes";
065  public static final String WAL_COPROCESSOR_CONF_KEY = "hbase.coprocessor.wal.classes";
066  public static final String RPC_COPROCESSOR_CONF_KEY = "hbase.coprocessor.rpc.classes";
067  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
068  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
069  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
070  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
071  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.user.enabled";
072  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
073  public static final String SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR =
074    "hbase.skip.load.duplicate.table.coprocessor";
075  public static final boolean DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR = false;
076
077  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorHost.class);
078  protected Abortable abortable;
079  /** Ordered set of loaded coprocessors with lock */
080  protected final SortedList<E> coprocEnvironments =
081    new SortedList<>(new EnvironmentPriorityComparator());
082  protected Configuration conf;
083  // unique file prefix to use for local copies of jars when classloading
084  protected String pathPrefix;
085  protected AtomicInteger loadSequence = new AtomicInteger();
086
087  public CoprocessorHost(Abortable abortable) {
088    this.abortable = abortable;
089    this.pathPrefix = UUID.randomUUID().toString();
090  }
091
092  /**
093   * Not to be confused with the per-object _coprocessors_ (above), coprocessorNames is static and
094   * stores the set of all coprocessors ever loaded by any thread in this JVM. It is strictly
095   * additive: coprocessors are added to coprocessorNames, by checkAndLoadInstance() but are never
096   * removed, since the intention is to preserve a history of all loaded coprocessors for diagnosis
097   * in case of server crash (HBASE-4014).
098   */
099  private static Set<String> coprocessorNames = Collections.synchronizedSet(new HashSet<String>());
100
101  public static Set<String> getLoadedCoprocessors() {
102    synchronized (coprocessorNames) {
103      return new HashSet(coprocessorNames);
104    }
105  }
106
107  /**
108   * Used to create a parameter to the HServerLoad constructor so that HServerLoad can provide
109   * information about the coprocessors loaded by this regionserver. (HBASE-4070: Improve region
110   * server metrics to report loaded coprocessors to master).
111   */
112  public Set<String> getCoprocessors() {
113    Set<String> returnValue = new TreeSet<>();
114    for (E e : coprocEnvironments) {
115      returnValue.add(e.getInstance().getClass().getSimpleName());
116    }
117    return returnValue;
118  }
119
120  /**
121   * Get the full class names of all loaded coprocessors. This method returns the complete class
122   * names including package information, which is useful for precise coprocessor identification and
123   * comparison.
124   */
125  public Set<String> getCoprocessorClassNames() {
126    Set<String> returnValue = new TreeSet<>();
127    for (E e : coprocEnvironments) {
128      returnValue.add(e.getInstance().getClass().getName());
129    }
130    return returnValue;
131  }
132
133  /**
134   * Load system coprocessors once only. Read the class names from configuration. Called by
135   * constructor.
136   */
137  protected void loadSystemCoprocessors(Configuration conf, String confKey) {
138    boolean coprocessorsEnabled =
139      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
140    if (!coprocessorsEnabled) {
141      return;
142    }
143
144    Class<?> implClass;
145
146    // load default coprocessors from configure file
147    String[] defaultCPClasses = conf.getStrings(confKey);
148    if (defaultCPClasses == null || defaultCPClasses.length == 0) return;
149
150    int currentSystemPriority = Coprocessor.PRIORITY_SYSTEM;
151    for (String className : defaultCPClasses) {
152      // After HBASE-23710 and HBASE-26714 when configuring for system coprocessor, we accept
153      // an optional format of className|priority|path
154      String[] classNameToken = className.split("\\|");
155      boolean hasPriorityOverride = false;
156      boolean hasPath = false;
157      className = classNameToken[0];
158      int overridePriority = Coprocessor.PRIORITY_SYSTEM;
159      Path path = null;
160      if (classNameToken.length > 1 && !Strings.isNullOrEmpty(classNameToken[1])) {
161        overridePriority = Integer.parseInt(classNameToken[1]);
162        hasPriorityOverride = true;
163      }
164      if (classNameToken.length > 2 && !Strings.isNullOrEmpty(classNameToken[2])) {
165        path = new Path(classNameToken[2].trim());
166        hasPath = true;
167      }
168      className = className.trim();
169      if (findCoprocessor(className) != null) {
170        // If already loaded will just continue
171        LOG.warn("Attempted duplicate loading of " + className + "; skipped");
172        continue;
173      }
174      ClassLoader cl = this.getClass().getClassLoader();
175      try {
176        // override the class loader if a path for the system coprocessor is provided.
177        if (hasPath) {
178          cl = CoprocessorClassLoader.getClassLoader(path, this.getClass().getClassLoader(),
179            pathPrefix, conf);
180        }
181        Thread.currentThread().setContextClassLoader(cl);
182        implClass = cl.loadClass(className);
183        int coprocPriority = hasPriorityOverride ? overridePriority : currentSystemPriority;
184        // Add coprocessors as we go to guard against case where a coprocessor is specified twice
185        // in the configuration
186        E env = checkAndLoadInstance(implClass, coprocPriority, conf);
187        if (env != null) {
188          this.coprocEnvironments.add(env);
189          LOG.info("System coprocessor {} loaded, priority={}.", className, coprocPriority);
190          if (!hasPriorityOverride) {
191            ++currentSystemPriority;
192          }
193        }
194      } catch (Throwable t) {
195        // We always abort if system coprocessors cannot be loaded
196        abortServer(className, t);
197      }
198    }
199  }
200
201  /**
202   * Load a coprocessor implementation into the host
203   * @param path      path to implementation jar
204   * @param className the main class name
205   * @param priority  chaining priority
206   * @param conf      configuration for coprocessor
207   * @throws java.io.IOException Exception
208   */
209  public E load(Path path, String className, int priority, Configuration conf) throws IOException {
210    String[] includedClassPrefixes = null;
211    if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
212      String prefixes = conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
213      includedClassPrefixes = prefixes.split(";");
214    }
215    return load(path, className, priority, conf, includedClassPrefixes);
216  }
217
218  /**
219   * Load a coprocessor implementation into the host
220   * @param path                  path to implementation jar
221   * @param className             the main class name
222   * @param priority              chaining priority
223   * @param conf                  configuration for coprocessor
224   * @param includedClassPrefixes class name prefixes to include
225   * @throws java.io.IOException Exception
226   */
227  public E load(Path path, String className, int priority, Configuration conf,
228    String[] includedClassPrefixes) throws IOException {
229    Class<?> implClass;
230    LOG.debug("Loading coprocessor class " + className + " with path " + path + " and priority "
231      + priority);
232
233    boolean skipLoadDuplicateCoprocessor = conf.getBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR,
234      DEFAULT_SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR);
235    if (skipLoadDuplicateCoprocessor && findCoprocessor(className) != null) {
236      // If already loaded will just continue
237      LOG.warn("Attempted duplicate loading of {}; skipped", className);
238      return null;
239    }
240
241    ClassLoader cl = null;
242    if (path == null) {
243      try {
244        implClass = getClass().getClassLoader().loadClass(className);
245      } catch (ClassNotFoundException e) {
246        throw new IOException("No jar path specified for " + className);
247      }
248    } else {
249      cl =
250        CoprocessorClassLoader.getClassLoader(path, getClass().getClassLoader(), pathPrefix, conf);
251      try {
252        implClass = ((CoprocessorClassLoader) cl).loadClass(className, includedClassPrefixes);
253      } catch (ClassNotFoundException e) {
254        throw new IOException("Cannot load external coprocessor class " + className, e);
255      }
256    }
257
258    // load custom code for coprocessor
259    Thread currentThread = Thread.currentThread();
260    ClassLoader hostClassLoader = currentThread.getContextClassLoader();
261    try {
262      // switch temporarily to the thread classloader for custom CP
263      currentThread.setContextClassLoader(cl);
264      E cpInstance = checkAndLoadInstance(implClass, priority, conf);
265      return cpInstance;
266    } finally {
267      // restore the fresh (host) classloader
268      currentThread.setContextClassLoader(hostClassLoader);
269    }
270  }
271
272  public void load(Class<? extends C> implClass, int priority, Configuration conf)
273    throws IOException {
274    E env = checkAndLoadInstance(implClass, priority, conf);
275    coprocEnvironments.add(env);
276  }
277
278  /**
279   * @param implClass Implementation class
280   * @param priority  priority
281   * @param conf      configuration
282   * @throws java.io.IOException Exception
283   */
284  public E checkAndLoadInstance(Class<?> implClass, int priority, Configuration conf)
285    throws IOException {
286    // create the instance
287    C impl;
288    try {
289      impl = checkAndGetInstance(implClass);
290      if (impl == null) {
291        LOG.error("Cannot load coprocessor " + implClass.getSimpleName());
292        return null;
293      }
294    } catch (InstantiationException | IllegalAccessException e) {
295      throw new IOException(e);
296    }
297    // create the environment
298    E env = createEnvironment(impl, priority, loadSequence.incrementAndGet(), conf);
299    assert env instanceof BaseEnvironment;
300    ((BaseEnvironment<C>) env).startup();
301    // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
302    // if server (master or regionserver) aborts.
303    coprocessorNames.add(implClass.getName());
304    return env;
305  }
306
307  /**
308   * Called when a new Coprocessor class is loaded
309   */
310  public abstract E createEnvironment(C instance, int priority, int sequence, Configuration conf);
311
312  /**
313   * Called when a new Coprocessor class needs to be loaded. Checks if type of the given class is
314   * what the corresponding host implementation expects. If it is of correct type, returns an
315   * instance of the coprocessor to be loaded. If not, returns null. If an exception occurs when
316   * trying to create instance of a coprocessor, it's passed up and eventually results into server
317   * aborting.
318   */
319  public abstract C checkAndGetInstance(Class<?> implClass)
320    throws InstantiationException, IllegalAccessException;
321
322  public void shutdown(E e) {
323    assert e instanceof BaseEnvironment;
324    if (LOG.isDebugEnabled()) {
325      LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
326    }
327    ((BaseEnvironment<C>) e).shutdown();
328  }
329
330  /**
331   * Find coprocessors by full class name or simple name.
332   */
333  public C findCoprocessor(String className) {
334    for (E env : coprocEnvironments) {
335      if (
336        env.getInstance().getClass().getName().equals(className)
337          || env.getInstance().getClass().getSimpleName().equals(className)
338      ) {
339        return env.getInstance();
340      }
341    }
342    return null;
343  }
344
345  public <T extends C> T findCoprocessor(Class<T> cls) {
346    for (E env : coprocEnvironments) {
347      if (cls.isAssignableFrom(env.getInstance().getClass())) {
348        return (T) env.getInstance();
349      }
350    }
351    return null;
352  }
353
354  /**
355   * Find list of coprocessors that extend/implement the given class/interface
356   * @param cls the class/interface to look for
357   * @return the list of coprocessors, or null if not found
358   */
359  public <T extends C> List<T> findCoprocessors(Class<T> cls) {
360    ArrayList<T> ret = new ArrayList<>();
361
362    for (E env : coprocEnvironments) {
363      C cp = env.getInstance();
364
365      if (cp != null) {
366        if (cls.isAssignableFrom(cp.getClass())) {
367          ret.add((T) cp);
368        }
369      }
370    }
371    return ret;
372  }
373
374  /**
375   * Find a coprocessor environment by class name
376   * @param className the class name
377   * @return the coprocessor, or null if not found
378   */
379  public E findCoprocessorEnvironment(String className) {
380    for (E env : coprocEnvironments) {
381      if (
382        env.getInstance().getClass().getName().equals(className)
383          || env.getInstance().getClass().getSimpleName().equals(className)
384      ) {
385        return env;
386      }
387    }
388    return null;
389  }
390
391  /**
392   * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
393   * jar files.
394   * @return A set of ClassLoader instances
395   */
396  Set<ClassLoader> getExternalClassLoaders() {
397    Set<ClassLoader> externalClassLoaders = new HashSet<>();
398    final ClassLoader systemClassLoader = this.getClass().getClassLoader();
399    for (E env : coprocEnvironments) {
400      ClassLoader cl = env.getInstance().getClass().getClassLoader();
401      if (cl != systemClassLoader) {
402        // do not include system classloader
403        externalClassLoaders.add(cl);
404      }
405    }
406    return externalClassLoaders;
407  }
408
409  /**
410   * Environment priority comparator. Coprocessors are chained in sorted order.
411   */
412  static class EnvironmentPriorityComparator implements Comparator<CoprocessorEnvironment> {
413    @Override
414    public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) {
415      if (env1.getPriority() < env2.getPriority()) {
416        return -1;
417      } else if (env1.getPriority() > env2.getPriority()) {
418        return 1;
419      }
420      if (env1.getLoadSequence() < env2.getLoadSequence()) {
421        return -1;
422      } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
423        return 1;
424      }
425      return 0;
426    }
427  }
428
429  protected void abortServer(final E environment, final Throwable e) {
430    abortServer(environment.getInstance().getClass().getName(), e);
431  }
432
433  protected void abortServer(final String coprocessorName, final Throwable e) {
434    String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
435    LOG.error(message, e);
436    if (abortable != null) {
437      abortable.abort(message, e);
438    } else {
439      LOG.warn("No available Abortable, process was not aborted");
440    }
441  }
442
443  /**
444   * This is used by coprocessor hooks which are declared to throw IOException (or its subtypes).
445   * For such hooks, we should handle throwable objects depending on the Throwable's type. Those
446   * which are instances of IOException should be passed on to the client. This is in conformance
447   * with the HBase idiom regarding IOException: that it represents a circumstance that should be
448   * passed along to the client for its own handling. For example, a coprocessor that implements
449   * access controls would throw a subclass of IOException, such as AccessDeniedException, in its
450   * preGet() method to prevent an unauthorized client's performing a Get on a particular table.
451   * @param env Coprocessor Environment
452   * @param e   Throwable object thrown by coprocessor.
453   * @exception IOException Exception
454   */
455  // Note to devs: Class comments of all observers ({@link MasterObserver}, {@link WALObserver},
456  // etc) mention this nuance of our exception handling so that coprocessor can throw appropriate
457  // exceptions depending on situation. If any changes are made to this logic, make sure to
458  // update all classes' comments.
459  protected void handleCoprocessorThrowable(final E env, final Throwable e) throws IOException {
460    if (e instanceof IOException) {
461      throw (IOException) e;
462    }
463    // If we got here, e is not an IOException. A loaded coprocessor has a
464    // fatal bug, and the server (master or regionserver) should remove the
465    // faulty coprocessor from its set of active coprocessors. Setting
466    // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
467    // which may be useful in development and testing environments where
468    // 'failing fast' for error analysis is desired.
469    if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
470      // server is configured to abort.
471      abortServer(env, e);
472    } else {
473      // If available, pull a table name out of the environment
474      if (env instanceof RegionCoprocessorEnvironment) {
475        String tableName =
476          ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable().getNameAsString();
477        LOG.error("Removing coprocessor '" + env.toString() + "' from table '" + tableName + "'",
478          e);
479      } else {
480        LOG.error("Removing coprocessor '" + env.toString() + "' from " + "environment", e);
481      }
482
483      coprocEnvironments.remove(env);
484      try {
485        shutdown(env);
486      } catch (Exception x) {
487        LOG.error("Uncaught exception when shutting down coprocessor '" + env.toString() + "'", x);
488      }
489      throw new DoNotRetryIOException("Coprocessor: '" + env.toString() + "' threw: '" + e
490        + "' and has been removed from the active " + "coprocessor set.", e);
491    }
492  }
493
494  /**
495   * Implementations defined function to get an observer of type {@code O} from a coprocessor of
496   * type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer
497   * they can handle. For e.g. RegionCoprocessorHost will use 3 getters, one for each of
498   * RegionObserver, EndpointObserver and BulkLoadObserver. These getters are used by
499   * {@code ObserverOperation} to get appropriate observer from the coprocessor.
500   */
501  @FunctionalInterface
502  public interface ObserverGetter<C, O> extends Function<C, Optional<O>> {
503  }
504
505  private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
506    ObserverGetter<C, O> observerGetter;
507
508    ObserverOperation(ObserverGetter<C, O> observerGetter) {
509      this(observerGetter, (ObserverRpcCallContext) null);
510    }
511
512    ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
513      this(observerGetter, createRpcCallContext(user), false);
514    }
515
516    ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext) {
517      this(observerGetter, rpcCallContext, false);
518    }
519
520    ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
521      this(observerGetter, (ObserverRpcCallContext) null, bypassable);
522    }
523
524    ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
525      this(observerGetter, createRpcCallContext(user), bypassable);
526    }
527
528    ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext,
529      boolean bypassable) {
530      super(rpcCallContext != null ? rpcCallContext : createRpcCallContext(), bypassable);
531      this.observerGetter = observerGetter;
532    }
533
534    private static ObserverRpcCallContext createRpcCallContext() {
535      return RpcServer.getRequestUser()
536        .map(user -> new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes()))
537        .orElse(null);
538    }
539
540    private static ObserverRpcCallContext createRpcCallContext(User user) {
541      if (user == null) {
542        return null;
543      } else {
544        return new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes());
545      }
546    }
547
548    abstract void callObserver() throws IOException;
549
550    protected void postEnvCall() {
551    }
552  }
553
554  // Can't derive ObserverOperation from ObserverOperationWithResult (R = Void) because then all
555  // ObserverCaller implementations will have to have a return statement.
556  // O = observer, E = environment, C = coprocessor, R=result type
557  public abstract class ObserverOperationWithoutResult<O> extends ObserverOperation<O> {
558    protected abstract void call(O observer) throws IOException;
559
560    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter) {
561      super(observerGetter);
562    }
563
564    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user) {
565      super(observerGetter, user);
566    }
567
568    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
569      ObserverRpcCallContext rpcCallContext) {
570      super(observerGetter, rpcCallContext);
571    }
572
573    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
574      boolean bypassable) {
575      super(observerGetter, user, bypassable);
576    }
577
578    public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
579      ObserverRpcCallContext rpcCallContext, boolean bypassable) {
580      super(observerGetter, rpcCallContext, bypassable);
581    }
582
583    /**
584     * In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
585     * has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers,
586     * in which case they will return null for that observer's getter. We simply ignore such cases.
587     */
588    @Override
589    void callObserver() throws IOException {
590      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
591      if (observer.isPresent()) {
592        call(observer.get());
593      }
594    }
595  }
596
597  public abstract class ObserverOperationWithResult<O, R> extends ObserverOperation<O> {
598    protected abstract R call(O observer) throws IOException;
599
600    private R result;
601
602    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result) {
603      this(observerGetter, result, false);
604    }
605
606    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
607      boolean bypassable) {
608      this(observerGetter, result, (ObserverRpcCallContext) null, bypassable);
609    }
610
611    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) {
612      this(observerGetter, result, user, false);
613    }
614
615    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
616      ObserverRpcCallContext rpcCallContext) {
617      this(observerGetter, result, rpcCallContext, false);
618    }
619
620    public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
621      boolean bypassable) {
622      super(observerGetter, user, bypassable);
623      this.result = result;
624    }
625
626    private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
627      ObserverRpcCallContext rpcCallContext, boolean bypassable) {
628      super(observerGetter, rpcCallContext, bypassable);
629      this.result = result;
630    }
631
632    protected R getResult() {
633      return this.result;
634    }
635
636    @Override
637    void callObserver() throws IOException {
638      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
639      if (observer.isPresent()) {
640        result = call(observer.get());
641      }
642    }
643  }
644
645  //////////////////////////////////////////////////////////////////////////////////////////
646  // Functions to execute observer hooks and handle results (if any)
647  //////////////////////////////////////////////////////////////////////////////////////////
648
649  /**
650   * Do not call with an observerOperation that is null! Have the caller check.
651   */
652  protected <O, R> R execOperationWithResult(
653    final ObserverOperationWithResult<O, R> observerOperation) throws IOException {
654    boolean bypass = execOperation(observerOperation);
655    R result = observerOperation.getResult();
656    return bypass == observerOperation.isBypassable() ? result : null;
657  }
658
659  /**
660   * @return True if we are to bypass (Can only be <code>true</code> if
661   *         ObserverOperation#isBypassable().
662   */
663  protected <O> boolean execOperation(final ObserverOperation<O> observerOperation)
664    throws IOException {
665    boolean bypass = false;
666    if (observerOperation == null) {
667      return bypass;
668    }
669    List<E> envs = coprocEnvironments.get();
670    for (E env : envs) {
671      observerOperation.prepare(env);
672      Thread currentThread = Thread.currentThread();
673      ClassLoader cl = currentThread.getContextClassLoader();
674      try {
675        currentThread.setContextClassLoader(env.getClassLoader());
676        observerOperation.callObserver();
677      } catch (Throwable e) {
678        handleCoprocessorThrowable(env, e);
679      } finally {
680        currentThread.setContextClassLoader(cl);
681      }
682      // Internal to shouldBypass, it checks if obeserverOperation#isBypassable().
683      bypass |= observerOperation.shouldBypass();
684      observerOperation.postEnvCall();
685      if (bypass) {
686        // If CP says bypass, skip out w/o calling any following CPs; they might ruin our response.
687        // In hbase1, this used to be called 'complete'. In hbase2, we unite bypass and 'complete'.
688        break;
689      }
690    }
691    return bypass;
692  }
693
694  /**
695   * Coprocessor classes can be configured in any order, based on that priority is set and chained
696   * in a sorted order. Should be used preStop*() hooks i.e. when master/regionserver is going down.
697   * This function first calls coprocessor methods (using ObserverOperation.call()) and then
698   * shutdowns the environment in postEnvCall(). <br>
699   * Need to execute all coprocessor methods first then postEnvCall(), otherwise some coprocessors
700   * may remain shutdown if any exception occurs during next coprocessor execution which prevent
701   * master/regionserver stop or cluster shutdown. (Refer:
702   * <a href="https://issues.apache.org/jira/browse/HBASE-16663">HBASE-16663</a>
703   * @return true if bypaas coprocessor execution, false if not.
704   */
705  protected <O> boolean execShutdown(final ObserverOperation<O> observerOperation)
706    throws IOException {
707    if (observerOperation == null) return false;
708    boolean bypass = false;
709    List<E> envs = coprocEnvironments.get();
710    // Iterate the coprocessors and execute ObserverOperation's call()
711    for (E env : envs) {
712      observerOperation.prepare(env);
713      Thread currentThread = Thread.currentThread();
714      ClassLoader cl = currentThread.getContextClassLoader();
715      try {
716        currentThread.setContextClassLoader(env.getClassLoader());
717        observerOperation.callObserver();
718      } catch (Throwable e) {
719        handleCoprocessorThrowable(env, e);
720      } finally {
721        currentThread.setContextClassLoader(cl);
722      }
723      bypass |= observerOperation.shouldBypass();
724    }
725
726    // Iterate the coprocessors and execute ObserverOperation's postEnvCall()
727    for (E env : envs) {
728      observerOperation.prepare(env);
729      observerOperation.postEnvCall();
730    }
731    return bypass;
732  }
733}