1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableSet;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.regex.Matcher;
31
32 import org.apache.commons.collections.map.AbstractReferenceMap;
33 import org.apache.commons.collections.map.ReferenceMap;
34 import org.apache.commons.lang.ClassUtils;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.CoprocessorEnvironment;
45 import org.apache.hadoop.hbase.HBaseConfiguration;
46 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.client.Append;
51 import org.apache.hadoop.hbase.client.Delete;
52 import org.apache.hadoop.hbase.client.Durability;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.Increment;
55 import org.apache.hadoop.hbase.client.Mutation;
56 import org.apache.hadoop.hbase.client.Put;
57 import org.apache.hadoop.hbase.client.Result;
58 import org.apache.hadoop.hbase.client.Scan;
59 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
60 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
61 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
62 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
63 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
64 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
65 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
66 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
67 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
68 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
69 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
70 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
71 import org.apache.hadoop.hbase.io.Reference;
72 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
73 import org.apache.hadoop.hbase.regionserver.Region.Operation;
74 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
75 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
76 import org.apache.hadoop.hbase.wal.WALKey;
77 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
78 import org.apache.hadoop.hbase.util.Bytes;
79 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
80 import org.apache.hadoop.hbase.util.Pair;
81
82 import com.google.common.collect.ImmutableList;
83 import com.google.common.collect.Lists;
84 import com.google.protobuf.Message;
85 import com.google.protobuf.Service;
86
87
88
89
90
91 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
92 @InterfaceStability.Evolving
93 public class RegionCoprocessorHost
94 extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
95
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);

// Static map with hard keys and weak values. createEnvironment() stores one
// ConcurrentMap per coprocessor class name here, while synchronizing on the map.
private static ReferenceMap sharedDataMap =
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);


// Computed once in the constructor: true when at least one loaded RegionObserver
// either does not descend from BaseRegionObserver or declares its own
// postScannerFilterRow(...) somewhere below BaseRegionObserver.
private final boolean hasCustomPostScannerFilterRow;
103
104
105
106
107
108 static class RegionEnvironment extends CoprocessorHost.Environment
109 implements RegionCoprocessorEnvironment {
110
111 private Region region;
112 private RegionServerServices rsServices;
113 ConcurrentMap<String, Object> sharedData;
114 private final boolean useLegacyPre;
115 private final boolean useLegacyPost;
116
117
118
119
120
121
122 public RegionEnvironment(final Coprocessor impl, final int priority,
123 final int seq, final Configuration conf, final Region region,
124 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
125 super(impl, priority, seq, conf);
126 this.region = region;
127 this.rsServices = services;
128 this.sharedData = sharedData;
129
130
131
132
133 useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
134 HRegionInfo.class, WALKey.class, WALEdit.class);
135 useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
136 HRegionInfo.class, WALKey.class, WALEdit.class);
137 }
138
139
140 @Override
141 public Region getRegion() {
142 return region;
143 }
144
145
146 @Override
147 public RegionServerServices getRegionServerServices() {
148 return rsServices;
149 }
150
151 public void shutdown() {
152 super.shutdown();
153 }
154
155 @Override
156 public ConcurrentMap<String, Object> getSharedData() {
157 return sharedData;
158 }
159
160 @Override
161 public HRegionInfo getRegionInfo() {
162 return region.getRegionInfo();
163 }
164
165 }
166
167 static class TableCoprocessorAttribute {
168 private Path path;
169 private String className;
170 private int priority;
171 private Configuration conf;
172
173 public TableCoprocessorAttribute(Path path, String className, int priority,
174 Configuration conf) {
175 this.path = path;
176 this.className = className;
177 this.priority = priority;
178 this.conf = conf;
179 }
180
181 public Path getPath() {
182 return path;
183 }
184
185 public String getClassName() {
186 return className;
187 }
188
189 public int getPriority() {
190 return priority;
191 }
192
193 public Configuration getConf() {
194 return conf;
195 }
196 }
197
198
// Region server services given to the constructor; passed to every RegionEnvironment
// created by createEnvironment().
RegionServerServices rsServices;

