1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.SortedSet;
31 import java.util.TreeSet;
32 import java.util.UUID;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.classification.InterfaceAudience;
37 import org.apache.hadoop.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.Coprocessor;
41 import org.apache.hadoop.hbase.CoprocessorEnvironment;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.Server;
44 import org.apache.hadoop.hbase.client.Append;
45 import org.apache.hadoop.hbase.client.Delete;
46 import org.apache.hadoop.hbase.client.Durability;
47 import org.apache.hadoop.hbase.client.Get;
48 import org.apache.hadoop.hbase.client.HTable;
49 import org.apache.hadoop.hbase.client.HTableInterface;
50 import org.apache.hadoop.hbase.client.Increment;
51 import org.apache.hadoop.hbase.client.Put;
52 import org.apache.hadoop.hbase.client.Result;
53 import org.apache.hadoop.hbase.client.ResultScanner;
54 import org.apache.hadoop.hbase.client.Row;
55 import org.apache.hadoop.hbase.client.RowMutations;
56 import org.apache.hadoop.hbase.client.Scan;
57 import org.apache.hadoop.hbase.client.coprocessor.Batch;
58 import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
59 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
62 import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
63 import org.apache.hadoop.hbase.util.VersionInfo;
64
65 import com.google.protobuf.Service;
66 import com.google.protobuf.ServiceException;
67
68
69
70
71
72
73
74 @InterfaceAudience.Public
75 @InterfaceStability.Evolving
76 public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
// Configuration key names for the per-component coprocessor class lists.
// NOTE(review): presumably these are the values passed as confKey to
// loadSystemCoprocessors(), which reads the key with Configuration.getStrings();
// the callers are outside this class, so confirm there.
public static final String REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.region.classes";
public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.regionserver.classes";
public static final String USER_REGION_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.user.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.master.classes";
public static final String WAL_COPROCESSOR_CONF_KEY =
    "hbase.coprocessor.wal.classes";
87
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);

/**
 * Environments of the coprocessors registered with this host, kept sorted by
 * {@link EnvironmentPriorityComparator} (priority first, then load sequence).
 */
protected SortedSet<E> coprocessors =
    new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
