1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.SortedSet;
30 import java.util.TreeSet;
31 import java.util.UUID;
32 import java.util.concurrent.ConcurrentSkipListSet;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.classification.InterfaceStability;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Abortable;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.CoprocessorEnvironment;
45 import org.apache.hadoop.hbase.DoNotRetryIOException;
46 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.HTable;
49 import org.apache.hadoop.hbase.client.HTableInterface;
50 import org.apache.hadoop.hbase.client.HTableWrapper;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
53 import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
54 import org.apache.hadoop.hbase.util.VersionInfo;
55
56
57
58
59
60
61
62 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
63 @InterfaceStability.Evolving
64 public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
65 public static final String REGION_COPROCESSOR_CONF_KEY =
66 "hbase.coprocessor.region.classes";
67 public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
68 "hbase.coprocessor.regionserver.classes";
69 public static final String USER_REGION_COPROCESSOR_CONF_KEY =
70 "hbase.coprocessor.user.region.classes";
71 public static final String MASTER_COPROCESSOR_CONF_KEY =
72 "hbase.coprocessor.master.classes";
73 public static final String WAL_COPROCESSOR_CONF_KEY =
74 "hbase.coprocessor.wal.classes";
75 public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
76 public static final boolean DEFAULT_ABORT_ON_ERROR = true;
77 public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
78 public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
79 public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
80 "hbase.coprocessor.user.enabled";
81 public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
82
83 private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
84 protected Abortable abortable;
85
86 protected SortedSet<E> coprocessors =
87 new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
88 protected Configuration conf;
89
90 protected String pathPrefix;
91 protected AtomicInteger loadSequence = new AtomicInteger();
92
/**
 * Creates a host that reports fatal coprocessor failures to the given {@link Abortable}.
 *
 * @param abortable used by {@link #abortServer(String, Throwable)}; may be {@code null}, in
 *     which case abort requests are only logged
 */
