View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.TreeSet;
30  import java.util.UUID;
31  import java.util.concurrent.ConcurrentSkipListSet;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.atomic.AtomicInteger;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.Abortable;
42  import org.apache.hadoop.hbase.Coprocessor;
43  import org.apache.hadoop.hbase.CoprocessorEnvironment;
44  import org.apache.hadoop.hbase.DoNotRetryIOException;
45  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.client.HTable;
48  import org.apache.hadoop.hbase.client.HTableWrapper;
49  import org.apache.hadoop.hbase.client.Table;
50  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
51  import org.apache.hadoop.hbase.util.SortedList;
52  import org.apache.hadoop.hbase.util.VersionInfo;
53  
54  /**
55   * Provides the common setup framework and runtime services for coprocessor
56   * invocation from HBase services.
57   * @param <E> the specific environment extension that a concrete implementation
58   * provides
59   */
60  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
61  @InterfaceStability.Evolving
62  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
  // Configuration keys whose values list coprocessor class names for each kind of host.
  // NOTE(review): presumably supplied as confKey to loadSystemCoprocessors() by the
  // concrete hosts -- those callers are not visible in this file.
  public static final String REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.region.classes";
  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.regionserver.classes";
  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.user.region.classes";
  public static final String MASTER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.master.classes";
  public static final String WAL_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.wal.classes";
  /**
   * Read by handleCoprocessorThrowable(): when true the server is aborted on a
   * non-IOException failure, otherwise the failing coprocessor is removed.
   */
  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
  public static final boolean DEFAULT_ABORT_ON_ERROR = true;
  /** Read by loadSystemCoprocessors(): when false no system coprocessor is loaded. */
  public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
  public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
  // Not read anywhere in this class; NOTE(review): presumably consulted by subclasses.
  public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
    "hbase.coprocessor.user.enabled";
  public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;

  private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
  /** Target of abortServer(); may be null, in which case aborting only logs. */
  protected Abortable abortable;
  /** Loaded coprocessor environments, kept ordered by EnvironmentPriorityComparator */
  protected SortedList<E> coprocessors =
      new SortedList<E>(new EnvironmentPriorityComparator());
  protected Configuration conf;
  // unique file prefix to use for local copies of jars when classloading
  protected String pathPrefix;
  /** Source of the sequence numbers handed to createEnvironment() by loadInstance(). */
  protected AtomicInteger loadSequence = new AtomicInteger();
90  
  /**
   * Creates a host bound to the given abortable and assigns it a random UUID
   * string as its {@code pathPrefix}.
   * @param abortable the object abortServer() will call; a null value makes
   *   abortServer() log a warning instead of aborting
   */
  public CoprocessorHost(Abortable abortable) {
    this.abortable = abortable;
    this.pathPrefix = UUID.randomUUID().toString();
  }
95  
  /**
   * Not to be confused with the per-object _coprocessors_ (above),
   * coprocessorNames is static and stores the set of all coprocessors ever
   * loaded by any thread in this JVM. It is strictly additive: coprocessors are
   * added to coprocessorNames by loadInstance() but are never removed, since
   * the intention is to preserve a history of all loaded coprocessors for
   * diagnosis in case of server crash (HBASE-4014).
   */
  private static Set<String> coprocessorNames =
      Collections.synchronizedSet(new HashSet<String>());