/** Host configuration; never assigned in this class (presumably set by subclasses). */
protected Configuration conf;
/** Random UUID string chosen in the constructor and handed to CoprocessorClassLoader. */
protected String pathPrefix;
/** Counter pre-incremented by loadInstance() to number each created environment. */
protected volatile int loadSequence;
96
97 public CoprocessorHost() {
98 pathPrefix = UUID.randomUUID().toString();
99 }
100
101
102
103
104
105
106
107
108
109 private static Set<String> coprocessorNames =
110 Collections.synchronizedSet(new HashSet<String>());
111 public static Set<String> getLoadedCoprocessors() {
112 return coprocessorNames;
113 }
114
115
116
117
118
119
120
121
122 public Set<String> getCoprocessors() {
123 Set<String> returnValue = new TreeSet<String>();
124 for(CoprocessorEnvironment e: coprocessors) {
125 returnValue.add(e.getInstance().getClass().getSimpleName());
126 }
127 return returnValue;
128 }
129
130
131
132
133
134 protected void loadSystemCoprocessors(Configuration conf, String confKey) {
135 Class<?> implClass = null;
136
137
138 String[] defaultCPClasses = conf.getStrings(confKey);
139 if (defaultCPClasses == null || defaultCPClasses.length == 0)
140 return;
141
142 int priority = Coprocessor.PRIORITY_SYSTEM;
143 List<E> configured = new ArrayList<E>();
144 for (String className : defaultCPClasses) {
145 className = className.trim();
146 if (findCoprocessor(className) != null) {
147 continue;
148 }
149 ClassLoader cl = this.getClass().getClassLoader();
150 Thread.currentThread().setContextClassLoader(cl);
151 try {
152 implClass = cl.loadClass(className);
153 configured.add(loadInstance(implClass, Coprocessor.PRIORITY_SYSTEM, conf));
154 LOG.info("System coprocessor " + className + " was loaded " +
155 "successfully with priority (" + priority++ + ").");
156 } catch (ClassNotFoundException e) {
157 LOG.warn("Class " + className + " cannot be found. " +
158 e.getMessage());
159 } catch (IOException e) {
160 LOG.warn("Load coprocessor " + className + " failed. " +
161 e.getMessage());
162 }
163 }
164
165
166 coprocessors.addAll(configured);
167 }
168
169
170
171
172
173
174
175
176
177 public E load(Path path, String className, int priority,
178 Configuration conf) throws IOException {
179 Class<?> implClass = null;
180 LOG.debug("Loading coprocessor class " + className + " with path " +
181 path + " and priority " + priority);
182
183 ClassLoader cl = null;
184 if (path == null) {
185 try {
186 implClass = getClass().getClassLoader().loadClass(className);
187 } catch (ClassNotFoundException e) {
188 throw new IOException("No jar path specified for " + className);
189 }
190 } else {
191 cl = CoprocessorClassLoader.getClassLoader(
192 path, getClass().getClassLoader(), pathPrefix, conf);
193 try {
194 implClass = cl.loadClass(className);
195 } catch (ClassNotFoundException e) {
196 throw new IOException("Cannot load external coprocessor class " + className, e);
197 }
198 }
199
200
201 Thread currentThread = Thread.currentThread();
202 ClassLoader hostClassLoader = currentThread.getContextClassLoader();
203 try{
204
205 currentThread.setContextClassLoader(cl);
206 E cpInstance = loadInstance(implClass, priority, conf);
207 return cpInstance;
208 } finally {
209
210 currentThread.setContextClassLoader(hostClassLoader);
211 }
212 }
213
214
215
216
217
218
219
220 public void load(Class<?> implClass, int priority, Configuration conf)
221 throws IOException {
222 E env = loadInstance(implClass, priority, conf);
223 coprocessors.add(env);
224 }
225
226
227
228
229
230
231
232 public E loadInstance(Class<?> implClass, int priority, Configuration conf)
233 throws IOException {
234 if (!Coprocessor.class.isAssignableFrom(implClass)) {
235 throw new IOException("Configured class " + implClass.getName() + " must implement "
236 + Coprocessor.class.getName() + " interface ");
237 }
238
239
240 Coprocessor impl;
241 Object o = null;
242 try {
243 o = implClass.newInstance();
244 impl = (Coprocessor)o;
245 } catch (InstantiationException e) {
246 throw new IOException(e);
247 } catch (IllegalAccessException e) {
248 throw new IOException(e);
249 }
250
251 E env = createEnvironment(implClass, impl, priority, ++loadSequence, conf);
252 if (env instanceof Environment) {
253 ((Environment)env).startup();
254 }
255
256
257 coprocessorNames.add(implClass.getName());
258 return env;
259 }
260
261
262
263
/**
 * Creates the environment that wraps a newly instantiated coprocessor.
 * Called by loadInstance() once per loaded coprocessor.
 *
 * @param implClass class the coprocessor was instantiated from
 * @param instance the coprocessor instance
 * @param priority priority requested for the coprocessor
 * @param sequence load sequence number assigned by loadInstance()
 * @param conf configuration passed to loadInstance()
 * @return the environment for {@code instance}
 */
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
    int priority, int sequence, Configuration conf);
