View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableInterface;
50  import org.apache.hadoop.hbase.client.HTableWrapper;
51  import org.apache.hadoop.hbase.client.Table;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
54  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
55  import org.apache.hadoop.hbase.util.VersionInfo;
56  
57  /**
58   * Provides the common setup framework and runtime services for coprocessor
59   * invocation from HBase services.
60   * @param <E> the specific environment extension that a concrete implementation
61   * provides
62   */
63  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
64  @InterfaceStability.Evolving
65  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
  /** Configuration key under which region coprocessor class names are listed. */
  public static final String REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.region.classes";
  /** Configuration key under which region server coprocessor class names are listed. */
  public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.regionserver.classes";
  /** Configuration key under which user region coprocessor class names are listed. */
  public static final String USER_REGION_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.user.region.classes";
  /** Configuration key under which master coprocessor class names are listed. */
  public static final String MASTER_COPROCESSOR_CONF_KEY =
      "hbase.coprocessor.master.classes";
  /** Configuration key under which WAL coprocessor class names are listed. */
  public static final String WAL_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.wal.classes";
  /**
   * Configuration key read by {@code handleCoprocessorThrowable}: when true, a
   * non-IOException thrown by a coprocessor aborts the server instead of
   * unloading the coprocessor.
   */
  public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
  /** Value used for {@link #ABORT_ON_ERROR_KEY} when the configuration does not set it. */
  public static final boolean DEFAULT_ABORT_ON_ERROR = true;

  private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
  /** Target of {@code abortServer}; may be null, in which case aborts are only logged. */
  protected Abortable abortable;
  /**
   * Loaded coprocessor environments, kept sorted by
   * {@link EnvironmentPriorityComparator} (priority first, then load sequence).
   */
  protected SortedSet<E> coprocessors =
      new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
  /** Host configuration; not assigned anywhere in this class. */
  protected Configuration conf;
  // unique file prefix to use for local copies of jars when classloading
  protected String pathPrefix;
  /** Source of the load sequence numbers handed to {@code createEnvironment}. */
  protected AtomicInteger loadSequence = new AtomicInteger();
