1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.TreeSet;
30 import java.util.UUID;
31 import java.util.concurrent.ConcurrentSkipListSet;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.hbase.Abortable;
42 import org.apache.hadoop.hbase.Coprocessor;
43 import org.apache.hadoop.hbase.CoprocessorEnvironment;
44 import org.apache.hadoop.hbase.DoNotRetryIOException;
45 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.HTable;
48 import org.apache.hadoop.hbase.client.HTableInterface;
49 import org.apache.hadoop.hbase.client.HTableWrapper;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
52 import org.apache.hadoop.hbase.util.SortedList;
53 import org.apache.hadoop.hbase.util.VersionInfo;
54
55
56
57
58
59
60
61 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
62 @InterfaceStability.Evolving
63 public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
/**
 * Configuration property name {@code hbase.coprocessor.region.classes}.
 * NOTE(review): presumably passed as {@code confKey} to
 * {@link #loadSystemCoprocessors(Configuration, String)} by region hosts; the callers are
 * not part of this file -- confirm.
 */
public static final String REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.region.classes";
/** Configuration property name {@code hbase.coprocessor.regionserver.classes}. */
public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.regionserver.classes";
/** Configuration property name {@code hbase.coprocessor.user.region.classes}. */
public static final String USER_REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.user.region.classes";
/** Configuration property name {@code hbase.coprocessor.master.classes}. */
public static final String MASTER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.master.classes";
/** Configuration property name {@code hbase.coprocessor.wal.classes}. */
public static final String WAL_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.wal.classes";
/**
 * Boolean property read by {@link #handleCoprocessorThrowable}: when true, a non-IOException
 * thrown by a coprocessor aborts the server instead of only removing the coprocessor.
 */
public static final String ABORT_ON_ERROR_KEY = "hbase.coprocessor.abortonerror";
/** Value used for {@link #ABORT_ON_ERROR_KEY} when the property is not set. */
public static final boolean DEFAULT_ABORT_ON_ERROR = true;
/**
 * Boolean property read by {@link #loadSystemCoprocessors}: when false, no system
 * coprocessors are loaded.
 */
public static final String COPROCESSORS_ENABLED_CONF_KEY = "hbase.coprocessor.enabled";
/** Value used for {@link #COPROCESSORS_ENABLED_CONF_KEY} when the property is not set. */
public static final boolean DEFAULT_COPROCESSORS_ENABLED = true;
/**
 * Configuration property name {@code hbase.coprocessor.user.enabled}.
 * NOTE(review): not read anywhere in this class; presumably consulted by subclasses that
 * load user (table-level) coprocessors -- confirm.
 */
public static final String USER_COPROCESSORS_ENABLED_CONF_KEY =
    "hbase.coprocessor.user.enabled";
/** Default paired with {@link #USER_COPROCESSORS_ENABLED_CONF_KEY}. */
public static final boolean DEFAULT_USER_COPROCESSORS_ENABLED = true;
81
/** Logger shared by the host and its nested {@code Environment} class. */
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
/** Target of {@link #abortServer(String, Throwable)}; may be null, in which case no abort happens. */
protected Abortable abortable;

/** Loaded environments, kept ordered by {@code EnvironmentPriorityComparator}. */
protected SortedList<E> coprocessors =
    new SortedList<E>(new EnvironmentPriorityComparator());
/**
 * Host configuration.
 * NOTE(review): never assigned in this class; presumably set by subclasses -- confirm.
 */
protected Configuration conf;

