View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableInterface;
50  import org.apache.hadoop.hbase.client.HTableWrapper;
51  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
52  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
53  import org.apache.hadoop.hbase.util.VersionInfo;
54  
55  /**
56   * Provides the common setup framework and runtime services for coprocessor
57   * invocation from HBase services.
58   * @param <E> the specific environment extension that a concrete implementation
59   * provides
60   */
61  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
62  @InterfaceStability.Evolving
63  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
64    public static final String REGION_COPROCESSOR_CONF_KEY =
65        "hbase.coprocessor.region.classes";
66    public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
67        "hbase.coprocessor.regionserver.classes";
68    public static final String USER_REGION_COPROCESSOR_CONF_KEY =
69        "hbase.coprocessor.user.region.classes";
70    public static final String MASTER_COPROCESSOR_CONF_KEY =
71        "hbase.coprocessor.master.classes";
72    public static final String WAL_COPROCESSOR_CONF_KEY =
73      "hbase.coprocessor.wal.classes";
74    public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
75    public static final boolean DEFAULT_ABORT_ON_ERROR = true;
76  
77    private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
78    protected Abortable abortable;
79    /** Ordered set of loaded coprocessors with lock */
80    protected SortedSet<E> coprocessors =
81        new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
82    protected Configuration conf;
83    // unique file prefix to use for local copies of jars when classloading
84    protected String pathPrefix;
85    protected AtomicInteger loadSequence = new AtomicInteger();
86  
87    public CoprocessorHost(Abortable abortable) {
88      this.abortable = abortable;
89      this.pathPrefix = UUID.randomUUID().toString();
90    }
91  
92    /**
93     * Not to be confused with the per-object _coprocessors_ (above),
94     * coprocessorNames is static and stores the set of all coprocessors ever
95     * loaded by any thread in this JVM. It is strictly additive: coprocessors are
96     * added to coprocessorNames, by loadInstance() but are never removed, since
97     * the intention is to preserve a history of all loaded coprocessors for
98     * diagnosis in case of server crash (HBASE-4014).
99     */
100   private static Set<String> coprocessorNames =
101       Collections.synchronizedSet(new HashSet<String>());
102   public static Set<String> getLoadedCoprocessors() {
103       return coprocessorNames;
104   }
105 
106   /**
107    * Used to create a parameter to the HServerLoad constructor so that
108    * HServerLoad can provide information about the coprocessors loaded by this
109    * regionserver.
110    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
111    * to master).
112    */
113   public Set<String> getCoprocessors() {
114     Set<String> returnValue = new TreeSet<String>();
115     for(CoprocessorEnvironment e: coprocessors) {
116       returnValue.add(e.getInstance().getClass().getSimpleName());
117     }
118     return returnValue;
119   }
120 
121   /**
122    * Load system coprocessors. Read the class names from configuration.
123    * Called by constructor.
124    */
125   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
126     Class<?> implClass = null;
127 
128     // load default coprocessors from configure file
129     String[] defaultCPClasses = conf.getStrings(confKey);
130     if (defaultCPClasses == null || defaultCPClasses.length == 0)
131       return;
132 
133     int priority = Coprocessor.PRIORITY_SYSTEM;
134     List<E> configured = new ArrayList<E>();
135     for (String className : defaultCPClasses) {
136       className = className.trim();
137       if (findCoprocessor(className) != null) {
138         continue;
139       }
140       ClassLoader cl = this.getClass().getClassLoader();
141       Thread.currentThread().setContextClassLoader(cl);
142       try {
143         implClass = cl.loadClass(className);
144         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
145         LOG.info("System coprocessor " + className + " was loaded " +
146             "successfully with priority (" + priority++ + ").");
147       } catch (Throwable t) {
148         // We always abort if system coprocessors cannot be loaded
149         abortServer(className, t);
150       }
151     }
152 
153     // add entire set to the collection for COW efficiency
154     coprocessors.addAll(configured);
155   }
156 
157   /**
158    * Load a coprocessor implementation into the host
159    * @param path path to implementation jar
160    * @param className the main class name
161    * @param priority chaining priority
162    * @param conf configuration for coprocessor
163    * @throws java.io.IOException Exception
164    */
165   public E load(Path path, String className, int priority,
166       Configuration conf) throws IOException {
167     Class<?> implClass = null;
168     LOG.debug("Loading coprocessor class " + className + " with path " +
169         path + " and priority " + priority);
170 
171     ClassLoader cl = null;
172     if (path == null) {
173       try {
174         implClass = getClass().getClassLoader().loadClass(className);
175       } catch (ClassNotFoundException e) {
176         throw new IOException("No jar path specified for " + className);
177       }
178     } else {
179       cl = CoprocessorClassLoader.getClassLoader(
180         path, getClass().getClassLoader(), pathPrefix, conf);
181       try {
182         implClass = cl.loadClass(className);
183       } catch (ClassNotFoundException e) {
184         throw new IOException("Cannot load external coprocessor class " + className, e);
185       }
186     }
187 
188     //load custom code for coprocessor
189     Thread currentThread = Thread.currentThread();
190     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
191     try{
192       // switch temporarily to the thread classloader for custom CP
193       currentThread.setContextClassLoader(cl);
194       E cpInstance = loadInstance(implClass, priority, conf);
195       return cpInstance;
196     } finally {
197       // restore the fresh (host) classloader
198       currentThread.setContextClassLoader(hostClassLoader);
199     }
200   }
201 
202   /**
203    * @param implClass Implementation class
204    * @param priority priority
205    * @param conf configuration
206    * @throws java.io.IOException Exception
207    */
208   public void load(Class<?> implClass, int priority, Configuration conf)
209       throws IOException {
210     E env = loadInstance(implClass, priority, conf);
211     coprocessors.add(env);
212   }
213 
214   /**
215    * @param implClass Implementation class
216    * @param priority priority
217    * @param conf configuration
218    * @throws java.io.IOException Exception
219    */
220   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
221       throws IOException {
222     if (!Coprocessor.class.isAssignableFrom(implClass)) {
223       throw new IOException("Configured class " + implClass.getName() + " must implement "
224           + Coprocessor.class.getName() + " interface ");
225     }
226 
227     // create the instance
228     Coprocessor impl;
229     Object o = null;
230     try {
231       o = implClass.newInstance();
232       impl = (Coprocessor)o;
233     } catch (InstantiationException e) {
234       throw new IOException(e);
235     } catch (IllegalAccessException e) {
236       throw new IOException(e);
237     }
238     // create the environment
239     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
240     if (env instanceof Environment) {
241       ((Environment)env).startup();
242     }
243     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
244     // if server (master or regionserver) aborts.
245     coprocessorNames.add(implClass.getName());
246     return env;
247   }
248 
249   /**
250    * Called when a new Coprocessor class is loaded
251    */
252   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
253       int priority, int sequence, Configuration conf);
254 
255   public void shutdown(CoprocessorEnvironment e) {
256     if (e instanceof Environment) {
257       if (LOG.isDebugEnabled()) {
258         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
259       }
260       ((Environment)e).shutdown();
261     } else {
262       LOG.warn("Shutdown called on unknown environment: "+
263           e.getClass().getName());
264     }
265   }
266 
267   /**
268    * Find a coprocessor implementation by class name
269    * @param className the class name
270    * @return the coprocessor, or null if not found
271    */
272   public Coprocessor findCoprocessor(String className) {
273     for (E env: coprocessors) {
274       if (env.getInstance().getClass().getName().equals(className) ||
275           env.getInstance().getClass().getSimpleName().equals(className)) {
276         return env.getInstance();
277       }
278     }
279     return null;
280   }
281 
282   /**
283    * Find list of coprocessors that extend/implement the given class/interface
284    * @param cls the class/interface to look for
285    * @return the list of coprocessors, or null if not found
286    */
287   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
288     ArrayList<T> ret = new ArrayList<T>();
289 
290     for (E env: coprocessors) {
291       Coprocessor cp = env.getInstance();
292 
293       if(cp != null) {
294         if (cls.isAssignableFrom(cp.getClass())) {
295           ret.add((T)cp);
296         }
297       }
298     }
299     return ret;
300   }
301 
302   /**
303    * Find a coprocessor environment by class name
304    * @param className the class name
305    * @return the coprocessor, or null if not found
306    */
307   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
308     for (E env: coprocessors) {
309       if (env.getInstance().getClass().getName().equals(className) ||
310           env.getInstance().getClass().getSimpleName().equals(className)) {
311         return env;
312       }
313     }
314     return null;
315   }
316 
317   /**
318    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
319    * jar files.
320    * @return A set of ClassLoader instances
321    */
322   Set<ClassLoader> getExternalClassLoaders() {
323     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
324     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
325     for (E env : coprocessors) {
326       ClassLoader cl = env.getInstance().getClass().getClassLoader();
327       if (cl != systemClassLoader){
328         //do not include system classloader
329         externalClassLoaders.add(cl);
330       }
331     }
332     return externalClassLoaders;
333   }
334 
335   /**
336    * Environment priority comparator.
337    * Coprocessors are chained in sorted order.
338    */
339   static class EnvironmentPriorityComparator
340       implements Comparator<CoprocessorEnvironment> {
341     public int compare(final CoprocessorEnvironment env1,
342         final CoprocessorEnvironment env2) {
343       if (env1.getPriority() < env2.getPriority()) {
344         return -1;
345       } else if (env1.getPriority() > env2.getPriority()) {
346         return 1;
347       }
348       if (env1.getLoadSequence() < env2.getLoadSequence()) {
349         return -1;
350       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
351         return 1;
352       }
353       return 0;
354     }
355   }
356 
357   /**
358    * Encapsulation of the environment of each coprocessor
359    */
360   public static class Environment implements CoprocessorEnvironment {
361 
362     /** The coprocessor */
363     public Coprocessor impl;
364     /** Chaining priority */
365     protected int priority = Coprocessor.PRIORITY_USER;
366     /** Current coprocessor state */
367     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
368     /** Accounting for tables opened by the coprocessor */
369     protected List<HTableInterface> openTables =
370       Collections.synchronizedList(new ArrayList<HTableInterface>());
371     private int seq;
372     private Configuration conf;
373     private ClassLoader classLoader;
374 
375     /**
376      * Constructor
377      * @param impl the coprocessor instance
378      * @param priority chaining priority
379      */
380     public Environment(final Coprocessor impl, final int priority,
381         final int seq, final Configuration conf) {
382       this.impl = impl;
383       this.classLoader = impl.getClass().getClassLoader();
384       this.priority = priority;
385       this.state = Coprocessor.State.INSTALLED;
386       this.seq = seq;
387       this.conf = conf;
388     }
389 
390     /** Initialize the environment */
391     public void startup() throws IOException {
392       if (state == Coprocessor.State.INSTALLED ||
393           state == Coprocessor.State.STOPPED) {
394         state = Coprocessor.State.STARTING;
395         Thread currentThread = Thread.currentThread();
396         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
397         try {
398           currentThread.setContextClassLoader(this.getClassLoader());
399           impl.start(this);
400           state = Coprocessor.State.ACTIVE;
401         } finally {
402           currentThread.setContextClassLoader(hostClassLoader);
403         }
404       } else {
405         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
406             " because not inactive (state="+state.toString()+")");
407       }
408     }
409 
410     /** Clean up the environment */
411     protected void shutdown() {
412       if (state == Coprocessor.State.ACTIVE) {
413         state = Coprocessor.State.STOPPING;
414         Thread currentThread = Thread.currentThread();
415         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
416         try {
417           currentThread.setContextClassLoader(this.getClassLoader());
418           impl.stop(this);
419           state = Coprocessor.State.STOPPED;
420         } catch (IOException ioe) {
421           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
422         } finally {
423           currentThread.setContextClassLoader(hostClassLoader);
424         }
425       } else {
426         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
427             " because not active (state="+state.toString()+")");
428       }
429       // clean up any table references
430       for (HTableInterface table: openTables) {
431         try {
432           ((HTableWrapper)table).internalClose();
433         } catch (IOException e) {
434           // nothing can be done here
435           LOG.warn("Failed to close " +
436               table.getName(), e);
437         }
438       }
439     }
440 
441     @Override
442     public Coprocessor getInstance() {
443       return impl;
444     }
445 
446     @Override
447     public ClassLoader getClassLoader() {
448       return classLoader;
449     }
450 
451     @Override
452     public int getPriority() {
453       return priority;
454     }
455 
456     @Override
457     public int getLoadSequence() {
458       return seq;
459     }
460 
461     /** @return the coprocessor environment version */
462     @Override
463     public int getVersion() {
464       return Coprocessor.VERSION;
465     }
466 
467     /** @return the HBase release */
468     @Override
469     public String getHBaseVersion() {
470       return VersionInfo.getVersion();
471     }
472 
473     @Override
474     public Configuration getConfiguration() {
475       return conf;
476     }
477 
478     /**
479      * Open a table from within the Coprocessor environment
480      * @param tableName the table name
481      * @return an interface for manipulating the table
482      * @exception java.io.IOException Exception
483      */
484     @Override
485     public HTableInterface getTable(TableName tableName) throws IOException {
486       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
487     }
488 
489     /**
490      * Open a table from within the Coprocessor environment
491      * @param tableName the table name
492      * @return an interface for manipulating the table
493      * @exception java.io.IOException Exception
494      */
495     @Override
496     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
497       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
498     }
499   }
500 
501   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
502     abortServer(environment.getInstance().getClass().getName(), e);
503   }
504 
505   protected void abortServer(final String coprocessorName, final Throwable e) {
506     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
507     LOG.error(message, e);
508     if (abortable != null) {
509       abortable.abort(message, e);
510     } else {
511       LOG.warn("No available Abortable, process was not aborted");
512     }
513   }
514 
515   /**
516    * This is used by coprocessor hooks which are declared to throw IOException
517    * (or its subtypes). For such hooks, we should handle throwable objects
518    * depending on the Throwable's type. Those which are instances of
519    * IOException should be passed on to the client. This is in conformance with
520    * the HBase idiom regarding IOException: that it represents a circumstance
521    * that should be passed along to the client for its own handling. For
522    * example, a coprocessor that implements access controls would throw a
523    * subclass of IOException, such as AccessDeniedException, in its preGet()
524    * method to prevent an unauthorized client's performing a Get on a particular
525    * table.
526    * @param env Coprocessor Environment
527    * @param e Throwable object thrown by coprocessor.
528    * @exception IOException Exception
529    */
530   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
531       throws IOException {
532     if (e instanceof IOException) {
533       throw (IOException)e;
534     }
535     // If we got here, e is not an IOException. A loaded coprocessor has a
536     // fatal bug, and the server (master or regionserver) should remove the
537     // faulty coprocessor from its set of active coprocessors. Setting
538     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
539     // which may be useful in development and testing environments where
540     // 'failing fast' for error analysis is desired.
541     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
542       // server is configured to abort.
543       abortServer(env, e);
544     } else {
545       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
546           "environment because it threw:  " + e,e);
547       coprocessors.remove(env);
548       try {
549         shutdown(env);
550       } catch (Exception x) {
551         LOG.error("Uncaught exception when shutting down coprocessor '"
552             + env.toString() + "'", x);
553       }
554       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
555           "' threw: '" + e + "' and has been removed from the active " +
556           "coprocessor set.", e);
557     }
558   }
559 
560   /**
561    * Used to gracefully handle fallback to deprecated methods when we
562    * evolve coprocessor APIs.
563    *
564    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
565    * to the deprecated API by using this method to determine if an instance implements the new API.
566    * In the event that said support is partial, then in the face of a runtime issue that prevents
567    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
568    *
569    * For examples of this in action, see the implementation of
570    * <ul>
571    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
572    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
573    * </ul>
574    *
575    * @param clazz Coprocessor you wish to evaluate
576    * @param methodName the name of the non-deprecated method version
577    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
578    *     declared.
579    */
580   @InterfaceAudience.Private
581   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
582       final String methodName, final Class<?>... parameterTypes) {
583     boolean useLegacy;
584     // Use reflection to see if they implement the non-deprecated version
585     try {
586       clazz.getDeclaredMethod(methodName, parameterTypes);
587       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
588           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
589       useLegacy = false;
590     } catch (NoSuchMethodException exception) {
591       useLegacy = true;
592     } catch (SecurityException exception) {
593       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
594           "' requires legacy support; assuming it does. If you get later errors about legacy " +
595           "coprocessor use, consider updating your security policy to allow access to the package" +
596           " and declared members of your implementation.");
597       LOG.debug("Details of Security Manager rejection.", exception);
598       useLegacy = true;
599     }
600     return useLegacy;
601   }
602 
603   /**
604    * Used to limit legacy handling to once per Coprocessor class per classloader.
605    */
606   private static final Set<Class<? extends Coprocessor>> legacyWarning =
607       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
608           new Comparator<Class<? extends Coprocessor>>() {
609             @Override
610             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
611               if (c1.equals(c2)) {
612                 return 0;
613               }
614               return c1.getName().compareTo(c2.getName());
615             }
616           });
617 
618   /**
619    * limits the amount of logging to once per coprocessor class.
620    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
621    * prevents properly supporting the legacy version of a coprocessor API.
622    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
623    */
624   @InterfaceAudience.Private
625   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
626     if(legacyWarning.add(clazz)) {
627       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
628           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
629           "'. Details of the problem: " + message);
630     }
631   }
632 }