1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableSet;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32 import java.util.regex.Matcher;
33
34 import org.apache.commons.collections.map.AbstractReferenceMap;
35 import org.apache.commons.collections.map.ReferenceMap;
36 import org.apache.commons.lang.ClassUtils;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.classification.InterfaceStability;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.Cell;
46 import org.apache.hadoop.hbase.Coprocessor;
47 import org.apache.hadoop.hbase.CoprocessorEnvironment;
48 import org.apache.hadoop.hbase.HBaseConfiguration;
49 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.client.Append;
54 import org.apache.hadoop.hbase.client.Delete;
55 import org.apache.hadoop.hbase.client.Durability;
56 import org.apache.hadoop.hbase.client.Get;
57 import org.apache.hadoop.hbase.client.Increment;
58 import org.apache.hadoop.hbase.client.Mutation;
59 import org.apache.hadoop.hbase.client.Put;
60 import org.apache.hadoop.hbase.client.Result;
61 import org.apache.hadoop.hbase.client.Scan;
62 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
63 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
64 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
65 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
66 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
67 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
68 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
69 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
70 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
71 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
72 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
73 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
74 import org.apache.hadoop.hbase.io.Reference;
75 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
76 import org.apache.hadoop.hbase.regionserver.Region.Operation;
77 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
78 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
79 import org.apache.hadoop.hbase.wal.WALKey;
80 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
81 import org.apache.hadoop.hbase.util.BoundedConcurrentLinkedQueue;
82 import org.apache.hadoop.hbase.util.Bytes;
83 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
84 import org.apache.hadoop.hbase.util.Pair;
85
86 import com.google.common.collect.ImmutableList;
87 import com.google.common.collect.Lists;
88 import com.google.protobuf.Message;
89 import com.google.protobuf.Service;
90
91
92
93
94
95 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
96 @InterfaceStability.Evolving
97 public class RegionCoprocessorHost
98 extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
99
// Logger shared by every host instance of this class.
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);

// Process-wide registry of per-coprocessor-class shared data, keyed by the class name
// (see createEnvironment). Built with HARD keys and WEAK values, so an entry's value is
// only weakly referenced by this map.
private static ReferenceMap sharedDataMap =
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);