/** Random UUID string, handed to {@code CoprocessorClassLoader.getClassLoader} when loading from a path. */
protected String pathPrefix;
/** Counter incremented once per {@link #loadInstance} call to give each environment a load sequence. */
protected AtomicInteger loadSequence = new AtomicInteger();
91
92 public CoprocessorHost(Abortable abortable) {
93 this.abortable = abortable;
94 this.pathPrefix = UUID.randomUUID().toString();
95 }
96
97
98
99
100
101
102
103
104
105 private static Set<String> coprocessorNames =
106 Collections.synchronizedSet(new HashSet<String>());
107
108 public static Set<String> getLoadedCoprocessors() {
109 synchronized (coprocessorNames) {
110 return new HashSet(coprocessorNames);
111 }
112 }
113
114
115
116
117
118
119
120
121 public Set<String> getCoprocessors() {
122 Set<String> returnValue = new TreeSet<String>();
123 for (CoprocessorEnvironment e: coprocessors) {
124 returnValue.add(e.getInstance().getClass().getSimpleName());
125 }
126 return returnValue;
127 }
128
129
130
131
132
133 protected void loadSystemCoprocessors(Configuration conf, String confKey) {
134 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
135 DEFAULT_COPROCESSORS_ENABLED);
136 if (!coprocessorsEnabled) {
137 return;
138 }
139
140 Class<?> implClass = null;
141
142
143 String[] defaultCPClasses = conf.getStrings(confKey);
144 if (defaultCPClasses == null || defaultCPClasses.length == 0)
145 return;
146
147 int priority = Coprocessor.PRIORITY_SYSTEM;
148 for (String className : defaultCPClasses) {
149 className = className.trim();
150 if (findCoprocessor(className) != null) {
151
152 LOG.warn("Attempted duplicate loading of " + className + "; skipped");
153 continue;
154 }
155 ClassLoader cl = this.getClass().getClassLoader();
156 Thread.currentThread().setContextClassLoader(cl);
157 try {
158 implClass = cl.loadClass(className);
159
160
161 this.coprocessors.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
162 LOG.info("System coprocessor " + className + " was loaded " +
163 "successfully with priority (" + priority++ + ").");
164 } catch (Throwable t) {
165
166 abortServer(className, t);
167 }
168 }
169 }
170
171
172
173
174
175
176
177
178
179 public E load(Path path, String className, int priority,
180 Configuration conf) throws IOException {
181 Class<?> implClass = null;
182 LOG.debug("Loading coprocessor class " + className + " with path " +
183 path + " and priority " + priority);
184
185 ClassLoader cl = null;
186 if (path == null) {
187 try {
188 implClass = getClass().getClassLoader().loadClass(className);
189 } catch (ClassNotFoundException e) {
190 throw new IOException("No jar path specified for " + className);
191 }
192 } else {
193 cl = CoprocessorClassLoader.getClassLoader(
194 path, getClass().getClassLoader(), pathPrefix, conf);
195 try {
196 implClass = cl.loadClass(className);
197 } catch (ClassNotFoundException e) {
198 throw new IOException("Cannot load external coprocessor class " + className, e);
199 }
200 }
201
202
203 Thread currentThread = Thread.currentThread();
204 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
205 try{
206
207 currentThread.setContextClassLoader(cl);
208 E cpInstance = loadInstance(implClass, priority, conf);
209 return cpInstance;
210 } finally {
211
212 currentThread.setContextClassLoader(hostClassLoader);
213 }
214 }
215
216
217
218
219
220
221
222 public void load(Class<?> implClass, int priority, Configuration conf)
223 throws IOException {
224 E env = loadInstance(implClass, priority, conf);
225 coprocessors.add(env);
226 }
227
228
229
230
231
232
233
234 public E loadInstance(Class<?> implClass, int priority, Configuration conf)
235 throws IOException {
236 if (!Coprocessor.class.isAssignableFrom(implClass)) {
237 throw new IOException("Configured class " + implClass.getName() + " must implement "
238 + Coprocessor.class.getName() + " interface ");
239 }
240
241
242 Coprocessor impl;
243 Object o = null;
244 try {
245 o = implClass.newInstance();
246 impl = (Coprocessor)o;
247 } catch (InstantiationException e) {
248 throw new IOException(e);
249 } catch (IllegalAccessException e) {
250 throw new IOException(e);
251 }
252
253 E env = createEnvironment(implClass, impl, priority, loadSequence.incrementAndGet(), conf);
254 if (env instanceof Environment) {
255 ((Environment)env).startup();
256 }
257
258
259 coprocessorNames.add(implClass.getName());
260 return env;
261 }
262
263
264
265
/**
 * Factory hook implemented by concrete hosts; called once from {@link #loadInstance} for
 * every coprocessor being loaded.
 *
 * @param implClass the coprocessor implementation class
 * @param instance the freshly created coprocessor instance
 * @param priority priority requested for the coprocessor
 * @param sequence load sequence number taken from {@code loadSequence}
 * @param conf configuration to associate with the environment
 * @return the environment wrapping {@code instance}
 */
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
    int priority, int sequence, Configuration conf);