88  
89    public CoprocessorHost(Abortable abortable) {
90      this.abortable = abortable;
91      this.pathPrefix = UUID.randomUUID().toString();
92    }
93  
94    /**
95     * Not to be confused with the per-object _coprocessors_ (above),
96     * coprocessorNames is static and stores the set of all coprocessors ever
97     * loaded by any thread in this JVM. It is strictly additive: coprocessors are
98     * added to coprocessorNames, by loadInstance() but are never removed, since
99     * the intention is to preserve a history of all loaded coprocessors for
100    * diagnosis in case of server crash (HBASE-4014).
101    */
102   private static Set<String> coprocessorNames =
103       Collections.synchronizedSet(new HashSet<String>());
104   public static Set<String> getLoadedCoprocessors() {
105       return coprocessorNames;
106   }
107 
108   /**
109    * Used to create a parameter to the HServerLoad constructor so that
110    * HServerLoad can provide information about the coprocessors loaded by this
111    * regionserver.
112    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
113    * to master).
114    */
115   public Set<String> getCoprocessors() {
116     Set<String> returnValue = new TreeSet<String>();
117     for(CoprocessorEnvironment e: coprocessors) {
118       returnValue.add(e.getInstance().getClass().getSimpleName());
119     }
120     return returnValue;
121   }
122 
123   /**
124    * Load system coprocessors. Read the class names from configuration.
125    * Called by constructor.
126    */
127   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
128     Class<?> implClass = null;
129 
130     // load default coprocessors from configure file
131     String[] defaultCPClasses = conf.getStrings(confKey);
132     if (defaultCPClasses == null || defaultCPClasses.length == 0)
133       return;
134 
135     int priority = Coprocessor.PRIORITY_SYSTEM;
136     List<E> configured = new ArrayList<E>();
137     for (String className : defaultCPClasses) {
138       className = className.trim();
139       if (findCoprocessor(className) != null) {
140         continue;
141       }
142       ClassLoader cl = this.getClass().getClassLoader();
143       Thread.currentThread().setContextClassLoader(cl);
144       try {
145         implClass = cl.loadClass(className);
146         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
147         LOG.info("System coprocessor " + className + " was loaded " +
148             "successfully with priority (" + priority++ + ").");
149       } catch (Throwable t) {
150         // We always abort if system coprocessors cannot be loaded
151         abortServer(className, t);
152       }
153     }
154 
155     // add entire set to the collection for COW efficiency
156     coprocessors.addAll(configured);
157   }
158 
159   /**
160    * Load a coprocessor implementation into the host
161    * @param path path to implementation jar
162    * @param className the main class name
163    * @param priority chaining priority
164    * @param conf configuration for coprocessor
165    * @throws java.io.IOException Exception
166    */
167   public E load(Path path, String className, int priority,
168       Configuration conf) throws IOException {
169     Class<?> implClass = null;
170     LOG.debug("Loading coprocessor class " + className + " with path " +
171         path + " and priority " + priority);
172 
173     ClassLoader cl = null;
174     if (path == null) {
175       try {
176         implClass = getClass().getClassLoader().loadClass(className);
177       } catch (ClassNotFoundException e) {
178         throw new IOException("No jar path specified for " + className);
179       }
180     } else {
181       cl = CoprocessorClassLoader.getClassLoader(
182         path, getClass().getClassLoader(), pathPrefix, conf);
183       try {
184         implClass = cl.loadClass(className);
185       } catch (ClassNotFoundException e) {
186         throw new IOException("Cannot load external coprocessor class " + className, e);
187       }
188     }
189 
190     //load custom code for coprocessor
191     Thread currentThread = Thread.currentThread();
192     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
193     try{
194       // switch temporarily to the thread classloader for custom CP
195       currentThread.setContextClassLoader(cl);
196       E cpInstance = loadInstance(implClass, priority, conf);
197       return cpInstance;
198     } finally {
199       // restore the fresh (host) classloader
200       currentThread.setContextClassLoader(hostClassLoader);
201     }
202   }
203 
204   /**
205    * @param implClass Implementation class
206    * @param priority priority
207    * @param conf configuration
208    * @throws java.io.IOException Exception
209    */
210   public void load(Class<?> implClass, int priority, Configuration conf)
211       throws IOException {
212     E env = loadInstance(implClass, priority, conf);
213     coprocessors.add(env);
214   }
215 
216   /**
217    * @param implClass Implementation class
218    * @param priority priority
219    * @param conf configuration
220    * @throws java.io.IOException Exception
221    */
222   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
223       throws IOException {
224     if (!Coprocessor.class.isAssignableFrom(implClass)) {
225       throw new IOException("Configured class " + implClass.getName() + " must implement "
226           + Coprocessor.class.getName() + " interface ");
227     }
228 
229     // create the instance
230     Coprocessor impl;
231     Object o = null;
232     try {
233       o = implClass.newInstance();
234       impl = (Coprocessor)o;
235     } catch (InstantiationException e) {
236       throw new IOException(e);
237     } catch (IllegalAccessException e) {
238       throw new IOException(e);
239     }
240     // create the environment
241     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
242     if (env instanceof Environment) {
243       ((Environment)env).startup();
244     }
245     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
246     // if server (master or regionserver) aborts.
247     coprocessorNames.add(implClass.getName());
248     return env;
249   }
250 
  /**
   * Called when a new Coprocessor class is loaded: {@code loadInstance}
   * invokes this once per new instance to obtain the environment that wraps it.
   * @param implClass the coprocessor class that was instantiated
   * @param instance the newly created coprocessor instance
   * @param priority chaining priority requested for the coprocessor
   * @param sequence load sequence number taken from this host's counter
   * @param conf configuration passed to the load call
   * @return the environment for {@code instance}; {@code loadInstance} starts
   *     it when it is an {@link Environment}
   */
  public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
      int priority, int sequence, Configuration conf);