// The region this host was constructed for; coprocessor services are registered on it
// and its table descriptor supplies the table-level coprocessor specifications.
Region region;
202
203
204
205
206
207
208
209 public RegionCoprocessorHost(final Region region,
210 final RegionServerServices rsServices, final Configuration conf) {
211 super(rsServices);
212 this.conf = conf;
213 this.rsServices = rsServices;
214 this.region = region;
215 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
216
217
218 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
219
220
221 if (!region.getRegionInfo().getTable().isSystemTable()) {
222 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
223 }
224
225
226 loadTableCoprocessors(conf);
227
228
229 boolean hasCustomPostScannerFilterRow = false;
230 out: for (RegionEnvironment env: coprocessors) {
231 if (env.getInstance() instanceof RegionObserver) {
232 Class<?> clazz = env.getInstance().getClass();
233 for(;;) {
234 if (clazz == null) {
235
236 hasCustomPostScannerFilterRow = true;
237 break out;
238 }
239 if (clazz == BaseRegionObserver.class) {
240
241 break;
242 }
243 try {
244 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
245 InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
246
247 hasCustomPostScannerFilterRow = true;
248 break out;
249 } catch (NoSuchMethodException ignore) {
250 }
251 clazz = clazz.getSuperclass();
252 }
253 }
254 }
255 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
256 }
257
258 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
259 HTableDescriptor htd) {
260 List<TableCoprocessorAttribute> result = Lists.newArrayList();
261 for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
262 String key = Bytes.toString(e.getKey().get()).trim();
263 if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
264 String spec = Bytes.toString(e.getValue().get()).trim();
265
266 try {
267 Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
268 if (matcher.matches()) {
269
270
271 Path path = matcher.group(1).trim().isEmpty() ?
272 null : new Path(matcher.group(1).trim());
273 String className = matcher.group(2).trim();
274 if (className.isEmpty()) {
275 LOG.error("Malformed table coprocessor specification: key=" +
276 key + ", spec: " + spec);
277 continue;
278 }
279 int priority = matcher.group(3).trim().isEmpty() ?
280 Coprocessor.PRIORITY_USER : Integer.parseInt(matcher.group(3));
281 String cfgSpec = null;
282 try {
283 cfgSpec = matcher.group(4);
284 } catch (IndexOutOfBoundsException ex) {
285
286 }
287 Configuration ourConf;
288 if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
289 cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
290
291 ourConf = new Configuration(false);
292 HBaseConfiguration.merge(ourConf, conf);
293 Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
294 while (m.find()) {
295 ourConf.set(m.group(1), m.group(2));
296 }
297 } else {
298 ourConf = conf;
299 }
300 result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
301 } else {
302 LOG.error("Malformed table coprocessor specification: key=" + key +
303 ", spec: " + spec);
304 }
305 } catch (Exception ioe) {
306 LOG.error("Malformed table coprocessor specification: key=" + key +
307 ", spec: " + spec);
308 }
309 }
310 }
311 return result;
312 }
313
314
315
316
317
318
319
320
321 public static void testTableCoprocessorAttrs(final Configuration conf,
322 final HTableDescriptor htd) throws IOException {
323 String pathPrefix = UUID.randomUUID().toString();
324 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
325 if (attr.getPriority() < 0) {
326 throw new IOException("Priority for coprocessor " + attr.getClassName() +
327 " cannot be less than 0");
328 }
329 ClassLoader old = Thread.currentThread().getContextClassLoader();
330 try {
331 ClassLoader cl;
332 if (attr.getPath() != null) {
333 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
334 CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
335 } else {
336 cl = CoprocessorHost.class.getClassLoader();
337 }
338 Thread.currentThread().setContextClassLoader(cl);
339 cl.loadClass(attr.getClassName());
340 } catch (ClassNotFoundException e) {
341 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
342 } finally {
343 Thread.currentThread().setContextClassLoader(old);
344 }
345 }
346 }
347
348 void loadTableCoprocessors(final Configuration conf) {
349 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
350 DEFAULT_COPROCESSORS_ENABLED);
351 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
352 DEFAULT_USER_COPROCESSORS_ENABLED);
353 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
354 return;
355 }
356
357
358
359 List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
360 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
361 region.getTableDesc())) {
362
363 try {
364 RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
365 attr.getConf());
366 configured.add(env);
367 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
368 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
369 } catch (Throwable t) {
370
371 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
372 abortServer(attr.getClassName(), t);
373 } else {
374 LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
375 }
376 }
377 }
378
379 coprocessors.addAll(configured);
380 }
381
382 @Override
383 public RegionEnvironment createEnvironment(Class<?> implClass,
384 Coprocessor instance, int priority, int seq, Configuration conf) {
385
386
387
388
389
390 for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
391 Class<?> c = (Class<?>) itf;
392 if (CoprocessorService.class.isAssignableFrom(c)) {
393 region.registerService( ((CoprocessorService)instance).getService() );
394 }
395 }
396 ConcurrentMap<String, Object> classData;
397
398 synchronized (sharedDataMap) {
399
400
401 classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
402 if (classData == null) {
403 classData = new ConcurrentHashMap<String, Object>();
404 sharedDataMap.put(implClass.getName(), classData);
405 }
406 }
407 return new RegionEnvironment(instance, priority, seq, conf, region,
408 rsServices, classData);
409 }
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424 private void handleCoprocessorThrowableNoRethrow(
425 final CoprocessorEnvironment env, final Throwable e) {
426 try {
427 handleCoprocessorThrowable(env,e);
428 } catch (IOException ioe) {
429
430 LOG.warn(
431 "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
432 e + ". Ignoring.",e);
433 }
434 }
435
436
437
438
439
440
441 public void preOpen() throws IOException {
442 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
443 @Override
444 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
445 throws IOException {
446 oserver.preOpen(ctx);
447 }
448 });
449 }
450
451
452
453
454 public void postOpen() {
455 try {
456 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
457 @Override
458 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
459 throws IOException {
460 oserver.postOpen(ctx);
461 }
462 });
463 } catch (IOException e) {
464 LOG.warn(e);
465 }
466 }
467
468
469
470
471 public void postLogReplay() {
472 try {
473 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
474 @Override
475 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
476 throws IOException {
477 oserver.postLogReplay(ctx);
478 }
479 });
480 } catch (IOException e) {
481 LOG.warn(e);
482 }
483 }
484
485
486
487
488
489 public void preClose(final boolean abortRequested) throws IOException {
490 execOperation(false, new RegionOperation() {
491 @Override
492 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
493 throws IOException {
494 oserver.preClose(ctx, abortRequested);
495 }
496 });
497 }
498
499
500
501
502
503 public void postClose(final boolean abortRequested) {
504 try {
505 execOperation(false, new RegionOperation() {
506 @Override
507 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
508 throws IOException {
509 oserver.postClose(ctx, abortRequested);
510 }
511 public void postEnvCall(RegionEnvironment env) {
512 shutdown(env);
513 }
514 });
515 } catch (IOException e) {
516 LOG.warn(e);
517 }
518 }
519
520
521
522
523
524 public InternalScanner preCompactScannerOpen(final Store store,
525 final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
526 final CompactionRequest request) throws IOException {
527 return execOperationWithResult(null,
528 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
529 @Override
530 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
531 throws IOException {
532 setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
533 earliestPutTs, getResult(), request));
534 }
535 });
536 }
537
538
539
540
541
542
543
544
545
546
547 public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
548 final CompactionRequest request) throws IOException {
549 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
550 @Override
551 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
552 throws IOException {
553 oserver.preCompactSelection(ctx, store, candidates, request);
554 }
555 });
556 }
557
558
559
560
561
562
563
564
565 public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
566 final CompactionRequest request) {
567 try {
568 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
569 @Override
570 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
571 throws IOException {
572 oserver.postCompactSelection(ctx, store, selected, request);
573 }
574 });
575 } catch (IOException e) {
576 LOG.warn(e);
577 }
578 }
579
580
581
582
583
584
585
586
587
588 public InternalScanner preCompact(final Store store, final InternalScanner scanner,
589 final ScanType scanType, final CompactionRequest request) throws IOException {
590 return execOperationWithResult(false, scanner,
591 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
592 @Override
593 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
594 throws IOException {
595 setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
596 }
597 });
598 }
599
600
601
602
603
604
605
606
607 public void postCompact(final Store store, final StoreFile resultFile,
608 final CompactionRequest request) throws IOException {
609 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
610 @Override
611 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
612 throws IOException {
613 oserver.postCompact(ctx, store, resultFile, request);
614 }
615 });
616 }
617
618
619
620
621
622 public InternalScanner preFlush(final Store store, final InternalScanner scanner)
623 throws IOException {
624 return execOperationWithResult(false, scanner,
625 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
626 @Override
627 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
628 throws IOException {
629 setResult(oserver.preFlush(ctx, store, getResult()));
630 }
631 });
632 }
633
634
635
636
637
638 public void preFlush() throws IOException {
639 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
640 @Override
641 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
642 throws IOException {
643 oserver.preFlush(ctx);
644 }
645 });
646 }
647
648
649
650
651
652
653 public InternalScanner preFlushScannerOpen(final Store store,
654 final KeyValueScanner memstoreScanner) throws IOException {
655 return execOperationWithResult(null,
656 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
657 @Override
658 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
659 throws IOException {
660 setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
661 }
662 });
663 }
664
665
666
667
668
669 public void postFlush() throws IOException {
670 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
671 @Override
672 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
673 throws IOException {
674 oserver.postFlush(ctx);
675 }
676 });
677 }
678
679
680
681
682
683 public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
684 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
685 @Override
686 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
687 throws IOException {
688 oserver.postFlush(ctx, store, storeFile);
689 }
690 });
691 }
692
693
694
695
696
697
698 public void preSplit() throws IOException {
699 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
700 @Override
701 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
702 throws IOException {
703 oserver.preSplit(ctx);
704 }
705 });
706 }
707
708
709
710
711
712 public void preSplit(final byte[] splitRow) throws IOException {
713 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
714 @Override
715 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
716 throws IOException {
717 oserver.preSplit(ctx, splitRow);
718 }
719 });
720 }
721
722
723
724
725
726
727
728 public void postSplit(final Region l, final Region r) throws IOException {
729 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
730 @Override
731 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
732 throws IOException {
733 oserver.postSplit(ctx, l, r);
734 }
735 });
736 }
737
738 public boolean preSplitBeforePONR(final byte[] splitKey,
739 final List<Mutation> metaEntries) throws IOException {
740 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
741 @Override
742 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
743 throws IOException {
744 oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
745 }
746 });
747 }
748
749 public void preSplitAfterPONR() throws IOException {
750 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
751 @Override
752 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
753 throws IOException {
754 oserver.preSplitAfterPONR(ctx);
755 }
756 });
757 }
758
759
760
761
762
763 public void preRollBackSplit() throws IOException {
764 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
765 @Override
766 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
767 throws IOException {
768 oserver.preRollBackSplit(ctx);
769 }
770 });
771 }
772
773
774
775
776
777 public void postRollBackSplit() throws IOException {
778 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
779 @Override
780 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
781 throws IOException {
782 oserver.postRollBackSplit(ctx);
783 }
784 });
785 }
786
787
788
789
790
791 public void postCompleteSplit() throws IOException {
792 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
793 @Override
794 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
795 throws IOException {
796 oserver.postCompleteSplit(ctx);
797 }
798 });
799 }
800
801
802
803
804
805
806
807
808
809
810 public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
811 final Result result) throws IOException {
812 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
813 @Override
814 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
815 throws IOException {
816 oserver.preGetClosestRowBefore(ctx, row, family, result);
817 }
818 });
819 }
820
821
822
823
824
825
826
827 public void postGetClosestRowBefore(final byte[] row, final byte[] family,
828 final Result result) throws IOException {
829 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
830 @Override
831 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
832 throws IOException {
833 oserver.postGetClosestRowBefore(ctx, row, family, result);
834 }
835 });
836 }
837
838
839
840
841
842
843 public boolean preGet(final Get get, final List<Cell> results)
844 throws IOException {
845 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
846 @Override
847 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
848 throws IOException {
849 oserver.preGetOp(ctx, get, results);
850 }
851 });
852 }
853
854
855
856
857
858
859 public void postGet(final Get get, final List<Cell> results)
860 throws IOException {
861 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
862 @Override
863 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
864 throws IOException {
865 oserver.postGetOp(ctx, get, results);
866 }
867 });
868 }
869
870
871
872
873
874
875
876 public Boolean preExists(final Get get) throws IOException {
877 return execOperationWithResult(true, false,
878 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
879 @Override
880 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
881 throws IOException {
882 setResult(oserver.preExists(ctx, get, getResult()));
883 }
884 });
885 }
886
887
888
889
890
891
892
893 public boolean postExists(final Get get, boolean exists)
894 throws IOException {
895 return execOperationWithResult(exists,
896 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
897 @Override
898 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
899 throws IOException {
900 setResult(oserver.postExists(ctx, get, getResult()));
901 }
902 });
903 }
904
905
906
907
908
909
910
911
912 public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
913 throws IOException {
914 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
915 @Override
916 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
917 throws IOException {
918 oserver.prePut(ctx, put, edit, durability);
919 }
920 });
921 }
922
923
924
925
926
927
928
929
930
931
932
933 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
934 final Cell kv, final byte[] byteNow, final Get get) throws IOException {
935 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
936 @Override
937 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
938 throws IOException {
939 oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
940 }
941 });
942 }
943
944
945
946
947
948
949
950 public void postPut(final Put put, final WALEdit edit, final Durability durability)
951 throws IOException {
952 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
953 @Override
954 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955 throws IOException {
956 oserver.postPut(ctx, put, edit, durability);
957 }
958 });
959 }
960
961
962
963
964
965
966
967
968 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
969 throws IOException {
970 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
971 @Override
972 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
973 throws IOException {
974 oserver.preDelete(ctx, delete, edit, durability);
975 }
976 });
977 }
978
979
980
981
982
983
984
985 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
986 throws IOException {
987 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
988 @Override
989 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
990 throws IOException {
991 oserver.postDelete(ctx, delete, edit, durability);
992 }
993 });
994 }
995
996
997
998
999
1000
1001 public boolean preBatchMutate(
1002 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1003 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1004 @Override
1005 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1006 throws IOException {
1007 oserver.preBatchMutate(ctx, miniBatchOp);
1008 }
1009 });
1010 }
1011
1012
1013
1014
1015
1016 public void postBatchMutate(
1017 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1018 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1019 @Override
1020 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1021 throws IOException {
1022 oserver.postBatchMutate(ctx, miniBatchOp);
1023 }
1024 });
1025 }
1026
1027 public void postBatchMutateIndispensably(
1028 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1029 throws IOException {
1030 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1031 @Override
1032 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1033 throws IOException {
1034 oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1035 }
1036 });
1037 }
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050 public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1051 final byte [] qualifier, final CompareOp compareOp,
1052 final ByteArrayComparable comparator, final Put put)
1053 throws IOException {
1054 return execOperationWithResult(true, false,
1055 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1056 @Override
1057 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1058 throws IOException {
1059 setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1060 compareOp, comparator, put, getResult()));
1061 }
1062 });
1063 }
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076 public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1077 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1078 final Put put) throws IOException {
1079 return execOperationWithResult(true, false,
1080 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1081 @Override
1082 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1083 throws IOException {
1084 setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1085 compareOp, comparator, put, getResult()));
1086 }
1087 });
1088 }
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099 public boolean postCheckAndPut(final byte [] row, final byte [] family,
1100 final byte [] qualifier, final CompareOp compareOp,
1101 final ByteArrayComparable comparator, final Put put,
1102 boolean result) throws IOException {
1103 return execOperationWithResult(result,
1104 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1105 @Override
1106 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1107 throws IOException {
1108 setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1109 compareOp, comparator, put, getResult()));
1110 }
1111 });
1112 }
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125 public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1126 final byte [] qualifier, final CompareOp compareOp,
1127 final ByteArrayComparable comparator, final Delete delete)
1128 throws IOException {
1129 return execOperationWithResult(true, false,
1130 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1131 @Override
1132 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1133 throws IOException {
1134 setResult(oserver.preCheckAndDelete(ctx, row, family,
1135 qualifier, compareOp, comparator, delete, getResult()));
1136 }
1137 });
1138 }
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1152 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1153 final Delete delete) throws IOException {
1154 return execOperationWithResult(true, false,
1155 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1156 @Override
1157 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1158 throws IOException {
1159 setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1160 family, qualifier, compareOp, comparator, delete, getResult()));
1161 }
1162 });
1163 }
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174 public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1175 final byte [] qualifier, final CompareOp compareOp,
1176 final ByteArrayComparable comparator, final Delete delete,
1177 boolean result) throws IOException {
1178 return execOperationWithResult(result,
1179 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1180 @Override
1181 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1182 throws IOException {
1183 setResult(oserver.postCheckAndDelete(ctx, row, family,
1184 qualifier, compareOp, comparator, delete, getResult()));
1185 }
1186 });
1187 }
1188
1189
1190
1191
1192
1193
1194
1195 public Result preAppend(final Append append) throws IOException {
1196 return execOperationWithResult(true, null,
1197 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1198 @Override
1199 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1200 throws IOException {
1201 setResult(oserver.preAppend(ctx, append));
1202 }
1203 });
1204 }
1205
1206
1207
1208
1209
1210
1211
1212 public Result preAppendAfterRowLock(final Append append) throws IOException {
1213 return execOperationWithResult(true, null,
1214 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1215 @Override
1216 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1217 throws IOException {
1218 setResult(oserver.preAppendAfterRowLock(ctx, append));
1219 }
1220 });
1221 }
1222
1223
1224
1225
1226
1227
1228
1229 public Result preIncrement(final Increment increment) throws IOException {
1230 return execOperationWithResult(true, null,
1231 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1232 @Override
1233 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1234 throws IOException {
1235 setResult(oserver.preIncrement(ctx, increment));
1236 }
1237 });
1238 }
1239
1240
1241
1242
1243
1244
1245
1246 public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1247 return execOperationWithResult(true, null,
1248 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1249 @Override
1250 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1251 throws IOException {
1252 setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1253 }
1254 });
1255 }
1256
1257
1258
1259
1260
1261
1262 public void postAppend(final Append append, final Result result) throws IOException {
1263 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1264 @Override
1265 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1266 throws IOException {
1267 oserver.postAppend(ctx, append, result);
1268 }
1269 });
1270 }
1271
1272
1273
1274
1275
1276
1277 public Result postIncrement(final Increment increment, Result result) throws IOException {
1278 return execOperationWithResult(result,
1279 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1280 @Override
1281 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1282 throws IOException {
1283 setResult(oserver.postIncrement(ctx, increment, getResult()));
1284 }
1285 });
1286 }
1287
1288
1289
1290
1291
1292
1293
1294 public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1295 return execOperationWithResult(true, null,
1296 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1297 @Override
1298 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1299 throws IOException {
1300 setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1301 }
1302 });
1303 }
1304
1305
1306
1307
1308
1309
1310 public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1311 final NavigableSet<byte[]> targetCols) throws IOException {
1312 return execOperationWithResult(null,
1313 coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1314 @Override
1315 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1316 throws IOException {
1317 setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1318 }
1319 });
1320 }
1321
1322
1323
1324
1325
1326
1327
1328 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1329 return execOperationWithResult(s,
1330 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1331 @Override
1332 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1333 throws IOException {
1334 setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1335 }
1336 });
1337 }
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347 public Boolean preScannerNext(final InternalScanner s,
1348 final List<Result> results, final int limit) throws IOException {
1349 return execOperationWithResult(true, false,
1350 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1351 @Override
1352 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1353 throws IOException {
1354 setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1355 }
1356 });
1357 }
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367 public boolean postScannerNext(final InternalScanner s,
1368 final List<Result> results, final int limit, boolean hasMore)
1369 throws IOException {
1370 return execOperationWithResult(hasMore,
1371 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1372 @Override
1373 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1374 throws IOException {
1375 setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1376 }
1377 });
1378 }
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390 public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1391 final int offset, final short length) throws IOException {
1392
1393 if (!hasCustomPostScannerFilterRow) return true;
1394 return execOperationWithResult(true,
1395 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1396 @Override
1397 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1398 throws IOException {
1399 setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1400 }
1401 });
1402 }
1403
1404
1405
1406
1407
1408
1409 public boolean preScannerClose(final InternalScanner s) throws IOException {
1410 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1411 @Override
1412 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1413 throws IOException {
1414 oserver.preScannerClose(ctx, s);
1415 }
1416 });
1417 }
1418
1419
1420
1421
1422 public void postScannerClose(final InternalScanner s) throws IOException {
1423 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1424 @Override
1425 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1426 throws IOException {
1427 oserver.postScannerClose(ctx, s);
1428 }
1429 });
1430 }
1431
1432
1433
1434
1435
1436
1437
1438
1439 public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1440 final WALEdit logEdit) throws IOException {
1441 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1442 @Override
1443 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1444 throws IOException {
1445
1446
1447 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1448 if (env.useLegacyPre) {
1449 if (logKey instanceof HLogKey) {
1450 oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1451 } else {
1452 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1453 }
1454 } else {
1455 oserver.preWALRestore(ctx, info, logKey, logEdit);
1456 }
1457 }
1458 });
1459 }
1460
1461
1462
1463
1464
1465 @Deprecated
1466 public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1467 final WALEdit logEdit) throws IOException {
1468 return preWALRestore(info, (WALKey)logKey, logEdit);
1469 }
1470
1471
1472
1473
1474
1475
1476
1477 public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1478 throws IOException {
1479 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1480 @Override
1481 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1482 throws IOException {
1483
1484
1485 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1486 if (env.useLegacyPost) {
1487 if (logKey instanceof HLogKey) {
1488 oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1489 } else {
1490 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1491 }
1492 } else {
1493 oserver.postWALRestore(ctx, info, logKey, logEdit);
1494 }
1495 }
1496 });
1497 }
1498
1499
1500
1501
1502 @Deprecated
1503 public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1504 throws IOException {
1505 postWALRestore(info, (WALKey)logKey, logEdit);
1506 }
1507
1508
1509
1510
1511
1512
1513 public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1514 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1515 @Override
1516 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1517 throws IOException {
1518 oserver.preBulkLoadHFile(ctx, familyPaths);
1519 }
1520 });
1521 }
1522
1523
1524
1525
1526
1527
1528
1529 public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1530 boolean hasLoaded) throws IOException {
1531 return execOperationWithResult(hasLoaded,
1532 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1533 @Override
1534 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1535 throws IOException {
1536 setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1537 }
1538 });
1539 }
1540
1541 public void postStartRegionOperation(final Operation op) throws IOException {
1542 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1543 @Override
1544 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1545 throws IOException {
1546 oserver.postStartRegionOperation(ctx, op);
1547 }
1548 });
1549 }
1550
1551 public void postCloseRegionOperation(final Operation op) throws IOException {
1552 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1553 @Override
1554 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1555 throws IOException {
1556 oserver.postCloseRegionOperation(ctx, op);
1557 }
1558 });
1559 }
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572 public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1573 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1574 final Reference r) throws IOException {
1575 return execOperationWithResult(null,
1576 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1577 @Override
1578 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1579 throws IOException {
1580 setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1581 }
1582 });
1583 }
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596 public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1597 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1598 final Reference r, final StoreFile.Reader reader) throws IOException {
1599 return execOperationWithResult(reader,
1600 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1601 @Override
1602 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1603 throws IOException {
1604 setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1605 }
1606 });
1607 }
1608
1609 public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1610 final Cell oldCell, Cell newCell) throws IOException {
1611 return execOperationWithResult(newCell,
1612 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1613 @Override
1614 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1615 throws IOException {
1616 setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1617 }
1618 });
1619 }
1620
1621 public Message preEndpointInvocation(final Service service, final String methodName,
1622 Message request) throws IOException {
1623 return execOperationWithResult(request,
1624 coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1625 @Override
1626 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1627 throws IOException {
1628 setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1629 }
1630 });
1631 }
1632
1633 public void postEndpointInvocation(final Service service, final String methodName,
1634 final Message request, final Message.Builder responseBuilder) throws IOException {
1635 execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1636 @Override
1637 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1638 throws IOException {
1639 oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1640 }
1641 });
1642 }
1643
1644 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1645 return execOperationWithResult(tracker,
1646 coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1647 @Override
1648 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1649 throws IOException {
1650 setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1651 }
1652 });
1653 }
1654
1655 private static abstract class CoprocessorOperation
1656 extends ObserverContext<RegionCoprocessorEnvironment> {
1657 public abstract void call(Coprocessor observer,
1658 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1659 public abstract boolean hasCall(Coprocessor observer);
1660 public void postEnvCall(RegionEnvironment env) { }
1661 }
1662
1663 private static abstract class RegionOperation extends CoprocessorOperation {
1664 public abstract void call(RegionObserver observer,
1665 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1666
1667 public boolean hasCall(Coprocessor observer) {
1668 return observer instanceof RegionObserver;
1669 }
1670
1671 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1672 throws IOException {
1673 call((RegionObserver)observer, ctx);
1674 }
1675 }
1676
1677 private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1678 private T result = null;
1679 public void setResult(final T result) { this.result = result; }
1680 public T getResult() { return this.result; }
1681 }
1682
1683 private static abstract class EndpointOperation extends CoprocessorOperation {
1684 public abstract void call(EndpointObserver observer,
1685 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1686
1687 public boolean hasCall(Coprocessor observer) {
1688 return observer instanceof EndpointObserver;
1689 }
1690
1691 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1692 throws IOException {
1693 call((EndpointObserver)observer, ctx);
1694 }
1695 }
1696
1697 private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1698 private T result = null;
1699 public void setResult(final T result) { this.result = result; }
1700 public T getResult() { return this.result; }
1701 }
1702
1703 private boolean execOperation(final CoprocessorOperation ctx)
1704 throws IOException {
1705 return execOperation(true, ctx);
1706 }
1707
1708 private <T> T execOperationWithResult(final T defaultValue,
1709 final RegionOperationWithResult<T> ctx) throws IOException {
1710 if (ctx == null) return defaultValue;
1711 ctx.setResult(defaultValue);
1712 execOperation(true, ctx);
1713 return ctx.getResult();
1714 }
1715
1716 private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1717 final RegionOperationWithResult<T> ctx) throws IOException {
1718 boolean bypass = false;
1719 T result = defaultValue;
1720 if (ctx != null) {
1721 ctx.setResult(defaultValue);
1722 bypass = execOperation(true, ctx);
1723 result = ctx.getResult();
1724 }
1725 return bypass == ifBypass ? result : null;
1726 }
1727
1728 private <T> T execOperationWithResult(final T defaultValue,
1729 final EndpointOperationWithResult<T> ctx) throws IOException {
1730 if (ctx == null) return defaultValue;
1731 ctx.setResult(defaultValue);
1732 execOperation(true, ctx);
1733 return ctx.getResult();
1734 }
1735
1736 private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1737 throws IOException {
1738 boolean bypass = false;
1739 List<RegionEnvironment> envs = coprocessors.get();
1740 for (int i = 0; i < envs.size(); i++) {
1741 RegionEnvironment env = envs.get(i);
1742 Coprocessor observer = env.getInstance();
1743 if (ctx.hasCall(observer)) {
1744 ctx.prepare(env);
1745 Thread currentThread = Thread.currentThread();
1746 ClassLoader cl = currentThread.getContextClassLoader();
1747 try {
1748 currentThread.setContextClassLoader(env.getClassLoader());
1749 ctx.call(observer, ctx);
1750 } catch (Throwable e) {
1751 handleCoprocessorThrowable(env, e);
1752 } finally {
1753 currentThread.setContextClassLoader(cl);
1754 }
1755 bypass |= ctx.shouldBypass();
1756 if (earlyExit && ctx.shouldComplete()) {
1757 break;
1758 }
1759 }
1760
1761 ctx.postEnvCall(env);
1762 }
1763 return bypass;
1764 }
1765 }