View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coprocessor;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.SortedSet;
30  import java.util.TreeSet;
31  import java.util.UUID;
32  import java.util.concurrent.ConcurrentSkipListSet;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.classification.InterfaceStability;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Abortable;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.HTableInterface;
50  import org.apache.hadoop.hbase.client.HTableWrapper;
51  import org.apache.hadoop.hbase.util.Bytes;
52  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
53  import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
54  import org.apache.hadoop.hbase.util.VersionInfo;
55  
56  /**
57   * Provides the common setup framework and runtime services for coprocessor
58   * invocation from HBase services.
59   * @param <E> the specific environment extension that a concrete implementation
60   * provides
61   */
62  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
63  @InterfaceStability.Evolving
64  public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
65    public static final String REGION_COPROCESSOR_CONF_KEY =
66        "hbase.coprocessor.region.classes";
67    public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
68        "hbase.coprocessor.regionserver.classes";
69    public static final String USER_REGION_COPROCESSOR_CONF_KEY =
70        "hbase.coprocessor.user.region.classes";
71    public static final String MASTER_COPROCESSOR_CONF_KEY =
72        "hbase.coprocessor.master.classes";
73    public static final String WAL_COPROCESSOR_CONF_KEY =
74      "hbase.coprocessor.wal.classes";
75    public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
76    public static final boolean DEFAULT_ABORT_ON_ERROR = true;
77  
78    private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
79    protected Abortable abortable;
80    /** Ordered set of loaded coprocessors with lock */
81    protected SortedSet<E> coprocessors =
82        new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
83    protected Configuration conf;
84    // unique file prefix to use for local copies of jars when classloading
85    protected String pathPrefix;
86    protected AtomicInteger loadSequence = new AtomicInteger();
87  
/**
 * Creates a host that reports fatal coprocessor errors to the given {@link Abortable} and
 * picks a random UUID as the prefix for local copies of coprocessor jars.
 * @param abortable receiver of abort requests; may be null, in which case abortServer() only
 *     logs
 */