268
269 public void shutdown(CoprocessorEnvironment e) {
270 if (e instanceof Environment) {
271 if (LOG.isDebugEnabled()) {
272 LOG.debug("Stop coprocessor " + e.getInstance().getClass().getName());
273 }
274 ((Environment)e).shutdown();
275 } else {
276 LOG.warn("Shutdown called on unknown environment: "+
277 e.getClass().getName());
278 }
279 }
280
281
282
283
284
285
286 public Coprocessor findCoprocessor(String className) {
287 for (E env: coprocessors) {
288 if (env.getInstance().getClass().getName().equals(className) ||
289 env.getInstance().getClass().getSimpleName().equals(className)) {
290 return env.getInstance();
291 }
292 }
293 return null;
294 }
295
296
297
298
299
300
301 public <T extends Coprocessor> List<T> findCoprocessors(Class<T> cls) {
302 ArrayList<T> ret = new ArrayList<T>();
303
304 for (E env: coprocessors) {
305 Coprocessor cp = env.getInstance();
306
307 if(cp != null) {
308 if (cls.isAssignableFrom(cp.getClass())) {
309 ret.add((T)cp);
310 }
311 }
312 }
313 return ret;
314 }
315
316
317
318
319
320
321 public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
322 for (E env: coprocessors) {
323 if (env.getInstance().getClass().getName().equals(className) ||
324 env.getInstance().getClass().getSimpleName().equals(className)) {
325 return env;
326 }
327 }
328 return null;
329 }
330
331
332
333
334
335
336 Set<ClassLoader> getExternalClassLoaders() {
337 Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
338 final ClassLoader systemClassLoader = this.getClass().getClassLoader();
339 for (E env : coprocessors) {
340 ClassLoader cl = env.getInstance().getClass().getClassLoader();
341 if (cl != systemClassLoader ){
342
343 externalClassLoaders.add(cl);
344 }
345 }
346 return externalClassLoaders;
347 }
348
349
350
351
352
353 static class EnvironmentPriorityComparator
354 implements Comparator<CoprocessorEnvironment> {
355 @Override
356 public int compare(final CoprocessorEnvironment env1,
357 final CoprocessorEnvironment env2) {
358 if (env1.getPriority() < env2.getPriority()) {
359 return -1;
360 } else if (env1.getPriority() > env2.getPriority()) {
361 return 1;
362 }
363 if (env1.getLoadSequence() < env2.getLoadSequence()) {
364 return -1;
365 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
366 return 1;
367 }
368 return 0;
369 }
370 }
371
372
373
374
375 public static class Environment implements CoprocessorEnvironment {
376
377
378 public Coprocessor impl;
379
380 protected int priority = Coprocessor.PRIORITY_USER;
381
382 Coprocessor.State state = Coprocessor.State.UNINSTALLED;
383
384 protected List<HTableInterface> openTables =
385 Collections.synchronizedList(new ArrayList<HTableInterface>());
386 private int seq;
387 private Configuration conf;
388 private ClassLoader classLoader;
389
390
391
392
393
394
395 public Environment(final Coprocessor impl, final int priority,
396 final int seq, final Configuration conf) {
397 this.impl = impl;
398 this.classLoader = impl.getClass().getClassLoader();
399 this.priority = priority;
400 this.state = Coprocessor.State.INSTALLED;
401 this.seq = seq;
402 this.conf = conf;
403 }
404
405
406 public void startup() throws IOException {
407 if (state == Coprocessor.State.INSTALLED ||
408 state == Coprocessor.State.STOPPED) {
409 state = Coprocessor.State.STARTING;
410 Thread currentThread = Thread.currentThread();
411 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
412 try {
413 currentThread.setContextClassLoader(this.getClassLoader());
414 impl.start(this);
415 state = Coprocessor.State.ACTIVE;
416 } finally {
417 currentThread.setContextClassLoader(hostClassLoader);
418 }
419 } else {
420 LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
421 " because not inactive (state="+state.toString()+")");
422 }
423 }
424
425
426 protected void shutdown() {
427 if (state == Coprocessor.State.ACTIVE) {
428 state = Coprocessor.State.STOPPING;
429 Thread currentThread = Thread.currentThread();
430 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
431 try {
432 currentThread.setContextClassLoader(this.getClassLoader());
433 impl.stop(this);
434 state = Coprocessor.State.STOPPED;
435 } catch (IOException ioe) {
436 LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
437 } finally {
438 currentThread.setContextClassLoader(hostClassLoader);
439 }
440 } else {
441 LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
442 " because not active (state="+state.toString()+")");
443 }
444 synchronized (openTables) {
445
446 for (HTableInterface table: openTables) {
447 try {
448 ((HTableWrapper)table).internalClose();
449 } catch (IOException e) {
450
451 LOG.warn("Failed to close " +
452 Bytes.toStringBinary(table.getTableName()), e);
453 }
454 }
455 }
456 }
457
458 @Override
459 public Coprocessor getInstance() {
460 return impl;
461 }
462
463 @Override
464 public ClassLoader getClassLoader() {
465 return classLoader;
466 }
467
468 @Override
469 public int getPriority() {
470 return priority;
471 }
472
473 @Override
474 public int getLoadSequence() {
475 return seq;
476 }
477
478
479 @Override
480 public int getVersion() {
481 return Coprocessor.VERSION;
482 }
483
484
485 @Override
486 public String getHBaseVersion() {
487 return VersionInfo.getVersion();
488 }
489
490 @Override
491 public Configuration getConfiguration() {
492 return conf;
493 }
494
495
496
497
498
499
500
501 @Override
502 public HTableInterface getTable(TableName tableName) throws IOException {
503 return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
504 }
505
506
507
508
509
510
511
512 @Override
513 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
514 return HTableWrapper.createWrapper(openTables, tableName, this, pool);
515 }
516 }
517
518 protected void abortServer(final CoprocessorEnvironment environment, final Throwable e) {
519 abortServer(environment.getInstance().getClass().getName(), e);
520 }
521
522 protected void abortServer(final String coprocessorName, final Throwable e) {
523 String message = "The coprocessor " + coprocessorName + " threw " + e.toString();
524 LOG.error(message, e);
525 if (abortable != null) {
526 abortable.abort(message, e);
527 } else {
528 LOG.warn("No available Abortable, process was not aborted");
529 }
530 }
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547 protected void handleCoprocessorThrowable(final CoprocessorEnvironment env, final Throwable e)
548 throws IOException {
549 if (e instanceof IOException) {
550 throw (IOException)e;
551 }
552
553
554
555
556
557
558 if (env.getConfiguration().getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
559
560 abortServer(env, e);
561 } else {
562
563 if(env instanceof RegionCoprocessorEnvironment) {
564 String tableName = ((RegionCoprocessorEnvironment)env).getRegionInfo().getTable().getNameAsString();
565 LOG.error("Removing coprocessor '" + env.toString() + "' from table '"+ tableName + "'", e);
566 } else {
567 LOG.error("Removing coprocessor '" + env.toString() + "' from " +
568 "environment",e);
569 }
570
571 coprocessors.remove(env);
572 try {
573 shutdown(env);
574 } catch (Exception x) {
575 LOG.error("Uncaught exception when shutting down coprocessor '"
576 + env.toString() + "'", x);
577 }
578 throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
579 "' threw: '" + e + "' and has been removed from the active " +
580 "coprocessor set.", e);
581 }
582 }
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604 @InterfaceAudience.Private
605 protected static boolean useLegacyMethod(final Class<? extends Coprocessor> clazz,
606 final String methodName, final Class<?>... parameterTypes) {
607 boolean useLegacy;
608
609 try {
610 clazz.getDeclaredMethod(methodName, parameterTypes);
611 LOG.debug("Found an implementation of '" + methodName + "' that uses updated method " +
612 "signature. Skipping legacy support for invocations in '" + clazz +"'.");
613 useLegacy = false;
614 } catch (NoSuchMethodException exception) {
615 useLegacy = true;
616 } catch (SecurityException exception) {
617 LOG.warn("The Security Manager denied our attempt to detect if the coprocessor '" + clazz +
618 "' requires legacy support; assuming it does. If you get later errors about legacy " +
619 "coprocessor use, consider updating your security policy to allow access to the package" +
620 " and declared members of your implementation.");
621 LOG.debug("Details of Security Manager rejection.", exception);
622 useLegacy = true;
623 }
624 return useLegacy;
625 }
626
627
628
629
630 private static final Set<Class<? extends Coprocessor>> legacyWarning =
631 new ConcurrentSkipListSet<Class<? extends Coprocessor>>(
632 new Comparator<Class<? extends Coprocessor>>() {
633 @Override
634 public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
635 if (c1.equals(c2)) {
636 return 0;
637 }
638 return c1.getName().compareTo(c2.getName());
639 }
640 });
641
642
643
644
645
646
647
648 @InterfaceAudience.Private
649 protected void legacyWarning(final Class<? extends Coprocessor> clazz, final String message) {
650 if(legacyWarning.add(clazz)) {
651 LOG.error("You have a legacy coprocessor loaded and there are events we can't map to the " +
652 " deprecated API. Your coprocessor will not see these events. Please update '" + clazz +
653 "'. Details of the problem: " + message);
654 }
655 }
656 }