View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableInterface;
50  import org.apache.hadoop.hbase.client.HTableWrapper;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
53  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
54  import org.apache.hadoop.hbase.util.VersionInfo;
55  
56  /**
57   * Provides the common setup framework and runtime services for coprocessor
58   * invocation from HBase services.
59   * @param <E> the specific environment extension that a concrete implementation
60   * provides
61   */
62  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
63  @InterfaceStability.Evolving
64  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
// Configuration keys whose values list coprocessor class names. None of them is read
// directly in this class; presumably host subclasses pass the matching key as the
// confKey argument of loadSystemCoprocessors() -- verify against callers.
public static final String REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.region.classes";
public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.regionserver.classes";
public static final String USER_REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.user.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.master.classes";
public static final String WAL_COPROCESSOR_CONF_KEY =
  "hbase.coprocessor.wal.classes";
/**
 * Boolean configuration key consulted by handleCoprocessorThrowable(): when true a
 * non-IOException thrown by a coprocessor leads to abortServer(), otherwise the
 * coprocessor is removed from the active set.
 */
public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
/** Value used for {@link #ABORT_ON_ERROR_KEY} when the key is not configured. */
public static final boolean DEFAULT_ABORT_ON_ERROR = true;
/**
 * Boolean configuration key consulted by loadSystemCoprocessors(): when false no
 * system coprocessor is loaded.
 */
public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
/** Value used for {@link #COPROCESSORS_ENABLED_CONF_KEY} when the key is not configured. */
public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
// Not read in this class; presumably gates loading of user (table-level) coprocessors
// in a subclass -- TODO confirm.
public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
  "hbase.coprocessor.user.enabled";
public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;

private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
/** Target of abortServer(); may be null, in which case aborting only logs a warning. */
protected Abortable abortable;
/**
 * Ordered set of loaded coprocessors with lock. Ordering is defined by
 * {@link EnvironmentPriorityComparator} (priority first, then load sequence).
 */
protected SortedSet<E> coprocessors =
    new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
// Never assigned in this class; presumably set by subclasses -- verify.
protected Configuration conf;
// unique file prefix to use for local copies of jars when classloading
// (a random UUID assigned in the constructor and handed to CoprocessorClassLoader)
protected String pathPrefix;
// Source of the per-environment load sequence number; incremented by loadInstance().
protected AtomicInteger loadSequence = new AtomicInteger();
92  
93    public CoprocessorHost(Abortable abortable) {
94      this.abortable = abortable;
95      this.pathPrefix = UUID.randomUUID().toString();
96    }
97  
/**
 * Not to be confused with the per-object _coprocessors_ (above):
 * coprocessorNames is static and stores the class names of all coprocessors ever
 * loaded by any thread in this JVM. It is strictly additive: names are
 * added to coprocessorNames by loadInstance() but are never removed, since
 * the intention is to preserve a history of all loaded coprocessors for
 * diagnosis in case of server crash (HBASE-4014).
 * The backing HashSet is wrapped by Collections.synchronizedSet(); readers that
 * iterate or copy it hold the set's own monitor (see getLoadedCoprocessors()).
 */
private static Set<String> coprocessorNames =
    Collections.synchronizedSet(new HashSet<String>());