106 
107   public static Set<String> getLoadedCoprocessors() {
108     synchronized (coprocessorNames) {
109       return new HashSet(coprocessorNames);
110     }
111   }
112 
113   /**
114    * Used to create a parameter to the HServerLoad constructor so that
115    * HServerLoad can provide information about the coprocessors loaded by this
116    * regionserver.
117    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
118    * to master).
119    */
120   public Set<String> getCoprocessors() {
121     Set<String> returnValue = new TreeSet<String>();
122     for (CoprocessorEnvironment e: coprocessors) {
123       returnValue.add(e.getInstance().getClass().getSimpleName());
124     }
125     return returnValue;
126   }
127 
128   /**
129    * Load system coprocessors once only. Read the class names from configuration.
130    * Called by constructor.
131    */
132   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
133     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
134       DEFAULT_COPROCESSORS_ENABLED);
135     if (!coprocessorsEnabled) {
136       return;
137     }
138 
139     Class<?> implClass = null;
140 
141     // load default coprocessors from configure file
142     String[] defaultCPClasses = conf.getStrings(confKey);
143     if (defaultCPClasses == null || defaultCPClasses.length == 0)
144       return;
145 
146     int priority = Coprocessor.PRIORITY_SYSTEM;
147     for (String className : defaultCPClasses) {
148       className = className.trim();
149       if (findCoprocessor(className) != null) {
150         // If already loaded will just continue
151         LOG.warn("Attempted duplicate loading of " + className + "; skipped");
152         continue;
153       }
154       ClassLoader cl = this.getClass().getClassLoader();
155       Thread.currentThread().setContextClassLoader(cl);
156       try {
157         implClass = cl.loadClass(className);
158         // Add coprocessors as we go to guard against case where a coprocessor is specified twice
159         // in the configuration
160         this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
161         LOG.info("System coprocessor " + className + " was loaded " +
162             "successfully with priority (" + priority++ + ").");
163       } catch (Throwable t) {
164         // We always abort if system coprocessors cannot be loaded
165         abortServer(className, t);
166       }
167     }
168   }
169 
170   /**
171    * Load a coprocessor implementation into the host
172    * @param path path to implementation jar
173    * @param className the main class name
174    * @param priority chaining priority
175    * @param conf configuration for coprocessor
176    * @throws java.io.IOException Exception
177    */
178   public E load(Path path, String className, int priority,
179       Configuration conf) throws IOException {
180     Class<?> implClass = null;
181     LOG.debug("Loading coprocessor class " + className + " with path " +
182         path + " and priority " + priority);
183 
184     ClassLoader cl = null;
185     if (path == null) {
186       try {
187         implClass = getClass().getClassLoader().loadClass(className);
188       } catch (ClassNotFoundException e) {
189         throw new IOException("No jar path specified for " + className);
190       }
191     } else {
192       cl = CoprocessorClassLoader.getClassLoader(
193         path, getClass().getClassLoader(), pathPrefix, conf);
194       try {
195         implClass = cl.loadClass(className);
196       } catch (ClassNotFoundException e) {
197         throw new IOException("Cannot load external coprocessor class " + className, e);
198       }
199     }
200 
201     //load custom code for coprocessor
202     Thread currentThread = Thread.currentThread();
203     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
204     try{
205       // switch temporarily to the thread classloader for custom CP
206       currentThread.setContextClassLoader(cl);
207       E cpInstance = loadInstance(implClass, priority, conf);
208       return cpInstance;
209     } finally {
210       // restore the fresh (host) classloader
211       currentThread.setContextClassLoader(hostClassLoader);
212     }
213   }
214 
215   /**
216    * @param implClass Implementation class
217    * @param priority priority
218    * @param conf configuration
219    * @throws java.io.IOException Exception
220    */
221   public void load(Class<?> implClass, int priority, Configuration conf)
222       throws IOException {
223     E env = loadInstance(implClass, priority, conf);
224     coprocessors.add(env);
225   }
226 
227   /**
228    * @param implClass Implementation class
229    * @param priority priority
230    * @param conf configuration
231    * @throws java.io.IOException Exception
232    */
233   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
234       throws IOException {
235     if (!Coprocessor.class.isAssignableFrom(implClass)) {
236       throw new IOException("Configured class " + implClass.getName() + " must implement "
237           + Coprocessor.class.getName() + " interface ");
238     }
239 
240     // create the instance
241     Coprocessor impl;
242     Object o = null;
243     try {
244       o = implClass.newInstance();
245       impl = (Coprocessor)o;
246     } catch (InstantiationException e) {
247       throw new IOException(e);
248     } catch (IllegalAccessException e) {
249       throw new IOException(e);
250     }
251     // create the environment
252     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
253     if (env instanceof Environment) {
254       ((Environment)env).startup();
255     }
256     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
257     // if server (master or regionserver) aborts.
258     coprocessorNames.add(implClass.getName());
259     return env;
260   }
261 
  /**
   * Called when a new Coprocessor class is loaded. loadInstance() invokes this
   * once per instantiated coprocessor to obtain the environment it then starts
   * and returns.
   * @param implClass the class that was instantiated
   * @param instance the freshly created coprocessor instance
   * @param priority chaining priority requested by the caller of loadInstance()
   * @param sequence value taken from {@code loadSequence.incrementAndGet()}
   * @param conf configuration passed through from the caller of loadInstance()
   * @return the environment wrapping {@code instance}
   */
  public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
      int priority, int sequence, Configuration conf);