// Computed once in the constructor: true when at least one loaded RegionObserver either
// declares its own postScannerFilterRow below BaseRegionObserver or does not extend
// BaseRegionObserver at all.
private final boolean hasCustomPostScannerFilterRow;
107
108
109
110
111
112 static class RegionEnvironment extends CoprocessorHost.Environment
113 implements RegionCoprocessorEnvironment {
114
115 private Region region;
116 private RegionServerServices rsServices;
117 ConcurrentMap<String, Object> sharedData;
118 private static final int LATENCY_BUFFER_SIZE = 100;
119 private final BoundedConcurrentLinkedQueue<Long> coprocessorTimeNanos =
120 new BoundedConcurrentLinkedQueue<Long>(LATENCY_BUFFER_SIZE);
121 private final boolean useLegacyPre;
122 private final boolean useLegacyPost;
123
124
125
126
127
128
129 public RegionEnvironment(final Coprocessor impl, final int priority,
130 final int seq, final Configuration conf, final Region region,
131 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
132 super(impl, priority, seq, conf);
133 this.region = region;
134 this.rsServices = services;
135 this.sharedData = sharedData;
136
137
138
139
140 useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
141 HRegionInfo.class, WALKey.class, WALEdit.class);
142 useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
143 HRegionInfo.class, WALKey.class, WALEdit.class);
144 }
145
146
147 @Override
148 public Region getRegion() {
149 return region;
150 }
151
152
153 @Override
154 public RegionServerServices getRegionServerServices() {
155 return rsServices;
156 }
157
158 public void shutdown() {
159 super.shutdown();
160 }
161
162 @Override
163 public ConcurrentMap<String, Object> getSharedData() {
164 return sharedData;
165 }
166
167 public void offerExecutionLatency(long latencyNanos) {
168 coprocessorTimeNanos.offer(latencyNanos);
169 }
170
171 public Collection<Long> getExecutionLatenciesNanos() {
172 final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
173 coprocessorTimeNanos.drainTo(latencies);
174 return latencies;
175 }
176
177 @Override
178 public HRegionInfo getRegionInfo() {
179 return region.getRegionInfo();
180 }
181
182 }
183
184 static class TableCoprocessorAttribute {
185 private Path path;
186 private String className;
187 private int priority;
188 private Configuration conf;
189
190 public TableCoprocessorAttribute(Path path, String className, int priority,
191 Configuration conf) {
192 this.path = path;
193 this.className = className;
194 this.priority = priority;
195 this.conf = conf;
196 }
197
198 public Path getPath() {
199 return path;
200 }
201
202 public String getClassName() {
203 return className;
204 }
205
206 public int getPriority() {
207 return priority;
208 }
209
210 public Configuration getConf() {
211 return conf;
212 }
213 }
214
215
// The region server services handed to the constructor; passed on to every environment
// created by createEnvironment.
RegionServerServices rsServices;
// The region this host manages coprocessors for.
Region region;
219
220
221
222
223
224
225
226 public RegionCoprocessorHost(final Region region,
227 final RegionServerServices rsServices, final Configuration conf) {
228 super(rsServices);
229 this.conf = conf;
230 this.rsServices = rsServices;
231 this.region = region;
232 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
233
234
235 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
236
237
238 if (!region.getRegionInfo().getTable().isSystemTable()) {
239 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
240 }
241
242
243 loadTableCoprocessors(conf);
244
245
246 boolean hasCustomPostScannerFilterRow = false;
247 out: for (RegionEnvironment env: coprocessors) {
248 if (env.getInstance() instanceof RegionObserver) {
249 Class<?> clazz = env.getInstance().getClass();
250 for(;;) {
251 if (clazz == null) {
252
253 hasCustomPostScannerFilterRow = true;
254 break out;
255 }
256 if (clazz == BaseRegionObserver.class) {
257
258 break;
259 }
260 try {
261 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
262 InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
263
264 hasCustomPostScannerFilterRow = true;
265 break out;
266 } catch (NoSuchMethodException ignore) {
267 }
268 clazz = clazz.getSuperclass();
269 }
270 }
271 }
272 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
273 }
274
275 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
276 HTableDescriptor htd) {
277 List<TableCoprocessorAttribute> result = Lists.newArrayList();
278 for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
279 String key = Bytes.toString(e.getKey().get()).trim();
280 if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
281 String spec = Bytes.toString(e.getValue().get()).trim();
282
283 try {
284 Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
285 if (matcher.matches()) {
286
287
288 Path path = matcher.group(1).trim().isEmpty() ?
289 null : new Path(matcher.group(1).trim());
290 String className = matcher.group(2).trim();
291 if (className.isEmpty()) {
292 LOG.error("Malformed table coprocessor specification: key=" +
293 key + ", spec: " + spec);
294 continue;
295 }
296 int priority = matcher.group(3).trim().isEmpty() ?
297 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
298 String cfgSpec = null;
299 try {
300 cfgSpec = matcher.group(4);
301 } catch (IndexOutOfBoundsException ex) {
302
303 }
304 Configuration ourConf;
305 if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
306 cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
307
308 ourConf = new Configuration(false);
309 HBaseConfiguration.merge(ourConf, conf);
310 Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
311 while (m.find()) {
312 ourConf.set(m.group(1), m.group(2));
313 }
314 } else {
315 ourConf = conf;
316 }
317 result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
318 } else {
319 LOG.error("Malformed table coprocessor specification: key=" + key +
320 ", spec: " + spec);
321 }
322 } catch (Exception ioe) {
323 LOG.error("Malformed table coprocessor specification: key=" + key +
324 ", spec: " + spec);
325 }
326 }
327 }
328 return result;
329 }
330
331
332
333
334
335
336
337
338 public static void testTableCoprocessorAttrs(final Configuration conf,
339 final HTableDescriptor htd) throws IOException {
340 String pathPrefix = UUID.randomUUID().toString();
341 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
342 if (attr.getPriority() < 0) {
343 throw new IOException("Priority for coprocessor " + attr.getClassName() +
344 " cannot be less than 0");
345 }
346 ClassLoader old = Thread.currentThread().getContextClassLoader();
347 try {
348 ClassLoader cl;
349 if (attr.getPath() != null) {
350 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
351 CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
352 } else {
353 cl = CoprocessorHost.class.getClassLoader();
354 }
355 Thread.currentThread().setContextClassLoader(cl);
356 cl.loadClass(attr.getClassName());
357 } catch (ClassNotFoundException e) {
358 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
359 } finally {
360 Thread.currentThread().setContextClassLoader(old);
361 }
362 }
363 }
364
365 void loadTableCoprocessors(final Configuration conf) {
366 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
367 DEFAULT_COPROCESSORS_ENABLED);
368 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
369 DEFAULT_USER_COPROCESSORS_ENABLED);
370 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
371 return;
372 }
373
374
375
376 List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
377 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
378 region.getTableDesc())) {
379
380 try {
381 RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
382 attr.getConf());
383 configured.add(env);
384 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
385 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
386 } catch (Throwable t) {
387
388 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
389 abortServer(attr.getClassName(), t);
390 } else {
391 LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
392 }
393 }
394 }
395
396 coprocessors.addAll(configured);
397 }
398
399 @Override
400 public RegionEnvironment createEnvironment(Class<?> implClass,
401 Coprocessor instance, int priority, int seq, Configuration conf) {
402
403
404
405
406
407 for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
408 Class<?> c = (Class<?>) itf;
409 if (CoprocessorService.class.isAssignableFrom(c)) {
410 region.registerService( ((CoprocessorService)instance).getService() );
411 }
412 }
413 ConcurrentMap<String, Object> classData;
414
415 synchronized (sharedDataMap) {
416
417
418 classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
419 if (classData == null) {
420 classData = new ConcurrentHashMap<String, Object>();
421 sharedDataMap.put(implClass.getName(), classData);
422 }
423 }
424 return new RegionEnvironment(instance, priority, seq, conf, region,
425 rsServices, classData);
426 }
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441 private void handleCoprocessorThrowableNoRethrow(
442 final CoprocessorEnvironment env, final Throwable e) {
443 try {
444 handleCoprocessorThrowable(env,e);
445 } catch (IOException ioe) {
446
447 LOG.warn(
448 "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
449 e + ". Ignoring.",e);
450 }
451 }
452
453
454
455
456
457
458 public void preOpen() throws IOException {
459 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
460 @Override
461 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
462 throws IOException {
463 oserver.preOpen(ctx);
464 }
465 });
466 }
467
468
469
470
471 public void postOpen() {
472 try {
473 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
474 @Override
475 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
476 throws IOException {
477 oserver.postOpen(ctx);
478 }
479 });
480 } catch (IOException e) {
481 LOG.warn(e);
482 }
483 }
484
485
486
487
488 public void postLogReplay() {
489 try {
490 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
491 @Override
492 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
493 throws IOException {
494 oserver.postLogReplay(ctx);
495 }
496 });
497 } catch (IOException e) {
498 LOG.warn(e);
499 }
500 }
501
502
503
504
505
506 public void preClose(final boolean abortRequested) throws IOException {
507 execOperation(false, new RegionOperation() {
508 @Override
509 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
510 throws IOException {
511 oserver.preClose(ctx, abortRequested);
512 }
513 });
514 }
515
516
517
518
519
520 public void postClose(final boolean abortRequested) {
521 try {
522 execOperation(false, new RegionOperation() {
523 @Override
524 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
525 throws IOException {
526 oserver.postClose(ctx, abortRequested);
527 }
528 public void postEnvCall(RegionEnvironment env) {
529 shutdown(env);
530 }
531 });
532 } catch (IOException e) {
533 LOG.warn(e);
534 }
535 }
536
537
538
539
540
541 public InternalScanner preCompactScannerOpen(final Store store,
542 final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
543 final CompactionRequest request) throws IOException {
544 return execOperationWithResult(null,
545 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
546 @Override
547 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
548 throws IOException {
549 setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
550 earliestPutTs, getResult(), request));
551 }
552 });
553 }
554
555
556
557
558
559
560
561
562
563
564 public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
565 final CompactionRequest request) throws IOException {
566 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
567 @Override
568 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
569 throws IOException {
570 oserver.preCompactSelection(ctx, store, candidates, request);
571 }
572 });
573 }
574
575
576
577
578
579
580
581
582 public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
583 final CompactionRequest request) {
584 try {
585 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
586 @Override
587 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
588 throws IOException {
589 oserver.postCompactSelection(ctx, store, selected, request);
590 }
591 });
592 } catch (IOException e) {
593 LOG.warn(e);
594 }
595 }
596
597
598
599
600
601
602
603
604
605 public InternalScanner preCompact(final Store store, final InternalScanner scanner,
606 final ScanType scanType, final CompactionRequest request) throws IOException {
607 return execOperationWithResult(false, scanner,
608 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
609 @Override
610 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
611 throws IOException {
612 setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
613 }
614 });
615 }
616
617
618
619
620
621
622
623
624 public void postCompact(final Store store, final StoreFile resultFile,
625 final CompactionRequest request) throws IOException {
626 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
627 @Override
628 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
629 throws IOException {
630 oserver.postCompact(ctx, store, resultFile, request);
631 }
632 });
633 }
634
635
636
637
638
639 public InternalScanner preFlush(final Store store, final InternalScanner scanner)
640 throws IOException {
641 return execOperationWithResult(false, scanner,
642 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
643 @Override
644 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
645 throws IOException {
646 setResult(oserver.preFlush(ctx, store, getResult()));
647 }
648 });
649 }
650
651
652
653
654
655 public void preFlush() throws IOException {
656 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
657 @Override
658 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
659 throws IOException {
660 oserver.preFlush(ctx);
661 }
662 });
663 }
664
665
666
667
668
669
670 public InternalScanner preFlushScannerOpen(final Store store,
671 final KeyValueScanner memstoreScanner) throws IOException {
672 return execOperationWithResult(null,
673 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
674 @Override
675 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
676 throws IOException {
677 setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
678 }
679 });
680 }
681
682
683
684
685
686 public void postFlush() throws IOException {
687 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
688 @Override
689 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
690 throws IOException {
691 oserver.postFlush(ctx);
692 }
693 });
694 }
695
696
697
698
699
700 public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
701 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
702 @Override
703 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
704 throws IOException {
705 oserver.postFlush(ctx, store, storeFile);
706 }
707 });
708 }
709
710
711
712
713
714
715 public void preSplit() throws IOException {
716 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
717 @Override
718 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
719 throws IOException {
720 oserver.preSplit(ctx);
721 }
722 });
723 }
724
725
726
727
728
729 public void preSplit(final byte[] splitRow) throws IOException {
730 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
731 @Override
732 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
733 throws IOException {
734 oserver.preSplit(ctx, splitRow);
735 }
736 });
737 }
738
739
740
741
742
743
744
745 public void postSplit(final Region l, final Region r) throws IOException {
746 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
747 @Override
748 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
749 throws IOException {
750 oserver.postSplit(ctx, l, r);
751 }
752 });
753 }
754
755 public boolean preSplitBeforePONR(final byte[] splitKey,
756 final List<Mutation> metaEntries) throws IOException {
757 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
758 @Override
759 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
760 throws IOException {
761 oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
762 }
763 });
764 }
765
766 public void preSplitAfterPONR() throws IOException {
767 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
768 @Override
769 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
770 throws IOException {
771 oserver.preSplitAfterPONR(ctx);
772 }
773 });
774 }
775
776
777
778
779
780 public void preRollBackSplit() throws IOException {
781 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
782 @Override
783 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
784 throws IOException {
785 oserver.preRollBackSplit(ctx);
786 }
787 });
788 }
789
790
791
792
793
794 public void postRollBackSplit() throws IOException {
795 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
796 @Override
797 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
798 throws IOException {
799 oserver.postRollBackSplit(ctx);
800 }
801 });
802 }
803
804
805
806
807
808 public void postCompleteSplit() throws IOException {
809 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
810 @Override
811 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
812 throws IOException {
813 oserver.postCompleteSplit(ctx);
814 }
815 });
816 }
817
818
819
820
821
822
823
824
825
826
827 public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
828 final Result result) throws IOException {
829 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
830 @Override
831 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
832 throws IOException {
833 oserver.preGetClosestRowBefore(ctx, row, family, result);
834 }
835 });
836 }
837
838
839
840
841
842
843
844 public void postGetClosestRowBefore(final byte[] row, final byte[] family,
845 final Result result) throws IOException {
846 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
847 @Override
848 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
849 throws IOException {
850 oserver.postGetClosestRowBefore(ctx, row, family, result);
851 }
852 });
853 }
854
855
856
857
858
859
860 public boolean preGet(final Get get, final List<Cell> results)
861 throws IOException {
862 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
863 @Override
864 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
865 throws IOException {
866 oserver.preGetOp(ctx, get, results);
867 }
868 });
869 }
870
871
872
873
874
875
876 public void postGet(final Get get, final List<Cell> results)
877 throws IOException {
878 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
879 @Override
880 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
881 throws IOException {
882 oserver.postGetOp(ctx, get, results);
883 }
884 });
885 }
886
887
888
889
890
891
892
893 public Boolean preExists(final Get get) throws IOException {
894 return execOperationWithResult(true, false,
895 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
896 @Override
897 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
898 throws IOException {
899 setResult(oserver.preExists(ctx, get, getResult()));
900 }
901 });
902 }
903
904
905
906
907
908
909
910 public boolean postExists(final Get get, boolean exists)
911 throws IOException {
912 return execOperationWithResult(exists,
913 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
914 @Override
915 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
916 throws IOException {
917 setResult(oserver.postExists(ctx, get, getResult()));
918 }
919 });
920 }
921
922
923
924
925
926
927
928
929 public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
930 throws IOException {
931 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
932 @Override
933 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
934 throws IOException {
935 oserver.prePut(ctx, put, edit, durability);
936 }
937 });
938 }
939
940
941
942
943
944
945
946
947
948
949
950 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
951 final Cell kv, final byte[] byteNow, final Get get) throws IOException {
952 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
953 @Override
954 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955 throws IOException {
956 oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
957 }
958 });
959 }
960
961
962
963
964
965
966
967 public void postPut(final Put put, final WALEdit edit, final Durability durability)
968 throws IOException {
969 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
970 @Override
971 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
972 throws IOException {
973 oserver.postPut(ctx, put, edit, durability);
974 }
975 });
976 }
977
978
979
980
981
982
983
984
985 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
986 throws IOException {
987 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
988 @Override
989 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
990 throws IOException {
991 oserver.preDelete(ctx, delete, edit, durability);
992 }
993 });
994 }
995
996
997
998
999
1000
1001
1002 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
1003 throws IOException {
1004 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1005 @Override
1006 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1007 throws IOException {
1008 oserver.postDelete(ctx, delete, edit, durability);
1009 }
1010 });
1011 }
1012
1013
1014
1015
1016
1017
1018 public boolean preBatchMutate(
1019 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1020 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1021 @Override
1022 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1023 throws IOException {
1024 oserver.preBatchMutate(ctx, miniBatchOp);
1025 }
1026 });
1027 }
1028
1029
1030
1031
1032
1033 public void postBatchMutate(
1034 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1035 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1036 @Override
1037 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1038 throws IOException {
1039 oserver.postBatchMutate(ctx, miniBatchOp);
1040 }
1041 });
1042 }
1043
1044 public void postBatchMutateIndispensably(
1045 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1046 throws IOException {
1047 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1048 @Override
1049 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1050 throws IOException {
1051 oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1052 }
1053 });
1054 }
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067 public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1068 final byte [] qualifier, final CompareOp compareOp,
1069 final ByteArrayComparable comparator, final Put put)
1070 throws IOException {
1071 return execOperationWithResult(true, false,
1072 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1073 @Override
1074 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1075 throws IOException {
1076 setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1077 compareOp, comparator, put, getResult()));
1078 }
1079 });
1080 }
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093 public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1094 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1095 final Put put) throws IOException {
1096 return execOperationWithResult(true, false,
1097 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1098 @Override
1099 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1100 throws IOException {
1101 setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1102 compareOp, comparator, put, getResult()));
1103 }
1104 });
1105 }
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116 public boolean postCheckAndPut(final byte [] row, final byte [] family,
1117 final byte [] qualifier, final CompareOp compareOp,
1118 final ByteArrayComparable comparator, final Put put,
1119 boolean result) throws IOException {
1120 return execOperationWithResult(result,
1121 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1122 @Override
1123 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1124 throws IOException {
1125 setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1126 compareOp, comparator, put, getResult()));
1127 }
1128 });
1129 }
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142 public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1143 final byte [] qualifier, final CompareOp compareOp,
1144 final ByteArrayComparable comparator, final Delete delete)
1145 throws IOException {
1146 return execOperationWithResult(true, false,
1147 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1148 @Override
1149 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1150 throws IOException {
1151 setResult(oserver.preCheckAndDelete(ctx, row, family,
1152 qualifier, compareOp, comparator, delete, getResult()));
1153 }
1154 });
1155 }
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1169 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1170 final Delete delete) throws IOException {
1171 return execOperationWithResult(true, false,
1172 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1173 @Override
1174 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1175 throws IOException {
1176 setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1177 family, qualifier, compareOp, comparator, delete, getResult()));
1178 }
1179 });
1180 }
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191 public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1192 final byte [] qualifier, final CompareOp compareOp,
1193 final ByteArrayComparable comparator, final Delete delete,
1194 boolean result) throws IOException {
1195 return execOperationWithResult(result,
1196 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1197 @Override
1198 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1199 throws IOException {
1200 setResult(oserver.postCheckAndDelete(ctx, row, family,
1201 qualifier, compareOp, comparator, delete, getResult()));
1202 }
1203 });
1204 }
1205
1206
1207
1208
1209
1210
1211
1212 public Result preAppend(final Append append) throws IOException {
1213 return execOperationWithResult(true, null,
1214 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1215 @Override
1216 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1217 throws IOException {
1218 setResult(oserver.preAppend(ctx, append));
1219 }
1220 });
1221 }
1222
1223
1224
1225
1226
1227
1228
1229 public Result preAppendAfterRowLock(final Append append) throws IOException {
1230 return execOperationWithResult(true, null,
1231 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1232 @Override
1233 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1234 throws IOException {
1235 setResult(oserver.preAppendAfterRowLock(ctx, append));
1236 }
1237 });
1238 }
1239
1240
1241
1242
1243
1244
1245
1246 public Result preIncrement(final Increment increment) throws IOException {
1247 return execOperationWithResult(true, null,
1248 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1249 @Override
1250 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1251 throws IOException {
1252 setResult(oserver.preIncrement(ctx, increment));
1253 }
1254 });
1255 }
1256
1257
1258
1259
1260
1261
1262
1263 public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1264 return execOperationWithResult(true, null,
1265 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1266 @Override
1267 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1268 throws IOException {
1269 setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1270 }
1271 });
1272 }
1273
1274
1275
1276
1277
1278
1279 public void postAppend(final Append append, final Result result) throws IOException {
1280 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1281 @Override
1282 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1283 throws IOException {
1284 oserver.postAppend(ctx, append, result);
1285 }
1286 });
1287 }
1288
1289
1290
1291
1292
1293
1294 public Result postIncrement(final Increment increment, Result result) throws IOException {
1295 return execOperationWithResult(result,
1296 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1297 @Override
1298 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1299 throws IOException {
1300 setResult(oserver.postIncrement(ctx, increment, getResult()));
1301 }
1302 });
1303 }
1304
1305
1306
1307
1308
1309
1310
1311 public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1312 return execOperationWithResult(true, null,
1313 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1314 @Override
1315 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1316 throws IOException {
1317 setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1318 }
1319 });
1320 }
1321
1322
1323
1324
1325
1326
1327 public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1328 final NavigableSet<byte[]> targetCols) throws IOException {
1329 return execOperationWithResult(null,
1330 coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1331 @Override
1332 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1333 throws IOException {
1334 setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1335 }
1336 });
1337 }
1338
1339
1340
1341
1342
1343
1344
1345 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1346 return execOperationWithResult(s,
1347 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1348 @Override
1349 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1350 throws IOException {
1351 setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1352 }
1353 });
1354 }
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364 public Boolean preScannerNext(final InternalScanner s,
1365 final List<Result> results, final int limit) throws IOException {
1366 return execOperationWithResult(true, false,
1367 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1368 @Override
1369 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1370 throws IOException {
1371 setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1372 }
1373 });
1374 }
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384 public boolean postScannerNext(final InternalScanner s,
1385 final List<Result> results, final int limit, boolean hasMore)
1386 throws IOException {
1387 return execOperationWithResult(hasMore,
1388 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1389 @Override
1390 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1391 throws IOException {
1392 setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1393 }
1394 });
1395 }
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407 public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1408 final int offset, final short length) throws IOException {
1409
1410 if (!hasCustomPostScannerFilterRow) return true;
1411 return execOperationWithResult(true,
1412 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1413 @Override
1414 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1415 throws IOException {
1416 setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1417 }
1418 });
1419 }
1420
1421
1422
1423
1424
1425
1426 public boolean preScannerClose(final InternalScanner s) throws IOException {
1427 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1428 @Override
1429 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1430 throws IOException {
1431 oserver.preScannerClose(ctx, s);
1432 }
1433 });
1434 }
1435
1436
1437
1438
1439 public void postScannerClose(final InternalScanner s) throws IOException {
1440 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1441 @Override
1442 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1443 throws IOException {
1444 oserver.postScannerClose(ctx, s);
1445 }
1446 });
1447 }
1448
1449
1450
1451
1452
1453
1454
1455
1456 public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1457 final WALEdit logEdit) throws IOException {
1458 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1459 @Override
1460 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1461 throws IOException {
1462
1463
1464 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1465 if (env.useLegacyPre) {
1466 if (logKey instanceof HLogKey) {
1467 oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1468 } else {
1469 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1470 }
1471 } else {
1472 oserver.preWALRestore(ctx, info, logKey, logEdit);
1473 }
1474 }
1475 });
1476 }
1477
1478
1479
1480
1481
1482 @Deprecated
1483 public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1484 final WALEdit logEdit) throws IOException {
1485 return preWALRestore(info, (WALKey)logKey, logEdit);
1486 }
1487
1488
1489
1490
1491
1492
1493
1494 public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1495 throws IOException {
1496 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1497 @Override
1498 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1499 throws IOException {
1500
1501
1502 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1503 if (env.useLegacyPost) {
1504 if (logKey instanceof HLogKey) {
1505 oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1506 } else {
1507 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1508 }
1509 } else {
1510 oserver.postWALRestore(ctx, info, logKey, logEdit);
1511 }
1512 }
1513 });
1514 }
1515
1516
1517
1518
1519 @Deprecated
1520 public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1521 throws IOException {
1522 postWALRestore(info, (WALKey)logKey, logEdit);
1523 }
1524
1525
1526
1527
1528
1529
1530 public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1531 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1532 @Override
1533 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1534 throws IOException {
1535 oserver.preBulkLoadHFile(ctx, familyPaths);
1536 }
1537 });
1538 }
1539
1540
1541
1542
1543
1544
1545
1546 public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1547 boolean hasLoaded) throws IOException {
1548 return execOperationWithResult(hasLoaded,
1549 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1550 @Override
1551 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1552 throws IOException {
1553 setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1554 }
1555 });
1556 }
1557
1558 public void postStartRegionOperation(final Operation op) throws IOException {
1559 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1560 @Override
1561 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1562 throws IOException {
1563 oserver.postStartRegionOperation(ctx, op);
1564 }
1565 });
1566 }
1567
1568 public void postCloseRegionOperation(final Operation op) throws IOException {
1569 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1570 @Override
1571 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1572 throws IOException {
1573 oserver.postCloseRegionOperation(ctx, op);
1574 }
1575 });
1576 }
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589 public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1590 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1591 final Reference r) throws IOException {
1592 return execOperationWithResult(null,
1593 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1594 @Override
1595 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1596 throws IOException {
1597 setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1598 }
1599 });
1600 }
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613 public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1614 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1615 final Reference r, final StoreFile.Reader reader) throws IOException {
1616 return execOperationWithResult(reader,
1617 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1618 @Override
1619 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1620 throws IOException {
1621 setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1622 }
1623 });
1624 }
1625
1626 public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1627 final Cell oldCell, Cell newCell) throws IOException {
1628 return execOperationWithResult(newCell,
1629 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1630 @Override
1631 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1632 throws IOException {
1633 setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1634 }
1635 });
1636 }
1637
1638 public Message preEndpointInvocation(final Service service, final String methodName,
1639 Message request) throws IOException {
1640 return execOperationWithResult(request,
1641 coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1642 @Override
1643 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1644 throws IOException {
1645 setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1646 }
1647 });
1648 }
1649
1650 public void postEndpointInvocation(final Service service, final String methodName,
1651 final Message request, final Message.Builder responseBuilder) throws IOException {
1652 execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1653 @Override
1654 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1655 throws IOException {
1656 oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1657 }
1658 });
1659 }
1660
1661 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1662 return execOperationWithResult(tracker,
1663 coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1664 @Override
1665 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1666 throws IOException {
1667 setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1668 }
1669 });
1670 }
1671
1672 public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1673 Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1674 for (RegionEnvironment env : coprocessors) {
1675 DescriptiveStatistics ds = new DescriptiveStatistics();
1676 if (env.getInstance() instanceof RegionObserver) {
1677 for (Long time : env.getExecutionLatenciesNanos()) {
1678 ds.addValue(time);
1679 }
1680
1681 if (ds.getN() == 0) {
1682 ds.addValue(0);
1683 }
1684 results.put(env.getInstance().getClass().getSimpleName(), ds);
1685 }
1686 }
1687 return results;
1688 }
1689
1690 private static abstract class CoprocessorOperation
1691 extends ObserverContext<RegionCoprocessorEnvironment> {
1692 public abstract void call(Coprocessor observer,
1693 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1694 public abstract boolean hasCall(Coprocessor observer);
1695 public void postEnvCall(RegionEnvironment env) { }
1696 }
1697
1698 private static abstract class RegionOperation extends CoprocessorOperation {
1699 public abstract void call(RegionObserver observer,
1700 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1701
1702 public boolean hasCall(Coprocessor observer) {
1703 return observer instanceof RegionObserver;
1704 }
1705
1706 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1707 throws IOException {
1708 call((RegionObserver)observer, ctx);
1709 }
1710 }
1711
1712 private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1713 private T result = null;
1714 public void setResult(final T result) { this.result = result; }
1715 public T getResult() { return this.result; }
1716 }
1717
1718 private static abstract class EndpointOperation extends CoprocessorOperation {
1719 public abstract void call(EndpointObserver observer,
1720 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1721
1722 public boolean hasCall(Coprocessor observer) {
1723 return observer instanceof EndpointObserver;
1724 }
1725
1726 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1727 throws IOException {
1728 call((EndpointObserver)observer, ctx);
1729 }
1730 }
1731
1732 private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1733 private T result = null;
1734 public void setResult(final T result) { this.result = result; }
1735 public T getResult() { return this.result; }
1736 }
1737
1738 private boolean execOperation(final CoprocessorOperation ctx)
1739 throws IOException {
1740 return execOperation(true, ctx);
1741 }
1742
1743 private <T> T execOperationWithResult(final T defaultValue,
1744 final RegionOperationWithResult<T> ctx) throws IOException {
1745 if (ctx == null) return defaultValue;
1746 ctx.setResult(defaultValue);
1747 execOperation(true, ctx);
1748 return ctx.getResult();
1749 }
1750
1751 private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1752 final RegionOperationWithResult<T> ctx) throws IOException {
1753 boolean bypass = false;
1754 T result = defaultValue;
1755 if (ctx != null) {
1756 ctx.setResult(defaultValue);
1757 bypass = execOperation(true, ctx);
1758 result = ctx.getResult();
1759 }
1760 return bypass == ifBypass ? result : null;
1761 }
1762
1763 private <T> T execOperationWithResult(final T defaultValue,
1764 final EndpointOperationWithResult<T> ctx) throws IOException {
1765 if (ctx == null) return defaultValue;
1766 ctx.setResult(defaultValue);
1767 execOperation(true, ctx);
1768 return ctx.getResult();
1769 }
1770
1771 private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1772 throws IOException {
1773 boolean bypass = false;
1774 for (RegionEnvironment env: coprocessors) {
1775 Coprocessor observer = env.getInstance();
1776 if (ctx.hasCall(observer)) {
1777 long startTime = System.nanoTime();
1778 ctx.prepare(env);
1779 Thread currentThread = Thread.currentThread();
1780 ClassLoader cl = currentThread.getContextClassLoader();
1781 try {
1782 currentThread.setContextClassLoader(env.getClassLoader());
1783 ctx.call(observer, ctx);
1784 } catch (Throwable e) {
1785 handleCoprocessorThrowable(env, e);
1786 } finally {
1787 currentThread.setContextClassLoader(cl);
1788 }
1789 env.offerExecutionLatency(System.nanoTime() - startTime);
1790 bypass |= ctx.shouldBypass();
1791 if (earlyExit && ctx.shouldComplete()) {
1792 break;
1793 }
1794 }
1795
1796 ctx.postEnvCall(env);
1797 }
1798 return bypass;
1799 }
1800 }