108 
109   public static Set<String> getLoadedCoprocessors() {
110     synchronized (coprocessorNames) {
111       return new HashSet(coprocessorNames);
112     }
113   }
114 
115   /**
116    * Used to create a parameter to the HServerLoad constructor so that
117    * HServerLoad can provide information about the coprocessors loaded by this
118    * regionserver.
119    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
120    * to master).
121    */
122   public Set<String> getCoprocessors() {
123     Set<String> returnValue = new TreeSet<String>();
124     for (CoprocessorEnvironment e: coprocessors) {
125       returnValue.add(e.getInstance().getClass().getSimpleName());
126     }
127     return returnValue;
128   }
129 
130   /**
131    * Load system coprocessors once only. Read the class names from configuration.
132    * Called by constructor.
133    */
134   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
135     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
136       DEFAULT_COPROCESSORS_ENABLED);
137     if (!coprocessorsEnabled) {
138       return;
139     }
140 
141     Class<?> implClass = null;
142 
143     // load default coprocessors from configure file
144     String[] defaultCPClasses = conf.getStrings(confKey);
145     if (defaultCPClasses == null || defaultCPClasses.length == 0)
146       return;
147 
148     int priority = Coprocessor.PRIORITY_SYSTEM;
149     for (String className : defaultCPClasses) {
150       className = className.trim();
151       if (findCoprocessor(className) != null) {
152         // If already loaded will just continue
153         LOG.warn("Attempted duplicate loading of " + className + "; skipped");
154         continue;
155       }
156       ClassLoader cl = this.getClass().getClassLoader();
157       Thread.currentThread().setContextClassLoader(cl);
158       try {
159         implClass = cl.loadClass(className);
160         // Add coprocessors as we go to guard against case where a coprocessor is specified twice
161         // in the configuration
162         this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
163         LOG.info("System coprocessor " + className + " was loaded " +
164             "successfully with priority (" + priority++ + ").");
165       } catch (Throwable t) {
166         // We always abort if system coprocessors cannot be loaded
167         abortServer(className, t);
168       }
169     }
170   }
171 
172   /**
173    * Load a coprocessor implementation into the host
174    * @param path path to implementation jar
175    * @param className the main class name
176    * @param priority chaining priority
177    * @param conf configuration for coprocessor
178    * @throws java.io.IOException Exception
179    */
180   public E load(Path path, String className, int priority,
181       Configuration conf) throws IOException {
182     Class<?> implClass = null;
183     LOG.debug("Loading coprocessor class " + className + " with path " +
184         path + " and priority " + priority);
185 
186     ClassLoader cl = null;
187     if (path == null) {
188       try {
189         implClass = getClass().getClassLoader().loadClass(className);
190       } catch (ClassNotFoundException e) {
191         throw new IOException("No jar path specified for " + className);
192       }
193     } else {
194       cl = CoprocessorClassLoader.getClassLoader(
195         path, getClass().getClassLoader(), pathPrefix, conf);
196       try {
197         implClass = cl.loadClass(className);
198       } catch (ClassNotFoundException e) {
199         throw new IOException("Cannot load external coprocessor class " + className, e);
200       }
201     }
202 
203     //load custom code for coprocessor
204     Thread currentThread = Thread.currentThread();
205     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
206     try{
207       // switch temporarily to the thread classloader for custom CP
208       currentThread.setContextClassLoader(cl);
209       E cpInstance = loadInstance(implClass, priority, conf);
210       return cpInstance;
211     } finally {
212       // restore the fresh (host) classloader
213       currentThread.setContextClassLoader(hostClassLoader);
214     }
215   }
216 
217   /**
218    * @param implClass Implementation class
219    * @param priority priority
220    * @param conf configuration
221    * @throws java.io.IOException Exception
222    */
223   public void load(Class<?> implClass, int priority, Configuration conf)
224       throws IOException {
225     E env = loadInstance(implClass, priority, conf);
226     coprocessors.add(env);
227   }
228 
229   /**
230    * @param implClass Implementation class
231    * @param priority priority
232    * @param conf configuration
233    * @throws java.io.IOException Exception
234    */
235   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
236       throws IOException {
237     if (!Coprocessor.class.isAssignableFrom(implClass)) {
238       throw new IOException("Configured class " + implClass.getName() + " must implement "
239           + Coprocessor.class.getName() + " interface ");
240     }
241 
242     // create the instance
243     Coprocessor impl;
244     Object o = null;
245     try {
246       o = implClass.newInstance();
247       impl = (Coprocessor)o;
248     } catch (InstantiationException e) {
249       throw new IOException(e);
250     } catch (IllegalAccessException e) {
251       throw new IOException(e);
252     }
253     // create the environment
254     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
255     if (env instanceof Environment) {
256       ((Environment)env).startup();
257     }
258     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
259     // if server (master or regionserver) aborts.
260     coprocessorNames.add(implClass.getName());
261     return env;
262   }
263 
/**
 * Called when a new Coprocessor class is loaded. loadInstance() invokes this with the
 * freshly created instance and expects back the environment that wraps it.
 * @param implClass the coprocessor's class
 * @param instance the newly created coprocessor instance
 * @param priority chaining priority requested for the coprocessor
 * @param sequence load sequence number assigned by the host
 * @param conf configuration for the coprocessor
 * @return the environment for the coprocessor
 */
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
    int priority, int sequence, Configuration conf);