256 
257   public void shutdown(CoprocessorEnvironment e) {
258     if (e instanceof Environment) {
259       if (LOG.isDebugEnabled()) {
260         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
261       }
262       ((Environment)e).shutdown();
263     } else {
264       LOG.warn("Shutdown called on unknown environment: "+
265           e.getClass().getName());
266     }
267   }
268 
269   /**
270    * Find a coprocessor implementation by class name
271    * @param className the class name
272    * @return the coprocessor, or null if not found
273    */
274   public Coprocessor findCoprocessor(String className) {
275     for (E env: coprocessors) {
276       if (env.getInstance().getClass().getName().equals(className) ||
277           env.getInstance().getClass().getSimpleName().equals(className)) {
278         return env.getInstance();
279       }
280     }
281     return null;
282   }
283 
284   /**
285    * Find list of coprocessors that extend/implement the given class/interface
286    * @param cls the class/interface to look for
287    * @return the list of coprocessors, or null if not found
288    */
289   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
290     ArrayList<T> ret = new ArrayList<T>();
291 
292     for (E env: coprocessors) {
293       Coprocessor cp = env.getInstance();
294 
295       if(cp != null) {
296         if (cls.isAssignableFrom(cp.getClass())) {
297           ret.add((T)cp);
298         }
299       }
300     }
301     return ret;
302   }
303 
304   /**
305    * Find a coprocessor environment by class name
306    * @param className the class name
307    * @return the coprocessor, or null if not found
308    */
309   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
310     for (E env: coprocessors) {
311       if (env.getInstance().getClass().getName().equals(className) ||
312           env.getInstance().getClass().getSimpleName().equals(className)) {
313         return env;
314       }
315     }
316     return null;
317   }
318 
319   /**
320    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
321    * jar files.
322    * @return A set of ClassLoader instances
323    */
324   Set<ClassLoader> getExternalClassLoaders() {
325     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
326     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
327     for (E env : coprocessors) {
328       ClassLoader cl = env.getInstance().getClass().getClassLoader();
329       if (cl != systemClassLoader ){
330         //do not include system classloader
331         externalClassLoaders.add(cl);
332       }
333     }
334     return externalClassLoaders;
335   }
336 
337   /**
338    * Environment priority comparator.
339    * Coprocessors are chained in sorted order.
340    */
341   static class EnvironmentPriorityComparator
342       implements Comparator<CoprocessorEnvironment> {
343     public int compare(final CoprocessorEnvironment env1,
344         final CoprocessorEnvironment env2) {
345       if (env1.getPriority() < env2.getPriority()) {
346         return -1;
347       } else if (env1.getPriority() > env2.getPriority()) {
348         return 1;
349       }
350       if (env1.getLoadSequence() < env2.getLoadSequence()) {
351         return -1;
352       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
353         return 1;
354       }
355       return 0;
356     }
357   }
358 
359   /**
360    * Encapsulation of the environment of each coprocessor
361    */
362   public static class Environment implements CoprocessorEnvironment {
363 
364     /** The coprocessor */
365     public Coprocessor impl;
366     /** Chaining priority */
367     protected int priority = Coprocessor.PRIORITY_USER;
368     /** Current coprocessor state */
369     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
370     /** Accounting for tables opened by the coprocessor */
371     protected List<HTableInterface> openTables =
372       Collections.synchronizedList(new ArrayList<HTableInterface>());
373     private int seq;
374     private Configuration conf;
375     private ClassLoader classLoader;
376 
377     /**
378      * Constructor
379      * @param impl the coprocessor instance
380      * @param priority chaining priority
381      */
382     public Environment(final Coprocessor impl, final int priority,
383         final int seq, final Configuration conf) {
384       this.impl = impl;
385       this.classLoader = impl.getClass().getClassLoader();
386       this.priority = priority;
387       this.state = Coprocessor.State.INSTALLED;
388       this.seq = seq;
389       this.conf = conf;
390     }
391 
392     /** Initialize the environment */
393     public void startup() throws IOException {
394       if (state == Coprocessor.State.INSTALLED ||
395           state == Coprocessor.State.STOPPED) {
396         state = Coprocessor.State.STARTING;
397         Thread currentThread = Thread.currentThread();
398         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
399         try {
400           currentThread.setContextClassLoader(this.getClassLoader());
401           impl.start(this);
402           state = Coprocessor.State.ACTIVE;
403         } finally {
404           currentThread.setContextClassLoader(hostClassLoader);
405         }
406       } else {
407         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
408             " because not inactive (state="+state.toString()+")");
409       }
410     }
411 
412     /** Clean up the environment */
413     protected void shutdown() {
414       if (state == Coprocessor.State.ACTIVE) {
415         state = Coprocessor.State.STOPPING;
416         Thread currentThread = Thread.currentThread();
417         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
418         try {
419           currentThread.setContextClassLoader(this.getClassLoader());
420           impl.stop(this);
421           state = Coprocessor.State.STOPPED;
422         } catch (IOException ioe) {
423           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
424         } finally {
425           currentThread.setContextClassLoader(hostClassLoader);
426         }
427       } else {
428         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
429             " because not active (state="+state.toString()+")");
430       }
431       // clean up any table references
432       for (HTableInterface table: openTables) {
433         try {
434           ((HTableWrapper)table).internalClose();
435         } catch (IOException e) {
436           // nothing can be done here
437           LOG.warn("Failed to close " +
438               Bytes.toStringBinary(table.getTableName()), e);
439         }
440       }
441     }
442 
443     @Override
444     public Coprocessor getInstance() {
445       return impl;
446     }
447 
448     @Override
449     public ClassLoader getClassLoader() {
450       return classLoader;
451     }
452 
453     @Override
454     public int getPriority() {
455       return priority;
456     }
457 
458     @Override
459     public int getLoadSequence() {
460       return seq;
461     }
462 
463     /** @return the coprocessor environment version */
464     @Override
465     public int getVersion() {
466       return Coprocessor.VERSION;
467     }
468 
469     /** @return the HBase release */
470     @Override
471     public String getHBaseVersion() {
472       return VersionInfo.getVersion();
473     }
474 
475     @Override
476     public Configuration getConfiguration() {
477       return conf;
478     }
479 
480     /**
481      * Open a table from within the Coprocessor environment
482      * @param tableName the table name
483      * @return an interface for manipulating the table
484      * @exception java.io.IOException Exception
485      */
486     @Override
487     public HTableInterface getTable(TableName tableName) throws IOException {
488       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
489     }
490 
491     /**
492      * Open a table from within the Coprocessor environment
493      * @param tableName the table name
494      * @return an interface for manipulating the table
495      * @exception java.io.IOException Exception
496      */
497     @Override
498     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
499       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
500     }
501   }
502 
503   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
504     abortServer(environment.getInstance().getClass().getName(), e);
505   }
506 
507   protected void abortServer(final String coprocessorName, final Throwable e) {
508     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
509     LOG.error(message, e);
510     if (abortable != null) {
511       abortable.abort(message, e);
512     } else {
513       LOG.warn("No available Abortable, process was not aborted");
514     }
515   }
516 
517   /**
518    * This is used by coprocessor hooks which are declared to throw IOException
519    * (or its subtypes). For such hooks, we should handle throwable objects
520    * depending on the Throwable's type. Those which are instances of
521    * IOException should be passed on to the client. This is in conformance with
522    * the HBase idiom regarding IOException: that it represents a circumstance
523    * that should be passed along to the client for its own handling. For
524    * example, a coprocessor that implements access controls would throw a
525    * subclass of IOException, such as AccessDeniedException, in its preGet()
526    * method to prevent an unauthorized client's performing a Get on a particular
527    * table.
528    * @param env Coprocessor Environment
529    * @param e Throwable object thrown by coprocessor.
530    * @exception IOException Exception
531    */
532   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
533       throws IOException {
534     if (e instanceof IOException) {
535       throw (IOException)e;
536     }
537     // If we got here, e is not an IOException. A loaded coprocessor has a
538     // fatal bug, and the server (master or regionserver) should remove the
539     // faulty coprocessor from its set of active coprocessors. Setting
540     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
541     // which may be useful in development and testing environments where
542     // 'failing fast' for error analysis is desired.
543     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
544       // server is configured to abort.
545       abortServer(env, e);
546     } else {
547       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
548           "environment because it threw:  " + e,e);
549       coprocessors.remove(env);
550       try {
551         shutdown(env);
552       } catch (Exception x) {
553         LOG.error("Uncaught exception when shutting down coprocessor '"
554             + env.toString() + "'", x);
555       }
556       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
557           "' threw: '" + e + "' and has been removed from the active " +
558           "coprocessor set.", e);
559     }
560   }
561 
562   /**
563    * Used to gracefully handle fallback to deprecated methods when we
564    * evolve coprocessor APIs.
565    *
566    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
567    * to the deprecated API by using this method to determine if an instance implements the new API.
568    * In the event that said support is partial, then in the face of a runtime issue that prevents
569    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
570    *
571    * For examples of this in action, see the implementation of
572    * <ul>
573    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
574    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
575    * </ul>
576    *
577    * @param clazz Coprocessor you wish to evaluate
578    * @param methodName the name of the non-deprecated method version
579    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
580    *     declared.
581    */
582   @InterfaceAudience.Private
583   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
584       final String methodName, final Class<?>... parameterTypes) {
585     boolean useLegacy;
586     // Use reflection to see if they implement the non-deprecated version
587     try {
588       clazz.getDeclaredMethod(methodName, parameterTypes);
589       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
590           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
591       useLegacy = false;
592     } catch (NoSuchMethodException exception) {
593       useLegacy = true;
594     } catch (SecurityException exception) {
595       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
596           "' requires legacy support; assuming it does. If you get later errors about legacy " +
597           "coprocessor use, consider updating your security policy to allow access to the package" +
598           " and declared members of your implementation.");
599       LOG.debug("Details of Security Manager rejection.", exception);
600       useLegacy = true;
601     }
602     return useLegacy;
603   }
604 
605   /**
606    * Used to limit legacy handling to once per Coprocessor class per classloader.
607    */
608   private static final Set<Class<? extends Coprocessor>> legacyWarning =
609       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
610           new Comparator<Class<? extends Coprocessor>>() {
611             @Override
612             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
613               if (c1.equals(c2)) {
614                 return 0;
615               }
616               return c1.getName().compareTo(c2.getName());
617             }
618           });
619 
620   /**
621    * limits the amount of logging to once per coprocessor class.
622    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
623    * prevents properly supporting the legacy version of a coprocessor API.
624    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
625    */
626   @InterfaceAudience.Private
627   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
628     if(legacyWarning.add(clazz)) {
629       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
630           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
631           "'. Details of the problem: " + message);
632     }
633   }
634 }