267 
268   public void shutdown(CoprocessorEnvironment e) {
269     if (e instanceof Environment) {
270       if (LOG.isDebugEnabled()) {
271         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
272       }
273       ((Environment)e).shutdown();
274     } else {
275       LOG.warn("Shutdown called on unknown environment: "+
276           e.getClass().getName());
277     }
278   }
279 
280   /**
281    * Find a coprocessor implementation by class name
282    * @param className the class name
283    * @return the coprocessor, or null if not found
284    */
285   public Coprocessor findCoprocessor(String className) {
286     for (E env: coprocessors) {
287       if (env.getInstance().getClass().getName().equals(className) ||
288           env.getInstance().getClass().getSimpleName().equals(className)) {
289         return env.getInstance();
290       }
291     }
292     return null;
293   }
294 
295   /**
296    * Find list of coprocessors that extend/implement the given class/interface
297    * @param cls the class/interface to look for
298    * @return the list of coprocessors, or null if not found
299    */
300   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
301     ArrayList<T> ret = new ArrayList<T>();
302 
303     for (E env: coprocessors) {
304       Coprocessor cp = env.getInstance();
305 
306       if(cp != null) {
307         if (cls.isAssignableFrom(cp.getClass())) {
308           ret.add((T)cp);
309         }
310       }
311     }
312     return ret;
313   }
314 
315   /**
316    * Find a coprocessor environment by class name
317    * @param className the class name
318    * @return the coprocessor, or null if not found
319    */
320   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
321     for (E env: coprocessors) {
322       if (env.getInstance().getClass().getName().equals(className) ||
323           env.getInstance().getClass().getSimpleName().equals(className)) {
324         return env;
325       }
326     }
327     return null;
328   }
329 
330   /**
331    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
332    * jar files.
333    * @return A set of ClassLoader instances
334    */
335   Set<ClassLoader> getExternalClassLoaders() {
336     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
337     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
338     for (E env : coprocessors) {
339       ClassLoader cl = env.getInstance().getClass().getClassLoader();
340       if (cl != systemClassLoader){
341         //do not include system classloader
342         externalClassLoaders.add(cl);
343       }
344     }
345     return externalClassLoaders;
346   }
347 
348   /**
349    * Environment priority comparator.
350    * Coprocessors are chained in sorted order.
351    */
352   static class EnvironmentPriorityComparator
353       implements Comparator<CoprocessorEnvironment> {
354     @Override
355     public int compare(final CoprocessorEnvironment env1,
356         final CoprocessorEnvironment env2) {
357       if (env1.getPriority() < env2.getPriority()) {
358         return -1;
359       } else if (env1.getPriority() > env2.getPriority()) {
360         return 1;
361       }
362       if (env1.getLoadSequence() < env2.getLoadSequence()) {
363         return -1;
364       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
365         return 1;
366       }
367       return 0;
368     }
369   }
370 
371   /**
372    * Encapsulation of the environment of each coprocessor
373    */
374   public static class Environment implements CoprocessorEnvironment {
375 
376     /** The coprocessor */
377     public Coprocessor impl;
378     /** Chaining priority */
379     protected int priority = Coprocessor.PRIORITY_USER;
380     /** Current coprocessor state */
381     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
382     /** Accounting for tables opened by the coprocessor */
383     protected List<Table> openTables =
384       Collections.synchronizedList(new ArrayList<Table>());
385     private int seq;
386     private Configuration conf;
387     private ClassLoader classLoader;
388 
389     /**
390      * Constructor
391      * @param impl the coprocessor instance
392      * @param priority chaining priority
393      */
394     public Environment(final Coprocessor impl, final int priority,
395         final int seq, final Configuration conf) {
396       this.impl = impl;
397       this.classLoader = impl.getClass().getClassLoader();
398       this.priority = priority;
399       this.state = Coprocessor.State.INSTALLED;
400       this.seq = seq;
401       this.conf = conf;
402     }
403 
404     /** Initialize the environment */
405     public void startup() throws IOException {
406       if (state == Coprocessor.State.INSTALLED ||
407           state == Coprocessor.State.STOPPED) {
408         state = Coprocessor.State.STARTING;
409         Thread currentThread = Thread.currentThread();
410         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
411         try {
412           currentThread.setContextClassLoader(this.getClassLoader());
413           impl.start(this);
414           state = Coprocessor.State.ACTIVE;
415         } finally {
416           currentThread.setContextClassLoader(hostClassLoader);
417         }
418       } else {
419         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
420             " because not inactive (state="+state.toString()+")");
421       }
422     }
423 
424     /** Clean up the environment */
425     protected void shutdown() {
426       if (state == Coprocessor.State.ACTIVE) {
427         state = Coprocessor.State.STOPPING;
428         Thread currentThread = Thread.currentThread();
429         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
430         try {
431           currentThread.setContextClassLoader(this.getClassLoader());
432           impl.stop(this);
433           state = Coprocessor.State.STOPPED;
434         } catch (IOException ioe) {
435           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
436         } finally {
437           currentThread.setContextClassLoader(hostClassLoader);
438         }
439       } else {
440         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
441             " because not active (state="+state.toString()+")");
442       }
443       synchronized (openTables) {
444         // clean up any table references
445         for (Table table: openTables) {
446           try {
447             ((HTableWrapper)table).internalClose();
448           } catch (IOException e) {
449             // nothing can be done here
450             LOG.warn("Failed to close " +
451                 table.getName(), e);
452           }
453         }
454       }
455     }
456 
    /** @return the coprocessor instance wrapped by this environment */
    @Override
    public Coprocessor getInstance() {
      return impl;
    }

    /** @return the class loader that loaded the coprocessor's class */
    @Override
    public ClassLoader getClassLoader() {
      return classLoader;
    }

    /** @return the chaining priority given at construction */
    @Override
    public int getPriority() {
      return priority;
    }

    /** @return the load sequence number given at construction */
    @Override
    public int getLoadSequence() {
      return seq;
    }

    /** @return the coprocessor environment version */
    @Override
    public int getVersion() {
      return Coprocessor.VERSION;
    }

    /** @return the HBase release */
    @Override
    public String getHBaseVersion() {
      return VersionInfo.getVersion();
    }

    /** @return the configuration given at construction */
    @Override
    public Configuration getConfiguration() {
      return conf;
    }