269 
270   public void shutdown(CoprocessorEnvironment e) {
271     if (e instanceof Environment) {
272       if (LOG.isDebugEnabled()) {
273         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
274       }
275       ((Environment)e).shutdown();
276     } else {
277       LOG.warn("Shutdown called on unknown environment: "+
278           e.getClass().getName());
279     }
280   }
281 
282   /**
283    * Find a coprocessor implementation by class name
284    * @param className the class name
285    * @return the coprocessor, or null if not found
286    */
287   public Coprocessor findCoprocessor(String className) {
288     for (E env: coprocessors) {
289       if (env.getInstance().getClass().getName().equals(className) ||
290           env.getInstance().getClass().getSimpleName().equals(className)) {
291         return env.getInstance();
292       }
293     }
294     return null;
295   }
296 
297   /**
298    * Find list of coprocessors that extend/implement the given class/interface
299    * @param cls the class/interface to look for
300    * @return the list of coprocessors, or null if not found
301    */
302   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
303     ArrayList<T> ret = new ArrayList<T>();
304 
305     for (E env: coprocessors) {
306       Coprocessor cp = env.getInstance();
307 
308       if(cp != null) {
309         if (cls.isAssignableFrom(cp.getClass())) {
310           ret.add((T)cp);
311         }
312       }
313     }
314     return ret;
315   }
316 
317   /**
318    * Find a coprocessor environment by class name
319    * @param className the class name
320    * @return the coprocessor, or null if not found
321    */
322   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
323     for (E env: coprocessors) {
324       if (env.getInstance().getClass().getName().equals(className) ||
325           env.getInstance().getClass().getSimpleName().equals(className)) {
326         return env;
327       }
328     }
329     return null;
330   }
331 
332   /**
333    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
334    * jar files.
335    * @return A set of ClassLoader instances
336    */
337   Set<ClassLoader> getExternalClassLoaders() {
338     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
339     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
340     for (E env : coprocessors) {
341       ClassLoader cl = env.getInstance().getClass().getClassLoader();
342       if (cl != systemClassLoader ){
343         //do not include system classloader
344         externalClassLoaders.add(cl);
345       }
346     }
347     return externalClassLoaders;
348   }
349 
350   /**
351    * Environment priority comparator.
352    * Coprocessors are chained in sorted order.
353    */
354   static class EnvironmentPriorityComparator
355       implements Comparator<CoprocessorEnvironment> {
356     @Override
357     public int compare(final CoprocessorEnvironment env1,
358         final CoprocessorEnvironment env2) {
359       if (env1.getPriority() < env2.getPriority()) {
360         return -1;
361       } else if (env1.getPriority() > env2.getPriority()) {
362         return 1;
363       }
364       if (env1.getLoadSequence() < env2.getLoadSequence()) {
365         return -1;
366       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
367         return 1;
368       }
369       return 0;
370     }
371   }
372 
373   /**
374    * Encapsulation of the environment of each coprocessor
375    */
376   public static class Environment implements CoprocessorEnvironment {
377 
378     /** The coprocessor */
379     public Coprocessor impl;
380     /** Chaining priority */
381     protected int priority = Coprocessor.PRIORITY_USER;
382     /** Current coprocessor state */
383     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
384     /** Accounting for tables opened by the coprocessor */
385     protected List<HTableInterface> openTables =
386       Collections.synchronizedList(new ArrayList<HTableInterface>());
387     private int seq;
388     private Configuration conf;
389     private ClassLoader classLoader;
390 
391     /**
392      * Constructor
393      * @param impl the coprocessor instance
394      * @param priority chaining priority
395      */
396     public Environment(final Coprocessor impl, final int priority,
397         final int seq, final Configuration conf) {
398       this.impl = impl;
399       this.classLoader = impl.getClass().getClassLoader();
400       this.priority = priority;
401       this.state = Coprocessor.State.INSTALLED;
402       this.seq = seq;
403       this.conf = conf;
404     }
405 
406     /** Initialize the environment */
407     public void startup() throws IOException {
408       if (state == Coprocessor.State.INSTALLED ||
409           state == Coprocessor.State.STOPPED) {
410         state = Coprocessor.State.STARTING;
411         Thread currentThread = Thread.currentThread();
412         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
413         try {
414           currentThread.setContextClassLoader(this.getClassLoader());
415           impl.start(this);
416           state = Coprocessor.State.ACTIVE;
417         } finally {
418           currentThread.setContextClassLoader(hostClassLoader);
419         }
420       } else {
421         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
422             " because not inactive (state="+state.toString()+")");
423       }
424     }
425 
426     /** Clean up the environment */
427     protected void shutdown() {
428       if (state == Coprocessor.State.ACTIVE) {
429         state = Coprocessor.State.STOPPING;
430         Thread currentThread = Thread.currentThread();
431         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
432         try {
433           currentThread.setContextClassLoader(this.getClassLoader());
434           impl.stop(this);
435           state = Coprocessor.State.STOPPED;
436         } catch (IOException ioe) {
437           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
438         } finally {
439           currentThread.setContextClassLoader(hostClassLoader);
440         }
441       } else {
442         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
443             " because not active (state="+state.toString()+")");
444       }
445       synchronized (openTables) {
446         // clean up any table references
447         for (HTableInterface table: openTables) {
448           try {
449             ((HTableWrapper)table).internalClose();
450           } catch (IOException e) {
451             // nothing can be done here
452             LOG.warn("Failed to close " +
453                 Bytes.toStringBinary(table.getTableName()), e);
454           }
455         }
456       }
457     }
458 
459     @Override
460     public Coprocessor getInstance() {
461       return impl;
462     }
463 
464     @Override
465     public ClassLoader getClassLoader() {
466       return classLoader;
467     }
468 
469     @Override
470     public int getPriority() {
471       return priority;
472     }
473 
474     @Override
475     public int getLoadSequence() {
476       return seq;
477     }
478 
479     /** @return the coprocessor environment version */
480     @Override
481     public int getVersion() {
482       return Coprocessor.VERSION;
483     }
484 
485     /** @return the HBase release */
486     @Override
487     public String getHBaseVersion() {
488       return VersionInfo.getVersion();
489     }
490 
491     @Override
492     public Configuration getConfiguration() {
493       return conf;
494     }
495 
496     /**
497      * Open a table from within the Coprocessor environment
498      * @param tableName the table name
499      * @return an interface for manipulating the table
500      * @exception java.io.IOException Exception
501      */
502     @Override
503     public HTableInterface getTable(TableName tableName) throws IOException {
504       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
505     }
506 
507     /**
508      * Open a table from within the Coprocessor environment
509      * @param tableName the table name
510      * @return an interface for manipulating the table
511      * @exception java.io.IOException Exception
512      */
513     @Override
514     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
515       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
516     }
517   }
518 
519   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
520     abortServer(environment.getInstance().getClass().getName(), e);
521   }
522 
523   protected void abortServer(final String coprocessorName, final Throwable e) {
524     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
525     LOG.error(message, e);
526     if (abortable != null) {
527       abortable.abort(message, e);
528     } else {
529       LOG.warn("No available Abortable, process was not aborted");
530     }
531   }
532 
533   /**
534    * This is used by coprocessor hooks which are declared to throw IOException
535    * (or its subtypes). For such hooks, we should handle throwable objects
536    * depending on the Throwable's type. Those which are instances of
537    * IOException should be passed on to the client. This is in conformance with
538    * the HBase idiom regarding IOException: that it represents a circumstance
539    * that should be passed along to the client for its own handling. For
540    * example, a coprocessor that implements access controls would throw a
541    * subclass of IOException, such as AccessDeniedException, in its preGet()
542    * method to prevent an unauthorized client's performing a Get on a particular
543    * table.
544    * @param env Coprocessor Environment
545    * @param e Throwable object thrown by coprocessor.
546    * @exception IOException Exception
547    */
548   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
549       throws IOException {
550     if (e instanceof IOException) {
551       throw (IOException)e;
552     }
553     // If we got here, e is not an IOException. A loaded coprocessor has a
554     // fatal bug, and the server (master or regionserver) should remove the
555     // faulty coprocessor from its set of active coprocessors. Setting
556     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
557     // which may be useful in development and testing environments where
558     // 'failing fast' for error analysis is desired.
559     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
560       // server is configured to abort.
561       abortServer(env, e);
562     } else {
563       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
564           "environment because it threw:  " + e,e);
565       coprocessors.remove(env);
566       try {
567         shutdown(env);
568       } catch (Exception x) {
569         LOG.error("Uncaught exception when shutting down coprocessor '"
570             + env.toString() + "'", x);
571       }
572       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
573           "' threw: '" + e + "' and has been removed from the active " +
574           "coprocessor set.", e);
575     }
576   }
577 
578   /**
579    * Used to gracefully handle fallback to deprecated methods when we
580    * evolve coprocessor APIs.
581    *
582    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
583    * to the deprecated API by using this method to determine if an instance implements the new API.
584    * In the event that said support is partial, then in the face of a runtime issue that prevents
585    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
586    *
587    * For examples of this in action, see the implementation of
588    * <ul>
589    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
590    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
591    * </ul>
592    *
593    * @param clazz Coprocessor you wish to evaluate
594    * @param methodName the name of the non-deprecated method version
595    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
596    *     declared.
597    */
598   @InterfaceAudience.Private
599   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
600       final String methodName, final Class<?>... parameterTypes) {
601     boolean useLegacy;
602     // Use reflection to see if they implement the non-deprecated version
603     try {
604       clazz.getDeclaredMethod(methodName, parameterTypes);
605       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
606           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
607       useLegacy = false;
608     } catch (NoSuchMethodException exception) {
609       useLegacy = true;
610     } catch (SecurityException exception) {
611       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
612           "' requires legacy support; assuming it does. If you get later errors about legacy " +
613           "coprocessor use, consider updating your security policy to allow access to the package" +
614           " and declared members of your implementation.");
615       LOG.debug("Details of Security Manager rejection.", exception);
616       useLegacy = true;
617     }
618     return useLegacy;
619   }
620 
621   /**
622    * Used to limit legacy handling to once per Coprocessor class per classloader.
623    */
624   private static final Set<Class<? extends Coprocessor>> legacyWarning =
625       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
626           new Comparator<Class<? extends Coprocessor>>() {
627             @Override
628             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
629               if (c1.equals(c2)) {
630                 return 0;
631               }
632               return c1.getName().compareTo(c2.getName());
633             }
634           });
635 
636   /**
637    * limits the amount of logging to once per coprocessor class.
638    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
639    * prevents properly supporting the legacy version of a coprocessor API.
640    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
641    */
642   @InterfaceAudience.Private
643   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
644     if(legacyWarning.add(clazz)) {
645       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
646           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
647           "'. Details of the problem: " + message);
648     }
649   }
650 }