266
267 public void shutdown(CoprocessorEnvironment e) {
268 if (e instanceof Environment) {
269 ((Environment)e).shutdown();
270 } else {
271 LOG.warn("Shutdown called on unknown environment: "+
272 e.getClass().getName());
273 }
274 }
275
276
277
278
279
280
281 public Coprocessor findCoprocessor(String className) {
282
283 for (E env: coprocessors) {
284 if (env.getInstance().getClass().getName().equals(className) ||
285 env.getInstance().getClass().getSimpleName().equals(className)) {
286 return env.getInstance();
287 }
288 }
289 return null;
290 }
291
292
293
294
295
296
297 Set<ClassLoader> getExternalClassLoaders() {
298 Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>();
299 final ClassLoader systemClassLoader = this.getClass().getClassLoader();
300 for (E env : coprocessors) {
301 ClassLoader cl = env.getInstance().getClass().getClassLoader();
302 if (cl != systemClassLoader ){
303
304 externalClassLoaders.add(cl);
305 }
306 }
307 return externalClassLoaders;
308 }
309
310
311
312
313
314
315 public CoprocessorEnvironment findCoprocessorEnvironment(String className) {
316
317 for (E env: coprocessors) {
318 if (env.getInstance().getClass().getName().equals(className) ||
319 env.getInstance().getClass().getSimpleName().equals(className)) {
320 return env;
321 }
322 }
323 return null;
324 }
325
326
327
328
329
330 static class EnvironmentPriorityComparator
331 implements Comparator<CoprocessorEnvironment> {
332 public int compare(final CoprocessorEnvironment env1,
333 final CoprocessorEnvironment env2) {
334 if (env1.getPriority() < env2.getPriority()) {
335 return -1;
336 } else if (env1.getPriority() > env2.getPriority()) {
337 return 1;
338 }
339 if (env1.getLoadSequence() < env2.getLoadSequence()) {
340 return -1;
341 } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
342 return 1;
343 }
344 return 0;
345 }
346 }
347
348
349
350
351 public static class Environment implements CoprocessorEnvironment {
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 class HTableWrapper implements HTableInterface {
368
369 private byte[] tableName;
370 private HTable table;
371
372 public HTableWrapper(byte[] tableName) throws IOException {
373 this.tableName = tableName;
374 this.table = new HTable(conf, tableName);
375 openTables.add(this);
376 }
377
378 void internalClose() throws IOException {
379 table.close();
380 }
381
382 public Configuration getConfiguration() {
383 return table.getConfiguration();
384 }
385
386 public void close() throws IOException {
387 try {
388 internalClose();
389 } finally {
390 openTables.remove(this);
391 }
392 }
393
394 public Result getRowOrBefore(byte[] row, byte[] family)
395 throws IOException {
396 return table.getRowOrBefore(row, family);
397 }
398
399 public Result get(Get get) throws IOException {
400 return table.get(get);
401 }
402
403 public boolean exists(Get get) throws IOException {
404 return table.exists(get);
405 }
406
407 public Boolean[] exists(List<Get> gets) throws IOException{
408 return table.exists(gets);
409 }
410
411 public void put(Put put) throws IOException {
412 table.put(put);
413 }
414
415 public void put(List<Put> puts) throws IOException {
416 table.put(puts);
417 }
418
419 public void delete(Delete delete) throws IOException {
420 table.delete(delete);
421 }
422
423 public void delete(List<Delete> deletes) throws IOException {
424 table.delete(deletes);
425 }
426
427 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
428 byte[] value, Put put) throws IOException {
429 return table.checkAndPut(row, family, qualifier, value, put);
430 }
431
432 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
433 byte[] value, Delete delete) throws IOException {
434 return table.checkAndDelete(row, family, qualifier, value, delete);
435 }
436
437 public long incrementColumnValue(byte[] row, byte[] family,
438 byte[] qualifier, long amount) throws IOException {
439 return table.incrementColumnValue(row, family, qualifier, amount);
440 }
441
442 public long incrementColumnValue(byte[] row, byte[] family,
443 byte[] qualifier, long amount, Durability durability)
444 throws IOException {
445 return table.incrementColumnValue(row, family, qualifier, amount,
446 durability);
447 }
448
449 @Override
450 public Result append(Append append) throws IOException {
451 return table.append(append);
452 }
453
454 @Override
455 public Result increment(Increment increment) throws IOException {
456 return table.increment(increment);
457 }
458
459 public void flushCommits() throws IOException {
460 table.flushCommits();
461 }
462
463 public boolean isAutoFlush() {
464 return table.isAutoFlush();
465 }
466
467 public ResultScanner getScanner(Scan scan) throws IOException {
468 return table.getScanner(scan);
469 }
470
471 public ResultScanner getScanner(byte[] family) throws IOException {
472 return table.getScanner(family);
473 }
474
475 public ResultScanner getScanner(byte[] family, byte[] qualifier)
476 throws IOException {
477 return table.getScanner(family, qualifier);
478 }
479
480 public HTableDescriptor getTableDescriptor() throws IOException {
481 return table.getTableDescriptor();
482 }
483
484 public byte[] getTableName() {
485 return tableName;
486 }
487
488 @Override
489 public void batch(List<? extends Row> actions, Object[] results)
490 throws IOException, InterruptedException {
491 table.batch(actions, results);
492 }
493
494 @Override
495 public Object[] batch(List<? extends Row> actions)
496 throws IOException, InterruptedException {
497 return table.batch(actions);
498 }
499
500 @Override
501 public <R> void batchCallback(List<? extends Row> actions, Object[] results,
502 Batch.Callback<R> callback) throws IOException, InterruptedException {
503 table.batchCallback(actions, results, callback);
504 }
505
506 @Override
507 public <R> Object[] batchCallback(List<? extends Row> actions,
508 Batch.Callback<R> callback) throws IOException, InterruptedException {
509 return table.batchCallback(actions, callback);
510 }
511
512 @Override
513 public Result[] get(List<Get> gets) throws IOException {
514 return table.get(gets);
515 }
516
517 @Override
518 public CoprocessorRpcChannel coprocessorService(byte[] row) {
519 return table.coprocessorService(row);
520 }
521
522 @Override
523 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
524 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
525 throws ServiceException, Throwable {
526 return table.coprocessorService(service, startKey, endKey, callable);
527 }
528
529 @Override
530 public <T extends Service, R> void coprocessorService(Class<T> service,
531 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
532 throws ServiceException, Throwable {
533 table.coprocessorService(service, startKey, endKey, callable, callback);
534 }
535
536 @Override
537 public void mutateRow(RowMutations rm) throws IOException {
538 table.mutateRow(rm);
539 }
540
541 @Override
542 public void setAutoFlush(boolean autoFlush) {
543 table.setAutoFlush(autoFlush);
544 }
545
546 @Override
547 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
548 table.setAutoFlush(autoFlush, clearBufferOnFail);
549 }
550
551 @Override
552 public long getWriteBufferSize() {
553 return table.getWriteBufferSize();
554 }
555
556 @Override
557 public void setWriteBufferSize(long writeBufferSize) throws IOException {
558 table.setWriteBufferSize(writeBufferSize);
559 }
560 }
561
562
/** The coprocessor instance wrapped by this environment. */
public Coprocessor impl;
/** Priority of the coprocessor; defaults to PRIORITY_USER until the constructor overrides it. */
protected int priority = Coprocessor.PRIORITY_USER;
/** Lifecycle state; the constructor sets INSTALLED, startup()/shutdown() advance it. */
Coprocessor.State state = Coprocessor.State.UNINSTALLED;
/** Table wrappers created through getTable() that have not been closed by their user yet. */
protected List<HTableInterface> openTables =
    Collections.synchronizedList(new ArrayList<HTableInterface>());
