View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableWrapper;
50  import org.apache.hadoop.hbase.client.Table;
51  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
52  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
53  import org.apache.hadoop.hbase.util.VersionInfo;
54  
55  /**
56   * Provides the common setup framework and runtime services for coprocessor
57   * invocation from HBase services.
58   * @param <E> the specific environment extension that a concrete implementation
59   * provides
60   */
61  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
62  @InterfaceStability.Evolving
63  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
64    public static final String REGION_COPROCESSOR_CONF_KEY =
65        "hbase.coprocessor.region.classes";
66    public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
67        "hbase.coprocessor.regionserver.classes";
68    public static final String USER_REGION_COPROCESSOR_CONF_KEY =
69        "hbase.coprocessor.user.region.classes";
70    public static final String MASTER_COPROCESSOR_CONF_KEY =
71        "hbase.coprocessor.master.classes";
72    public static final String WAL_COPROCESSOR_CONF_KEY =
73      "hbase.coprocessor.wal.classes";
74    public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
75    public static final boolean DEFAULT_ABORT_ON_ERROR = true;
76    public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
77    public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
78    public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
79      "hbase.coprocessor.user.enabled";
80    public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
81  
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);

/** Target of {@code abortServer}; may be null, in which case aborts are only logged. */
protected Abortable abortable;

/** Loaded coprocessor environments, kept sorted by {@link EnvironmentPriorityComparator}. */
protected SortedSet<E> coprocessors =
    new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());

protected Configuration conf;

/** Unique file prefix to use for local copies of jars when classloading. */
protected String pathPrefix;

/** Source of the load sequence number handed to each newly created environment. */
protected AtomicInteger loadSequence = new AtomicInteger();

/**
 * Creates a host bound to the given abortable and assigns it a random,
 * UUID-based jar path prefix.
 * @param abortable receiver of abort requests raised by this host
 */