493 
494     /**
495      * Open a table from within the Coprocessor environment
496      * @param tableName the table name
497      * @return an interface for manipulating the table
498      * @exception java.io.IOException Exception
499      */
500     @Override
501     public Table getTable(TableName tableName) throws IOException {
502       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
503     }
504 
    /**
     * Open a table from within the Coprocessor environment. The table is
     * created through HTableWrapper.createWrapper, which is handed this
     * environment's {@code openTables} list.
     * @param tableName the table name
     * @param pool the executor service passed on to the wrapper
     * @return an interface for manipulating the table
     * @exception java.io.IOException Exception
     */
    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return HTableWrapper.createWrapper(openTables, tableName, this, pool);
    }
515   }
516 
517   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
518     abortServer(environment.getInstance().getClass().getName(), e);
519   }
520 
521   protected void abortServer(final String coprocessorName, final Throwable e) {
522     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
523     LOG.error(message, e);
524     if (abortable != null) {
525       abortable.abort(message, e);
526     } else {
527       LOG.warn("No available Abortable, process was not aborted");
528     }
529   }
530 
531   /**
532    * This is used by coprocessor hooks which are declared to throw IOException
533    * (or its subtypes). For such hooks, we should handle throwable objects
534    * depending on the Throwable's type. Those which are instances of
535    * IOException should be passed on to the client. This is in conformance with
536    * the HBase idiom regarding IOException: that it represents a circumstance
537    * that should be passed along to the client for its own handling. For
538    * example, a coprocessor that implements access controls would throw a
539    * subclass of IOException, such as AccessDeniedException, in its preGet()
540    * method to prevent an unauthorized client's performing a Get on a particular
541    * table.
542    * @param env Coprocessor Environment
543    * @param e Throwable object thrown by coprocessor.
544    * @exception IOException Exception
545    */
546   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
547       throws IOException {
548     if (e instanceof IOException) {
549       throw (IOException)e;
550     }
551     // If we got here, e is not an IOException. A loaded coprocessor has a
552     // fatal bug, and the server (master or regionserver) should remove the
553     // faulty coprocessor from its set of active coprocessors. Setting
554     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
555     // which may be useful in development and testing environments where
556     // 'failing fast' for error analysis is desired.
557     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
558       // server is configured to abort.
559       abortServer(env, e);
560     } else {
561       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
562           "environment because it threw:  " + e,e);
563       coprocessors.remove(env);
564       try {
565         shutdown(env);
566       } catch (Exception x) {
567         LOG.error("Uncaught exception when shutting down coprocessor '"
568             + env.toString() + "'", x);
569       }
570       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
571           "' threw: '" + e + "' and has been removed from the active " +
572           "coprocessor set.", e);
573     }
574   }
575 
576   /**
577    * Used to gracefully handle fallback to deprecated methods when we
578    * evolve coprocessor APIs.
579    *
580    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
581    * to the deprecated API by using this method to determine if an instance implements the new API.
582    * In the event that said support is partial, then in the face of a runtime issue that prevents
583    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
584    *
585    * For examples of this in action, see the implementation of
586    * <ul>
587    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
588    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
589    * </ul>
590    *
591    * @param clazz Coprocessor you wish to evaluate
592    * @param methodName the name of the non-deprecated method version
593    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
594    *     declared.
595    */
596   @InterfaceAudience.Private
597   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
598       final String methodName, final Class<?>... parameterTypes) {
599     boolean useLegacy;
600     // Use reflection to see if they implement the non-deprecated version
601     try {
602       clazz.getDeclaredMethod(methodName, parameterTypes);
603       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
604           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
605       useLegacy = false;
606     } catch (NoSuchMethodException exception) {
607       useLegacy = true;
608     } catch (SecurityException exception) {
609       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
610           "' requires legacy support; assuming it does. If you get later errors about legacy " +
611           "coprocessor use, consider updating your security policy to allow access to the package" +
612           " and declared members of your implementation.");
613       LOG.debug("Details of Security Manager rejection.", exception);
614       useLegacy = true;
615     }
616     return useLegacy;
617   }
618 
  /**
   * Used to limit legacy handling to once per Coprocessor class per classloader.
   * NOTE(review): the comparator below falls back to comparing class names when
   * the two classes are not equal, so two distinct classes that share a fully
   * qualified name (e.g. loaded by different classloaders) compare as 0 and the
   * set keeps only one of them -- confirm whether "per classloader" is intended.
   */
  private static final Set<Class<? extends Coprocessor>> legacyWarning =
      new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
          new Comparator<Class<? extends Coprocessor>>() {
            @Override
            public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
              // identical Class objects are always equal
              if (c1.equals(c2)) {
                return 0;
              }
              // otherwise order by fully qualified name
              return c1.getName().compareTo(c2.getName());
            }
          });
633 
634   /**
635    * limits the amount of logging to once per coprocessor class.
636    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
637    * prevents properly supporting the legacy version of a coprocessor API.
638    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
639    */
640   @InterfaceAudience.Private
641   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
642     if(legacyWarning.add(clazz)) {
643       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
644           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
645           "'. Details of the problem: " + message);
646     }
647   }
648 }