public CoprocessorHost(Abortable abortable) {
  this.pathPrefix = UUID.randomUUID().toString();
  this.abortable = abortable;
}
92  
93    /**
94     * Not to be confused with the per-object _coprocessors_ (above),
95     * coprocessorNames is static and stores the set of all coprocessors ever
96     * loaded by any thread in this JVM. It is strictly additive: coprocessors are
97     * added to coprocessorNames, by loadInstance() but are never removed, since
98     * the intention is to preserve a history of all loaded coprocessors for
99     * diagnosis in case of server crash (HBASE-4014).
100    */
101   private static Set<String> coprocessorNames =
102       Collections.synchronizedSet(new HashSet<String>());
103   public static Set<String> getLoadedCoprocessors() {
104       return coprocessorNames;
105   }
106 
107   /**
108    * Used to create a parameter to the HServerLoad constructor so that
109    * HServerLoad can provide information about the coprocessors loaded by this
110    * regionserver.
111    * (HBASE-4070: Improve region server metrics to report loaded coprocessors
112    * to master).
113    */
114   public Set<String> getCoprocessors() {
115     Set<String> returnValue = new TreeSet<String>();
116     for(CoprocessorEnvironment e: coprocessors) {
117       returnValue.add(e.getInstance().getClass().getSimpleName());
118     }
119     return returnValue;
120   }
121 
122   /**
123    * Load system coprocessors. Read the class names from configuration.
124    * Called by constructor.
125    */
126   protected void loadSystemCoprocessors(Configuration conf, String confKey) {
127     Class<?> implClass = null;
128 
129     // load default coprocessors from configure file
130     String[] defaultCPClasses = conf.getStrings(confKey);
131     if (defaultCPClasses == null || defaultCPClasses.length == 0)
132       return;
133 
134     int priority = Coprocessor.PRIORITY_SYSTEM;
135     List<E> configured = new ArrayList<E>();
136     for (String className : defaultCPClasses) {
137       className = className.trim();
138       if (findCoprocessor(className) != null) {
139         continue;
140       }
141       ClassLoader cl = this.getClass().getClassLoader();
142       Thread.currentThread().setContextClassLoader(cl);
143       try {
144         implClass = cl.loadClass(className);
145         configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
146         LOG.info("System coprocessor " + className + " was loaded " +
147             "successfully with priority (" + priority++ + ").");
148       } catch (Throwable t) {
149         // We always abort if system coprocessors cannot be loaded
150         abortServer(className, t);
151       }
152     }
153 
154     // add entire set to the collection for COW efficiency
155     coprocessors.addAll(configured);
156   }
157 
158   /**
159    * Load a coprocessor implementation into the host
160    * @param path path to implementation jar
161    * @param className the main class name
162    * @param priority chaining priority
163    * @param conf configuration for coprocessor
164    * @throws java.io.IOException Exception
165    */
166   public E load(Path path, String className, int priority,
167       Configuration conf) throws IOException {
168     Class<?> implClass = null;
169     LOG.debug("Loading coprocessor class " + className + " with path " +
170         path + " and priority " + priority);
171 
172     ClassLoader cl = null;
173     if (path == null) {
174       try {
175         implClass = getClass().getClassLoader().loadClass(className);
176       } catch (ClassNotFoundException e) {
177         throw new IOException("No jar path specified for " + className);
178       }
179     } else {
180       cl = CoprocessorClassLoader.getClassLoader(
181         path, getClass().getClassLoader(), pathPrefix, conf);
182       try {
183         implClass = cl.loadClass(className);
184       } catch (ClassNotFoundException e) {
185         throw new IOException("Cannot load external coprocessor class " + className, e);
186       }
187     }
188 
189     //load custom code for coprocessor
190     Thread currentThread = Thread.currentThread();
191     ClassLoader hostClassLoader = currentThread.getContextClassLoader();
192     try{
193       // switch temporarily to the thread classloader for custom CP
194       currentThread.setContextClassLoader(cl);
195       E cpInstance = loadInstance(implClass, priority, conf);
196       return cpInstance;
197     } finally {
198       // restore the fresh (host) classloader
199       currentThread.setContextClassLoader(hostClassLoader);
200     }
201   }
202 
203   /**
204    * @param implClass Implementation class
205    * @param priority priority
206    * @param conf configuration
207    * @throws java.io.IOException Exception
208    */
209   public void load(Class<?> implClass, int priority, Configuration conf)
210       throws IOException {
211     E env = loadInstance(implClass, priority, conf);
212     coprocessors.add(env);
213   }
214 
215   /**
216    * @param implClass Implementation class
217    * @param priority priority
218    * @param conf configuration
219    * @throws java.io.IOException Exception
220    */
221   public E loadInstance(Class<?> implClass, int priority, Configuration conf)
222       throws IOException {
223     if (!Coprocessor.class.isAssignableFrom(implClass)) {
224       throw new IOException("Configured class " + implClass.getName() + " must implement "
225           + Coprocessor.class.getName() + " interface ");
226     }
227 
228     // create the instance
229     Coprocessor impl;
230     Object o = null;
231     try {
232       o = implClass.newInstance();
233       impl = (Coprocessor)o;
234     } catch (InstantiationException e) {
235       throw new IOException(e);
236     } catch (IllegalAccessException e) {
237       throw new IOException(e);
238     }
239     // create the environment
240     E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
241     if (env instanceof Environment) {
242       ((Environment)env).startup();
243     }
244     // HBASE-4014: maintain list of loaded coprocessors for later crash analysis
245     // if server (master or regionserver) aborts.
246     coprocessorNames.add(implClass.getName());
247     return env;
248   }
249 
250   /**
251    * Called when a new Coprocessor class is loaded
252    */
253   public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
254       int priority, int sequence, Configuration conf);
255 
256   public void shutdown(CoprocessorEnvironment e) {
257     if (e instanceof Environment) {
258       if (LOG.isDebugEnabled()) {
259         LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
260       }
261       ((Environment)e).shutdown();
262     } else {
263       LOG.warn("Shutdown called on unknown environment: "+
264           e.getClass().getName());
265     }
266   }
267 
268   /**
269    * Find a coprocessor implementation by class name
270    * @param className the class name
271    * @return the coprocessor, or null if not found
272    */
273   public Coprocessor findCoprocessor(String className) {
274     for (E env: coprocessors) {
275       if (env.getInstance().getClass().getName().equals(className) ||
276           env.getInstance().getClass().getSimpleName().equals(className)) {
277         return env.getInstance();
278       }
279     }
280     return null;
281   }
282 
283   /**
284    * Find list of coprocessors that extend/implement the given class/interface
285    * @param cls the class/interface to look for
286    * @return the list of coprocessors, or null if not found
287    */
288   public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
289     ArrayList<T> ret = new ArrayList<T>();
290 
291     for (E env: coprocessors) {
292       Coprocessor cp = env.getInstance();
293 
294       if(cp != null) {
295         if (cls.isAssignableFrom(cp.getClass())) {
296           ret.add((T)cp);
297         }
298       }
299     }
300     return ret;
301   }
302 
303   /**
304    * Find a coprocessor environment by class name
305    * @param className the class name
306    * @return the coprocessor, or null if not found
307    */
308   public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
309     for (E env: coprocessors) {
310       if (env.getInstance().getClass().getName().equals(className) ||
311           env.getInstance().getClass().getSimpleName().equals(className)) {
312         return env;
313       }
314     }
315     return null;
316   }
317 
318   /**
319    * Retrieves the set of classloaders used to instantiate Coprocessor classes defined in external
320    * jar files.
321    * @return A set of ClassLoader instances
322    */
323   Set<ClassLoader> getExternalClassLoaders() {
324     Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
325     final ClassLoader systemClassLoader = this.getClass().getClassLoader();
326     for (E env : coprocessors) {
327       ClassLoader cl = env.getInstance().getClass().getClassLoader();
328       if (cl != systemClassLoader ){
329         //do not include system classloader
330         externalClassLoaders.add(cl);
331       }
332     }
333     return externalClassLoaders;
334   }
335 
336   /**
337    * Environment priority comparator.
338    * Coprocessors are chained in sorted order.
339    */
340   static class EnvironmentPriorityComparator
341       implements Comparator<CoprocessorEnvironment> {
342     public int compare(final CoprocessorEnvironment env1,
343         final CoprocessorEnvironment env2) {
344       if (env1.getPriority() < env2.getPriority()) {
345         return -1;
346       } else if (env1.getPriority() > env2.getPriority()) {
347         return 1;
348       }
349       if (env1.getLoadSequence() < env2.getLoadSequence()) {
350         return -1;
351       } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
352         return 1;
353       }
354       return 0;
355     }
356   }
357 
358   /**
359    * Encapsulation of the environment of each coprocessor
360    */
361   public static class Environment implements CoprocessorEnvironment {
362 
363     /** The coprocessor */
364     public Coprocessor impl;
365     /** Chaining priority */
366     protected int priority = Coprocessor.PRIORITY_USER;
367     /** Current coprocessor state */
368     Coprocessor.State state = Coprocessor.State.UNINSTALLED;
369     /** Accounting for tables opened by the coprocessor */
370     protected List<HTableInterface> openTables =
371       Collections.synchronizedList(new ArrayList<HTableInterface>());
372     private int seq;
373     private Configuration conf;
374     private ClassLoader classLoader;
375 
376     /**
377      * Constructor
378      * @param impl the coprocessor instance
379      * @param priority chaining priority
380      */
381     public Environment(final Coprocessor impl, final int priority,
382         final int seq, final Configuration conf) {
383       this.impl = impl;
384       this.classLoader = impl.getClass().getClassLoader();
385       this.priority = priority;
386       this.state = Coprocessor.State.INSTALLED;
387       this.seq = seq;
388       this.conf = conf;
389     }
390 
391     /** Initialize the environment */
392     public void startup() throws IOException {
393       if (state == Coprocessor.State.INSTALLED ||
394           state == Coprocessor.State.STOPPED) {
395         state = Coprocessor.State.STARTING;
396         Thread currentThread = Thread.currentThread();
397         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
398         try {
399           currentThread.setContextClassLoader(this.getClassLoader());
400           impl.start(this);
401           state = Coprocessor.State.ACTIVE;
402         } finally {
403           currentThread.setContextClassLoader(hostClassLoader);
404         }
405       } else {
406         LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
407             " because not inactive (state="+state.toString()+")");
408       }
409     }
410 
411     /** Clean up the environment */
412     protected void shutdown() {
413       if (state == Coprocessor.State.ACTIVE) {
414         state = Coprocessor.State.STOPPING;
415         Thread currentThread = Thread.currentThread();
416         ClassLoader hostClassLoader = currentThread.getContextClassLoader();
417         try {
418           currentThread.setContextClassLoader(this.getClassLoader());
419           impl.stop(this);
420           state = Coprocessor.State.STOPPED;
421         } catch (IOException ioe) {
422           LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
423         } finally {
424           currentThread.setContextClassLoader(hostClassLoader);
425         }
426       } else {
427         LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
428             " because not active (state="+state.toString()+")");
429       }
430       // clean up any table references
431       for (HTableInterface table: openTables) {
432         try {
433           ((HTableWrapper)table).internalClose();
434         } catch (IOException e) {
435           // nothing can be done here
436           LOG.warn("Failed to close " +
437               Bytes.toStringBinary(table.getTableName()), e);
438         }
439       }
440     }
441 
442     @Override
443     public Coprocessor getInstance() {
444       return impl;
445     }
446 
447     @Override
448     public ClassLoader getClassLoader() {
449       return classLoader;
450     }
451 
452     @Override
453     public int getPriority() {
454       return priority;
455     }
456 
457     @Override
458     public int getLoadSequence() {
459       return seq;
460     }
461 
462     /** @return the coprocessor environment version */
463     @Override
464     public int getVersion() {
465       return Coprocessor.VERSION;
466     }
467 
468     /** @return the HBase release */
469     @Override
470     public String getHBaseVersion() {
471       return VersionInfo.getVersion();
472     }
473 
474     @Override
475     public Configuration getConfiguration() {
476       return conf;
477     }
478 
479     /**
480      * Open a table from within the Coprocessor environment
481      * @param tableName the table name
482      * @return an interface for manipulating the table
483      * @exception java.io.IOException Exception
484      */
485     @Override
486     public HTableInterface getTable(TableName tableName) throws IOException {
487       return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
488     }
489 
490     /**
491      * Open a table from within the Coprocessor environment
492      * @param tableName the table name
493      * @return an interface for manipulating the table
494      * @exception java.io.IOException Exception
495      */
496     @Override
497     public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
498       return HTableWrapper.createWrapper(openTables, tableName, this, pool);
499     }
500   }
501 
502   protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
503     abortServer(environment.getInstance().getClass().getName(), e);
504   }
505 
506   protected void abortServer(final String coprocessorName, final Throwable e) {
507     String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
508     LOG.error(message, e);
509     if (abortable != null) {
510       abortable.abort(message, e);
511     } else {
512       LOG.warn("No available Abortable, process was not aborted");
513     }
514   }
515 
516   /**
517    * This is used by coprocessor hooks which are declared to throw IOException
518    * (or its subtypes). For such hooks, we should handle throwable objects
519    * depending on the Throwable's type. Those which are instances of
520    * IOException should be passed on to the client. This is in conformance with
521    * the HBase idiom regarding IOException: that it represents a circumstance
522    * that should be passed along to the client for its own handling. For
523    * example, a coprocessor that implements access controls would throw a
524    * subclass of IOException, such as AccessDeniedException, in its preGet()
525    * method to prevent an unauthorized client's performing a Get on a particular
526    * table.
527    * @param env Coprocessor Environment
528    * @param e Throwable object thrown by coprocessor.
529    * @exception IOException Exception
530    */
531   protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
532       throws IOException {
533     if (e instanceof IOException) {
534       throw (IOException)e;
535     }
536     // If we got here, e is not an IOException. A loaded coprocessor has a
537     // fatal bug, and the server (master or regionserver) should remove the
538     // faulty coprocessor from its set of active coprocessors. Setting
539     // 'hbase.coprocessor.abortonerror' to true will cause abortServer(),
540     // which may be useful in development and testing environments where
541     // 'failing fast' for error analysis is desired.
542     if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
543       // server is configured to abort.
544       abortServer(env, e);
545     } else {
546       LOG.error("Removing coprocessor '" + env.toString() + "' from " +
547           "environment because it threw:  " + e,e);
548       coprocessors.remove(env);
549       try {
550         shutdown(env);
551       } catch (Exception x) {
552         LOG.error("Uncaught exception when shutting down coprocessor '"
553             + env.toString() + "'", x);
554       }
555       throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
556           "' threw: '" + e + "' and has been removed from the active " +
557           "coprocessor set.", e);
558     }
559   }
560 
561   /**
562    * Used to gracefully handle fallback to deprecated methods when we
563    * evolve coprocessor APIs.
564    *
565    * When a particular Coprocessor API is updated to change methods, hosts can support fallback
566    * to the deprecated API by using this method to determine if an instance implements the new API.
567    * In the event that said support is partial, then in the face of a runtime issue that prevents
568    * proper operation {@link #legacyWarning(Class, String)} should be used to let operators know.
569    *
570    * For examples of this in action, see the implementation of
571    * <ul>
572    *   <li>{@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
573    *   <li>{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost}
574    * </ul>
575    *
576    * @param clazz Coprocessor you wish to evaluate
577    * @param methodName the name of the non-deprecated method version
578    * @param parameterTypes the Class of the non-deprecated method's arguments in the order they are
579    *     declared.
580    */
581   @InterfaceAudience.Private
582   protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
583       final String methodName, final Class<?>... parameterTypes) {
584     boolean useLegacy;
585     // Use reflection to see if they implement the non-deprecated version
586     try {
587       clazz.getDeclaredMethod(methodName, parameterTypes);
588       LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
589           "signature. Skipping legacy support for invocations in '" + clazz +"'.");
590       useLegacy = false;
591     } catch (NoSuchMethodException exception) {
592       useLegacy = true;
593     } catch (SecurityException exception) {
594       LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
595           "' requires legacy support; assuming it does. If you get later errors about legacy " +
596           "coprocessor use, consider updating your security policy to allow access to the package" +
597           " and declared members of your implementation.");
598       LOG.debug("Details of Security Manager rejection.", exception);
599       useLegacy = true;
600     }
601     return useLegacy;
602   }
603 
604   /**
605    * Used to limit legacy handling to once per Coprocessor class per classloader.
606    */
607   private static final Set<Class<? extends Coprocessor>> legacyWarning =
608       new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
609           new Comparator<Class<? extends Coprocessor>>() {
610             @Override
611             public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
612               if (c1.equals(c2)) {
613                 return 0;
614               }
615               return c1.getName().compareTo(c2.getName());
616             }
617           });
618 
619   /**
620    * limits the amount of logging to once per coprocessor class.
621    * Used in concert with {@link #useLegacyMethod(Class, String, Class[])} when a runtime issue
622    * prevents properly supporting the legacy version of a coprocessor API.
623    * Since coprocessors can be in tight loops this serves to limit the amount of log spam we create.
624    */
625   @InterfaceAudience.Private
626   protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
627     if(legacyWarning.add(clazz)) {
628       LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
629           " deprecated API. Your coprocessor will not see these events.  Please update '" + clazz +
630           "'. Details of the problem: " + message);
631     }
632   }
633 }