public CoprocessorHost(Abortable abortable) {
  this.pathPrefix = UUID.randomUUID().toString();
  this.abortable = abortable;
}
96  
97    /**
98     * Not to be confused with the per-object _coprocessors_ (above),
99     * coprocessorNames is static and stores the set of all coprocessors ever
100    * loaded by any thread in this JVM. It is strictly additive: coprocessors are
101    * added to coprocessorNames, by loadInstance() but are never removed, since
102    * the intention is to preserve a history of all loaded coprocessors for
103    * diagnosis in case of server crash (HBASE-4014).
104    */
105   private static Set<String> coprocessorNames =
106       Collections.synchronizedSet(new HashSet<String>());
107   public static Set<String> getLoadedCoprocessors() {
108       return coprocessorNames;
109   }
110 
111   /**
112    * Used to create a parameter to the HServerLoad constructor so that
113    * HServerLoad can provide information about the coprocessors loaded by this
114    * regionserver.
115    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
116    * to master).
117    */
118   public Set<String> getCoprocessors() {
119     Set<String> returnValue = new TreeSet<String>();
120     for(CoprocessorEnvironment e: coprocessors) {
121       returnValue.add(e.getInstance().getClass().getSimpleName());
122     }
123     return returnValue;
124   }
125 
126   /**
127    * Load system coprocessors. Read the class names from configuration.
128    * Called by constructor.
129    */
130   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
131     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
132       DEFAULT_COPROCESSORS_ENABLED);
133     if (!coprocessorsEnabled) {
134       return;
135     }
136 
137     Class<?> implClass = null;
138 
139     // load default coprocessors from configure file
140     String[] defaultCPClasses = conf.getStrings(confKey);
141     if (defaultCPClasses == null || defaultCPClasses.length == 0)
142       return;
143 
144     int priority = Coprocessor.PRIORITY_SYSTEM;
145     List<E> configured = new ArrayList<E>();
146     for (String className : defaultCPClasses) {
147       className = className.trim();
148       if (findCoprocessor(className) != null) {
149         continue;
150       }
151       ClassLoader cl = this.getClass().getClassLoader();
152       Thread.currentThread().setContextClassLoader(cl);
153       try {
154         implClass = cl.loadClass(className);
155         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
156         LOG.info("System coprocessor " + className + " was loaded " +
157             "successfully with priority (" + priority++ + ").");
158       } catch (Throwable t) {
159         // We always abort if system coprocessors cannot be loaded
160         abortServer(className, t);
161       }
162     }
163 
164     // add entire set to the collection for COW efficiency
165     coprocessors.addAll(configured);
166   }
167 
168   /**
169    * Load a coprocessor implementation into the host
170    * @param path path to implementation jar
171    * @param className the main class name
172    * @param priority chaining priority
173    * @param conf configuration for coprocessor
174    * @throws java.io.IOException Exception
175    */
176   public E load(Path path, String className, int priority,
177       Configuration conf) throws IOException {
178     Class<?> implClass = null;
179     LOG.debug("Loading coprocessor class " + className + " with path " +
180         path + " and priority " + priority);
181 
182     ClassLoader cl = null;
183     if (path == null) {
184       try {
185         implClass = getClass().getClassLoader().loadClass(className);
186       } catch (ClassNotFoundException e) {
187         throw new IOException("No jar path specified for " + className);
188       }
189     } else {
190       cl = CoprocessorClassLoader.getClassLoader(
191         path, getClass().getClassLoader(), pathPrefix, conf);
192       try {
193         implClass = cl.loadClass(className);
194       } catch (ClassNotFoundException e) {
195         throw new IOException("Cannot load external coprocessor class " + className, e);
196       }
197     }
198 
199     //load custom code for coprocessor
200     Thread currentThread = Thread.currentThread();
201     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
202     try{
203       // switch temporarily to the thread classloader for custom CP
204       currentThread.setContextClassLoader(cl);
205       E cpInstance = loadInstance(implClass, priority, conf);
206       return cpInstance;
207     } finally {
208       // restore the fresh (host) classloader
209       currentThread.setContextClassLoader(hostClassLoader);
210     }
211   }
212 
213   /**
214    * @param implClass Implementation class
215    * @param priority priority
216    * @param conf configuration
217    * @throws java.io.IOException Exception
218    */
219   public void load(Class<?> implClass, int priority, Configuration conf)
220       throws IOException {
221     E env = loadInstance(implClass, priority, conf);
222     coprocessors.add(env);
223   }
224 
225   /**
226    * @param implClass Implementation class
227    * @param priority priority
228    * @param conf configuration
229    * @throws java.io.IOException Exception
230    */
231   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
232       throws IOException {
233     if (!Coprocessor.class.isAssignableFrom(implClass)) {
234       throw new IOException("Configured class " + implClass.getName() + " must implement "
235           + Coprocessor.class.getName() + " interface ");
236     }
237 
238     // create the instance
239     Coprocessor impl;
240     Object o = null;
241     try {
242       o = implClass.newInstance();
243       impl = (Coprocessor)o;
244     } catch (InstantiationException e) {
245       throw new IOException(e);
246     } catch (IllegalAccessException e) {
247       throw new IOException(e);
248     }
249     // create the environment
250     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
251     if (env instanceof Environment) {
252       ((Environment)env).startup();
253     }
254     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
255     // if server (master or regionserver) aborts.
256     coprocessorNames.add(implClass.getName());
257     return env;
258   }
259 
260   /**
261    * Called when a new Coprocessor class is loaded
262    */
263   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
264       int priority, int sequence, Configuration conf);
265 
266   public void shutdown(CoprocessorEnvironment e) {
267     if (e instanceof Environment) {
268       if (LOG.isDebugEnabled()) {
269         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
270       }
271       ((Environment)e).shutdown();
272     } else {
273       LOG.warn("Shutdown called on unknown environment: "+
274           e.getClass().getName());
275     }
276   }
277 
278   /**
279    * Find a coprocessor implementation by class name
280    * @param className the class name
281    * @return the coprocessor, or null if not found
282    */
283   public Coprocessor findCoprocessor(String className) {
284     for (E env: coprocessors) {
285       if (env.getInstance().getClass().getName().equals(className) ||
286           env.getInstance().getClass().getSimpleName().equals(className)) {
287         return env.getInstance();
288       }
289     }
290     return null;
291   }
292 
293   /**
294    * Find list of coprocessors that extend/implement the given class/interface
295    * @param cls the class/interface to look for
296    * @return the list of coprocessors, or null if not found
297    */
298   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
299     ArrayList<T> ret = new ArrayList<T>();
300 
301     for (E env: coprocessors) {
302       Coprocessor cp = env.getInstance();
303 
304       if(cp != null) {
305         if (cls.isAssignableFrom(cp.getClass())) {
306           ret.add((T)cp);
307         }
308       }
309     }
310     return ret;
311   }
312 
313   /**
314    * Find a coprocessor environment by class name
315    * @param className the class name
316    * @return the coprocessor, or null if not found
317    */
318   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
319     for (E env: coprocessors) {
320       if (env.getInstance().getClass().getName().equals(className) ||
321           env.getInstance().getClass().getSimpleName().equals(className)) {
322         return env;
323       }
324     }
325     return null;
326   }
327 
328   /**
329    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
330    * jar files.
331    * @return A set of ClassLoader instances
332    */
333   Set<ClassLoader> getExternalClassLoaders() {
334     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
335     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
336     for (E env : coprocessors) {
337       ClassLoader cl = env.getInstance().getClass().getClassLoader();
338       if (cl != systemClassLoader){
339         //do not include system classloader
340         externalClassLoaders.add(cl);
341       }
342     }
343     return externalClassLoaders;
344   }
345 
346   /**
347    * Environment priority comparator.
348    * Coprocessors are chained in sorted order.
349    */
350   static class EnvironmentPriorityComparator
351       implements Comparator<CoprocessorEnvironment> {
352     public int compare(final CoprocessorEnvironment env1,
353         final CoprocessorEnvironment env2) {
354       if (env1.getPriority() < env2.getPriority()) {
355         return -1;
356       } else if (env1.getPriority() > env2.getPriority()) {
357         return 1;
358       }
359       if (env1.getLoadSequence() < env2.getLoadSequence()) {
360         return -1;
361       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
362         return 1;
363       }
364       return 0;
365     }
366   }
367 
368   /**
369    * Encapsulation of the environment of each coprocessor
370    */
371   public static class Environment implements CoprocessorEnvironment {
372 
373     /** The coprocessor */
374     public Coprocessor impl;
375     /** Chaining priority */
376     protected int priority = Coprocessor.PRIORITY_USER;
377     /** Current coprocessor state */
378     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
379     /** Accounting for tables opened by the coprocessor */
380     protected List<Table> openTables =
381       Collections.synchronizedList(new ArrayList<Table>());
382     private int seq;
383     private Configuration conf;
384     private ClassLoader classLoader;
385 
386     /**
387      * Constructor
388      * @param impl the coprocessor instance
389      * @param priority chaining priority
390      */
391     public Environment(final Coprocessor impl, final int priority,
392         final int seq, final Configuration conf) {
393       this.impl = impl;
394       this.classLoader = impl.getClass().getClassLoader();
395       this.priority = priority;
396       this.state = Coprocessor.State.INSTALLED;
397       this.seq = seq;
398       this.conf = conf;
399     }
400 
401     /** Initialize the environment */
402     public void startup() throws IOException {
403       if (state == Coprocessor.State.INSTALLED ||
404           state == Coprocessor.State.STOPPED) {
405         state = Coprocessor.State.STARTING;
406         Thread currentThread = Thread.currentThread();
407         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
408         try {
409           currentThread.setContextClassLoader(this.getClassLoader());
410           impl.start(this);
411           state = Coprocessor.State.ACTIVE;
412         } finally {
413           currentThread.setContextClassLoader(hostClassLoader);
414         }
415       } else {
416         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
417             " because not inactive (state="+state.toString()+")");
418       }
419     }
420 
421     /** Clean up the environment */
422     protected void shutdown() {
423       if (state == Coprocessor.State.ACTIVE) {
424         state = Coprocessor.State.STOPPING;
425         Thread currentThread = Thread.currentThread();
426         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
427         try {
428           currentThread.setContextClassLoader(this.getClassLoader());
429           impl.stop(this);
430           state = Coprocessor.State.STOPPED;
431         } catch (IOException ioe) {
432           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
433         } finally {
434           currentThread.setContextClassLoader(hostClassLoader);
435         }
436       } else {
437         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
438             " because not active (state="+state.toString()+")");
439       }
440       // clean up any table references
441       for (Table table: openTables) {
442         try {
443           ((HTableWrapper)table).internalClose();
444         } catch (IOException e) {
445           // nothing can be done here
446           LOG.warn("Failed to close " +
447               table.getName(), e);
448         }
449       }
450     }
451 
452     @Override
453     public Coprocessor getInstance() {
454       return impl;
455     }
456 
457     @Override
458     public ClassLoader getClassLoader() {
459       return classLoader;
460     }
461 
462     @Override
463     public int getPriority() {
464       return priority;
465     }
466 
467     @Override
468     public int getLoadSequence() {
469       return seq;
470     }
471 
472     /** @return the coprocessor environment version */
473     @Override
474     public int getVersion() {
475       return Coprocessor.VERSION;
476     }
477 
478     /** @return the HBase release */
479     @Override
480     public String getHBaseVersion() {
481       return VersionInfo.getVersion();
482     }
483 
484     @Override
485     public Configuration getConfiguration() {
486       return conf;
487     }
488 
489     /**
490      * Open a table from within the Coprocessor environment
491      * @param tableName the table name
492      * @return an interface for manipulating the table
493      * @exception java.io.IOException Exception
494      */
495     @Override
496     public Table getTable(TableName tableName) throws IOException {
497       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
498     }
499 
500     /**
501      * Open a table from within the Coprocessor environment
502      * @param tableName the table name
503      * @return an interface for manipulating the table
504      * @exception java.io.IOException Exception
505      */
506     @Override
507     public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
508       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
509     }
510   }
511 
512   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
513     abortServer(environment.getInstance().getClass().getName(), e);
514   }
515 
516   protected void abortServer(final String coprocessorName, final Throwable e) {
517     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
518     LOG.error(message, e);
519     if (abortable != null) {
520       abortable.abort(message, e);
521     } else {
522       LOG.warn("No available Abortable, process was not aborted");
523     }
524   }
525 
526   /**
527    * This is used by coprocessor hooks which are declared to throw IOException
528    * (or its subtypes). For such hooks, we should handle throwable objects
529    * depending on the Throwable's type. Those which are instances of
530    * IOException should be passed on to the client. This is in conformance with
531    * the HBase idiom regarding IOException: that it represents a circumstance
532    * that should be passed along to the client for its own handling. For
533    * example, a coprocessor that implements access controls would throw a
534    * subclass of IOException, such as AccessDeniedException, in its preGet()
535    * method to prevent an unauthorized client's performing a Get on a particular
536    * table.
537    * @param env Coprocessor Environment
538    * @param e Throwable object thrown by coprocessor.
539    * @exception IOException Exception
540    */
541   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
542       throws IOException {
543     if (e instanceof IOException) {
544       throw (IOException)e;
545     }
546     // If we got here, e is not an IOException. A loaded coprocessor has a
547     // fatal bug, and the server (master or regionserver) should remove the
548     // faulty coprocessor from its set of active coprocessors. Setting
549     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
550     // which may be useful in development and testing environments where
551     // 'failing fast' for error analysis is desired.
552     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
553       // server is configured to abort.
554       abortServer(env, e);
555     } else {
556       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
557           "environment because it threw:  " + e,e);
558       coprocessors.remove(env);
559       try {
560         shutdown(env);
561       } catch (Exception x) {
562         LOG.error("Uncaught exception when shutting down coprocessor '"
563             + env.toString() + "'", x);
564       }
565       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
566           "' threw: '" + e + "' and has been removed from the active " +
567           "coprocessor set.", e);
568     }
569   }
570 
571   /**
572    * Used to gracefully handle fallback to deprecated methods when we
573    * evolve coprocessor APIs.
574    *
575    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
576    * to the deprecated API by using this method to determine if an instance implements the new API.
577    * In the event that said support is partial, then in the face of a runtime issue that prevents
578    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
579    *
580    * For examples of this in action, see the implementation of
581    * <ul>
582    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
583    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
584    * </ul>
585    *
586    * @param clazz Coprocessor you wish to evaluate
587    * @param methodName the name of the non-deprecated method version
588    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
589    *     declared.
590    */
591   @InterfaceAudience.Private
592   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
593       final String methodName, final Class<?>... parameterTypes) {
594     boolean useLegacy;
595     // Use reflection to see if they implement the non-deprecated version
596     try {
597       clazz.getDeclaredMethod(methodName, parameterTypes);
598       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
599           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
600       useLegacy = false;
601     } catch (NoSuchMethodException exception) {
602       useLegacy = true;
603     } catch (SecurityException exception) {
604       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
605           "' requires legacy support; assuming it does. If you get later errors about legacy " +
606           "coprocessor use, consider updating your security policy to allow access to the package" +
607           " and declared members of your implementation.");
608       LOG.debug("Details of Security Manager rejection.", exception);
609       useLegacy = true;
610     }
611     return useLegacy;
612   }
613 
614   /**
615    * Used to limit legacy handling to once per Coprocessor class per classloader.
616    */
617   private static final Set<Class<? extends Coprocessor>> legacyWarning =
618       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
619           new Comparator<Class<? extends Coprocessor>>() {
620             @Override
621             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
622               if (c1.equals(c2)) {
623                 return 0;
624               }
625               return c1.getName().compareTo(c2.getName());
626             }
627           });
628 
629   /**
630    * limits the amount of logging to once per coprocessor class.
631    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
632    * prevents properly supporting the legacy version of a coprocessor API.
633    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
634    */
635   @InterfaceAudience.Private
636   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
637     if(legacyWarning.add(clazz)) {
638       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
639           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
640           "'. Details of the problem: " + message);
641     }
642   }
643 }