/** Load sequence number supplied to the constructor. */
private int seq;
/** Configuration supplied to the constructor; also used to open tables. */
private Configuration conf;
573
574
575
576
577
578
/**
 * Creates an environment in the INSTALLED state.
 *
 * @param impl the coprocessor instance to wrap
 * @param priority priority of the coprocessor
 * @param seq load sequence number
 * @param conf configuration exposed through getConfiguration()
 */
public Environment(final Coprocessor impl, final int priority,
    final int seq, final Configuration conf) {
  this.conf = conf;
  this.seq = seq;
  this.priority = priority;
  this.impl = impl;
  this.state = Coprocessor.State.INSTALLED;
}
587
588
589 public void startup() {
590 if (state == Coprocessor.State.INSTALLED ||
591 state == Coprocessor.State.STOPPED) {
592 state = Coprocessor.State.STARTING;
593 try {
594 impl.start(this);
595 state = Coprocessor.State.ACTIVE;
596 } catch (IOException ioe) {
597 LOG.error("Error starting coprocessor "+impl.getClass().getName(), ioe);
598 }
599 } else {
600 LOG.warn("Not starting coprocessor "+impl.getClass().getName()+
601 " because not inactive (state="+state.toString()+")");
602 }
603 }
604
605
606 protected void shutdown() {
607 if (state == Coprocessor.State.ACTIVE) {
608 state = Coprocessor.State.STOPPING;
609 try {
610 impl.stop(this);
611 state = Coprocessor.State.STOPPED;
612 } catch (IOException ioe) {
613 LOG.error("Error stopping coprocessor "+impl.getClass().getName(), ioe);
614 }
615 } else {
616 LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
617 " because not active (state="+state.toString()+")");
618 }
619
620 for (HTableInterface table: openTables) {
621 try {
622 ((HTableWrapper)table).internalClose();
623 } catch (IOException e) {
624
625 LOG.warn("Failed to close " +
626 Bytes.toStringBinary(table.getTableName()), e);
627 }
628 }
629 }
630
631 @Override
632 public Coprocessor getInstance() {
633 return impl;
634 }
635
636 @Override
637 public int getPriority() {
638 return priority;
639 }
640
641 @Override
642 public int getLoadSequence() {
643 return seq;
644 }
645
646
647 @Override
648 public int getVersion() {
649 return Coprocessor.VERSION;
650 }
651
652
653 @Override
654 public String getHBaseVersion() {
655 return VersionInfo.getVersion();
656 }
657
658 @Override
659 public Configuration getConfiguration() {
660 return conf;
661 }
662
663
664
665
666
667
668
669 @Override
670 public HTableInterface getTable(byte[] tableName) throws IOException {
671 return new HTableWrapper(tableName);
672 }
673 }
674
675 protected void abortServer(final String service,
676 final Server server,
677 final CoprocessorEnvironment environment,
678 final Throwable e) {
679 String coprocessorName = (environment.getInstance()).toString();
680 server.abort("Aborting service: " + service + " running on : "
681 + server.getServerName() + " because coprocessor: "
682 + coprocessorName + " threw an exception.", e);
683 }
684
685 protected void abortServer(final CoprocessorEnvironment environment,
686 final Throwable e) {
687 String coprocessorName = (environment.getInstance()).toString();
688 LOG.error("The coprocessor: " + coprocessorName + " threw an unexpected " +
689 "exception: " + e + ", but there's no specific implementation of " +
690 " abortServer() for this coprocessor's environment.");
691 }
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709 protected void handleCoprocessorThrowable(final CoprocessorEnvironment env,
710 final Throwable e)
711 throws IOException {
712 if (e instanceof IOException) {
713 throw (IOException)e;
714 }
715
716
717
718
719
720
721 if (env.getConfiguration().getBoolean("hbase.coprocessor.abortonerror",false)) {
722
723 abortServer(env, e);
724 } else {
725 LOG.error("Removing coprocessor '" + env.toString() + "' from " +
726 "environment because it threw: " + e,e);
727 coprocessors.remove(env);
728 throw new DoNotRetryIOException("Coprocessor: '" + env.toString() +
729 "' threw: '" + e + "' and has been removed" + "from the active " +
730 "coprocessor set.", e);
731 }
732 }
733 }
734
735