public CoprocessorHost(Abortable abortable) {
  // A fresh random prefix for every host instance; later passed to CoprocessorClassLoader.
  this.pathPrefix = UUID.randomUUID().toString();
  this.abortable = abortable;
}
97
98
99
100
101
102
103
104
105
106 private static Set<String> coprocessorNames =
107 Collections.synchronizedSet(new HashSet<String>());
108
109 public static Set<String> getLoadedCoprocessors() {
110 synchronized (coprocessorNames) {
111 return new HashSet(coprocessorNames);
112 }
113 }
114
115
116
117
118
119
120
121
122 public Set<String> getCoprocessors() {
123 Set<String> returnValue = new TreeSet<String>();
124 for (CoprocessorEnvironment e: coprocessors) {
125 returnValue.add(e.getInstance().getClass().getSimpleName());
126 }
127 return returnValue;
128 }
129
130
131
132
133
134 protected void loadSystemCoprocessors(Configuration conf, String confKey) {
135 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
136 DEFAULT_COPROCESSORS_ENABLED);
137 if (!coprocessorsEnabled) {
138 return;
139 }
140
141 Class<?> implClass = null;
142
143
144 String[] defaultCPClasses = conf.getStrings(confKey);
145 if (defaultCPClasses == null || defaultCPClasses.length == 0)
146 return;
147
148 int priority = Coprocessor.PRIORITY_SYSTEM;
149 for (String className : defaultCPClasses) {
150 className = className.trim();
151 if (findCoprocessor(className) != null) {
152
153 LOG.warn("Attempted duplicate loading of " + className + "; skipped");
154 continue;
155 }
156 ClassLoader cl = this.getClass().getClassLoader();
157 Thread.currentThread().setContextClassLoader(cl);
158 try {
159 implClass = cl.loadClass(className);
160
161
162 this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
163 LOG.info("System coprocessor " + className + " was loaded " +
164 "successfully with priority (" + priority++ + ").");
165 } catch (Throwable t) {
166
167 abortServer(className, t);
168 }
169 }
170 }
171
172
173
174
175
176
177
178
179
180 public E load(Path path, String className, int priority,
181 Configuration conf) throws IOException {
182 Class<?> implClass = null;
183 LOG.debug("Loading coprocessor class " + className + " with path " +
184 path + " and priority " + priority);
185
186 ClassLoader cl = null;
187 if (path == null) {
188 try {
189 implClass = getClass().getClassLoader().loadClass(className);
190 } catch (ClassNotFoundException e) {
191 throw new IOException("No jar path specified for " + className);
192 }
193 } else {
194 cl = CoprocessorClassLoader.getClassLoader(
195 path, getClass().getClassLoader(), pathPrefix, conf);
196 try {
197 implClass = cl.loadClass(className);
198 } catch (ClassNotFoundException e) {
199 throw new IOException("Cannot load external coprocessor class " + className, e);
200 }
201 }
202
203
204 Thread currentThread = Thread.currentThread();
205 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
206 try{
207
208 currentThread.setContextClassLoader(cl);
209 E cpInstance = loadInstance(implClass, priority, conf);
210 return cpInstance;
211 } finally {
212
213 currentThread.setContextClassLoader(hostClassLoader);
214 }
215 }
216
217
218
219
220
221
222
223 public void load(Class<?> implClass, int priority, Configuration conf)
224 throws IOException {
225 E env = loadInstance(implClass, priority, conf);
226 coprocessors.add(env);
227 }
228
229
230
231
232
233
234
235 public E loadInstance(Class<?> implClass, int priority, Configuration conf)
236 throws IOException {
237 if (!Coprocessor.class.isAssignableFrom(implClass)) {
238 throw new IOException("Configured class " + implClass.getName() + " must implement "
239 + Coprocessor.class.getName() + " interface ");
240 }
241
242
243 Coprocessor impl;
244 Object o = null;
245 try {
246 o = implClass.newInstance();
247 impl = (Coprocessor)o;
248 } catch (InstantiationException e) {
249 throw new IOException(e);
250 } catch (IllegalAccessException e) {
251 throw new IOException(e);
252 }
253
254 E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
255 if (env instanceof Environment) {
256 ((Environment)env).startup();
257 }
258
259
260 coprocessorNames.add(implClass.getName());
261 return env;
262 }
263
264
265
266
267 public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
268 int priority, int sequence, Configuration conf);
269
270 public void shutdown(CoprocessorEnvironment e) {
271 if (e instanceof Environment) {
272 if (LOG.isDebugEnabled()) {
273 LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
274 }
275 ((Environment)e).shutdown();
276 } else {
277 LOG.warn("Shutdown called on unknown environment: "+
278 e.getClass().getName());
279 }
280 }
281
282
283
284
285
286
287 public Coprocessor findCoprocessor(String className) {
288 for (E env: coprocessors) {
289 if (env.getInstance().getClass().getName().equals(className) ||
290 env.getInstance().getClass().getSimpleName().equals(className)) {
291 return env.getInstance();
292 }
293 }
294 return null;
295 }
296
297
298
299
300
301
302 public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
303 ArrayList<T> ret = new ArrayList<T>();
304
305 for (E env: coprocessors) {
306 Coprocessor cp = env.getInstance();
307
308 if(cp != null) {
309 if (cls.isAssignableFrom(cp.getClass())) {
310 ret.add((T)cp);
311 }
312 }
313 }
314 return ret;
315 }
316
317
318
319
320
321
322 public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
323 for (E env: coprocessors) {
324 if (env.getInstance().getClass().getName().equals(className) ||
325 env.getInstance().getClass().getSimpleName().equals(className)) {
326 return env;
327 }
328 }
329 return null;
330 }
331
332
333
334
335
336
337 Set<ClassLoader> getExternalClassLoaders() {
338 Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
339 final ClassLoader systemClassLoader = this.getClass().getClassLoader();
340 for (E env : coprocessors) {
341 ClassLoader cl = env.getInstance().getClass().getClassLoader();
342 if (cl != systemClassLoader ){
343
344 externalClassLoaders.add(cl);
345 }
346 }
347 return externalClassLoaders;
348 }
349
350
351
352
353
354 static class EnvironmentPriorityComparator
355 implements Comparator<CoprocessorEnvironment> {
356 @Override
357 public int compare(final CoprocessorEnvironment env1,
358 final CoprocessorEnvironment env2) {
359 if (env1.getPriority() < env2.getPriority()) {
360 return -1;
361 } else if (env1.getPriority() > env2.getPriority()) {
362 return 1;
363 }
364 if (env1.getLoadSequence() < env2.getLoadSequence()) {
365 return -1;
366 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
367 return 1;
368 }
369 return 0;
370 }
371 }
372
373
374
375
376 public static class Environment implements CoprocessorEnvironment {
377
378
379 public Coprocessor impl;
380
381 protected int priority = Coprocessor.PRIORITY_USER;
382
383 Coprocessor.State state = Coprocessor.State.UNINSTALLED;
384
385 protected List<HTableInterface> openTables =
386 Collections.synchronizedList(new ArrayList<HTableInterface>());
387 private int seq;
388 private Configuration conf;
389 private ClassLoader classLoader;
390
391
392
393
394
395
396 public Environment(final Coprocessor impl, final int priority,
397 final int seq, final Configuration conf) {
398 this.impl = impl;
399 this.classLoader = impl.getClass().getClassLoader();
400 this.priority = priority;
401 this.state = Coprocessor.State.INSTALLED;
402 this.seq = seq;
403 this.conf = conf;
404 }
405
406
407 public void startup() throws IOException {
408 if (state == Coprocessor.State.INSTALLED ||
409 state == Coprocessor.State.STOPPED) {
410 state = Coprocessor.State.STARTING;
411 Thread currentThread = Thread.currentThread();
412 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
413 try {
414 currentThread.setContextClassLoader(this.getClassLoader());
415 impl.start(this);
416 state = Coprocessor.State.ACTIVE;
417 } finally {
418 currentThread.setContextClassLoader(hostClassLoader);
419 }
420 } else {
421 LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
422 " because not inactive (state="+state.toString()+")");
423 }
424 }
425
426
427 protected void shutdown() {
428 if (state == Coprocessor.State.ACTIVE) {
429 state = Coprocessor.State.STOPPING;
430 Thread currentThread = Thread.currentThread();
431 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
432 try {
433 currentThread.setContextClassLoader(this.getClassLoader());
434 impl.stop(this);
435 state = Coprocessor.State.STOPPED;
436 } catch (IOException ioe) {
437 LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
438 } finally {
439 currentThread.setContextClassLoader(hostClassLoader);
440 }
441 } else {
442 LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
443 " because not active (state="+state.toString()+")");
444 }
445 synchronized (openTables) {
446
447 for (HTableInterface table: openTables) {
448 try {
449 ((HTableWrapper)table).internalClose();
450 } catch (IOException e) {
451
452 LOG.warn("Failed to close " +
453 Bytes.toStringBinary(table.getTableName()), e);
454 }
455 }
456 }
457 }
458
459 @Override
460 public Coprocessor getInstance() {
461 return impl;
462 }
463
464 @Override
465 public ClassLoader getClassLoader() {
466 return classLoader;
467 }
468
469 @Override
470 public int getPriority() {
471 return priority;
472 }
473
474 @Override
475 public int getLoadSequence() {
476 return seq;
477 }
478
479
480 @Override
481 public int getVersion() {
482 return Coprocessor.VERSION;
483 }
484
485
486 @Override
487 public String getHBaseVersion() {
488 return VersionInfo.getVersion();
489 }
490
491 @Override
492 public Configuration getConfiguration() {
493 return conf;
494 }
495
496
497
498
499
500
501
502 @Override
503 public HTableInterface getTable(TableName tableName) throws IOException {
504 return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
505 }
506
507
508
509
510
511
512
513 @Override
514 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
515 return HTableWrapper.createWrapper(openTables, tableName, this, pool);
516 }
517 }
518
519 protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
520 abortServer(environment.getInstance().getClass().getName(), e);
521 }
522
523 protected void abortServer(final String coprocessorName, final Throwable e) {
524 String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
525 LOG.error(message, e);
526 if (abortable != null) {
527 abortable.abort(message, e);
528 } else {
529 LOG.warn("No available Abortable, process was not aborted");
530 }
531 }
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548 protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
549 throws IOException {
550 if (e instanceof IOException) {
551 throw (IOException)e;
552 }
553
554
555
556
557
558
559 if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
560
561 abortServer(env, e);
562 } else {
563 LOG.error("Removing coprocessor '" + env.toString() + "' from " +
564 "environment because it threw: " + e,e);
565 coprocessors.remove(env);
566 try {
567 shutdown(env);
568 } catch (Exception x) {
569 LOG.error("Uncaught exception when shutting down coprocessor '"
570 + env.toString() + "'", x);
571 }
572 throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
573 "' threw: '" + e + "' and has been removed from the active " +
574 "coprocessor set.", e);
575 }
576 }
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598 @InterfaceAudience.Private
599 protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
600 final String methodName, final Class<?>... parameterTypes) {
601 boolean useLegacy;
602
603 try {
604 clazz.getDeclaredMethod(methodName, parameterTypes);
605 LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
606 "signature. Skipping legacy support for invocations in '" + clazz +"'.");
607 useLegacy = false;
608 } catch (NoSuchMethodException exception) {
609 useLegacy = true;
610 } catch (SecurityException exception) {
611 LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
612 "' requires legacy support; assuming it does. If you get later errors about legacy " +
613 "coprocessor use, consider updating your security policy to allow access to the package" +
614 " and declared members of your implementation.");
615 LOG.debug("Details of Security Manager rejection.", exception);
616 useLegacy = true;
617 }
618 return useLegacy;
619 }
620
621
622
623
624 private static final Set<Class<? extends Coprocessor>> legacyWarning =
625 new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
626 new Comparator<Class<? extends Coprocessor>>() {
627 @Override
628 public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
629 if (c1.equals(c2)) {
630 return 0;
631 }
632 return c1.getName().compareTo(c2.getName());
633 }
634 });
635
636
637
638
639
640
641
642 @InterfaceAudience.Private
643 protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
644 if(legacyWarning.add(clazz)) {
645 LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
646 " deprecated API. Your coprocessor will not see these events. Please update '" + clazz +
647 "'. Details of the problem: " + message);
648 }
649 }
650 }