1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableSet;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29 import java.util.regex.Matcher;
30
31 import org.apache.commons.collections.map.AbstractReferenceMap;
32 import org.apache.commons.collections.map.ReferenceMap;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hbase.Coprocessor;
38 import org.apache.hadoop.hbase.CoprocessorEnvironment;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.KeyValue;
43 import org.apache.hadoop.hbase.client.Append;
44 import org.apache.hadoop.hbase.client.Delete;
45 import org.apache.hadoop.hbase.client.Get;
46 import org.apache.hadoop.hbase.client.Increment;
47 import org.apache.hadoop.hbase.client.Mutation;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.Result;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.client.Durability;
52 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
53 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
54 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
57 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
58 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
60 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
61 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
62 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.apache.hadoop.hbase.util.Pair;
65 import org.apache.hadoop.util.StringUtils;
66
67 import com.google.common.collect.ImmutableList;
68
69
70
71
72
73 public class RegionCoprocessorHost
74 extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
75
/** Logger used by this host for coprocessor load and error reporting. */
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);

// Static map consulted by createEnvironment(): it is keyed by the coprocessor
// implementation class name and holds the ConcurrentMap handed to each
// RegionEnvironment of that class. Built with the HARD and WEAK reference
// types (key type first, value type second, per the commons-collections
// ReferenceMap constructor). Access is guarded by synchronizing on the map.
private static ReferenceMap sharedDataMap =
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
80
81
82
83
84 static class RegionEnvironment extends CoprocessorHost.Environment
85 implements RegionCoprocessorEnvironment {
86
87 private HRegion region;
88 private RegionServerServices rsServices;
89 ConcurrentMap<String, Object> sharedData;
90
91
92
93
94
95
96 public RegionEnvironment(final Coprocessor impl, final int priority,
97 final int seq, final Configuration conf, final HRegion region,
98 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
99 super(impl, priority, seq, conf);
100 this.region = region;
101 this.rsServices = services;
102 this.sharedData = sharedData;
103 }
104
105
106 @Override
107 public HRegion getRegion() {
108 return region;
109 }
110
111
112 @Override
113 public RegionServerServices getRegionServerServices() {
114 return rsServices;
115 }
116
117 public void shutdown() {
118 super.shutdown();
119 }
120
121 @Override
122 public ConcurrentMap<String, Object> getSharedData() {
123 return sharedData;
124 }
125 }
126
127
/**
 * Region server services supplied to the constructor; passed to every
 * environment created by this host and to abortServer().
 */
RegionServerServices rsServices;

/** The region this host was constructed for. */
HRegion region;
131
132
133
134
135
136
137
138 public RegionCoprocessorHost(final HRegion region,
139 final RegionServerServices rsServices, final Configuration conf) {
140 this.conf = conf;
141 this.rsServices = rsServices;
142 this.region = region;
143 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
144
145
146 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
147
148
149 if (!HTableDescriptor.isMetaTable(region.getRegionInfo().getTableName())) {
150 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
151 }
152
153
154 loadTableCoprocessors(conf);
155 }
156
157 void loadTableCoprocessors(final Configuration conf) {
158
159
160 List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
161 for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
162 region.getTableDesc().getValues().entrySet()) {
163 String key = Bytes.toString(e.getKey().get()).trim();
164 String spec = Bytes.toString(e.getValue().get()).trim();
165 if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
166
167 try {
168 Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
169 if (matcher.matches()) {
170
171
172 Path path = matcher.group(1).trim().isEmpty() ?
173 null : new Path(matcher.group(1).trim());
174 String className = matcher.group(2).trim();
175 int priority = matcher.group(3).trim().isEmpty() ?
176 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
177 String cfgSpec = null;
178 try {
179 cfgSpec = matcher.group(4);
180 } catch (IndexOutOfBoundsException ex) {
181
182 }
183 if (cfgSpec != null) {
184 cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
185 Configuration newConf = new Configuration(conf);
186 Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
187 while (m.find()) {
188 newConf.set(m.group(1), m.group(2));
189 }
190 configured.add(load(path, className, priority, newConf));
191 } else {
192 configured.add(load(path, className, priority, conf));
193 }
194 LOG.info("Load coprocessor " + className + " from HTD of " +
195 Bytes.toString(region.getTableDesc().getName()) +
196 " successfully.");
197 } else {
198 throw new RuntimeException("specification does not match pattern");
199 }
200 } catch (Exception ex) {
201 LOG.warn("attribute '" + key +
202 "' has invalid coprocessor specification '" + spec + "'");
203 LOG.warn(StringUtils.stringifyException(ex));
204 }
205 }
206 }
207
208 coprocessors.addAll(configured);
209 }
210
211 @Override
212 public RegionEnvironment createEnvironment(Class<?> implClass,
213 Coprocessor instance, int priority, int seq, Configuration conf) {
214
215
216
217
218
219 for (Class c : implClass.getInterfaces()) {
220 if (CoprocessorService.class.isAssignableFrom(c)) {
221 region.registerService( ((CoprocessorService)instance).getService() );
222 }
223 }
224 ConcurrentMap<String, Object> classData;
225
226 synchronized (sharedDataMap) {
227
228
229 classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
230 if (classData == null) {
231 classData = new ConcurrentHashMap<String, Object>();
232 sharedDataMap.put(implClass.getName(), classData);
233 }
234 }
235 return new RegionEnvironment(instance, priority, seq, conf, region,
236 rsServices, classData);
237 }
238
239 @Override
240 protected void abortServer(final CoprocessorEnvironment env, final Throwable e) {
241 abortServer("regionserver", rsServices, env, e);
242 }
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257 private void handleCoprocessorThrowableNoRethrow(
258 final CoprocessorEnvironment env, final Throwable e) {
259 try {
260 handleCoprocessorThrowable(env,e);
261 } catch (IOException ioe) {
262
263 LOG.warn(
264 "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
265 e + ". Ignoring.",e);
266 }
267 }
268
269
270
271
272
273
274 public void preOpen() throws IOException {
275 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
276 for (RegionEnvironment env: coprocessors) {
277 if (env.getInstance() instanceof RegionObserver) {
278 ctx = ObserverContext.createAndPrepare(env, ctx);
279 try {
280 ((RegionObserver) env.getInstance()).preOpen(ctx);
281 } catch (Throwable e) {
282 handleCoprocessorThrowable(env, e);
283 }
284 if (ctx.shouldComplete()) {
285 break;
286 }
287 }
288 }
289 }
290
291
292
293
294 public void postOpen() {
295 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
296 for (RegionEnvironment env: coprocessors) {
297 if (env.getInstance() instanceof RegionObserver) {
298 ctx = ObserverContext.createAndPrepare(env, ctx);
299 try {
300 ((RegionObserver) env.getInstance()).postOpen(ctx);
301 } catch (Throwable e) {
302 handleCoprocessorThrowableNoRethrow(env, e);
303 }
304 if (ctx.shouldComplete()) {
305 break;
306 }
307 }
308 }
309 }
310
311
312
313
314
315 public void preClose(boolean abortRequested) throws IOException {
316 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
317 for (RegionEnvironment env: coprocessors) {
318 if (env.getInstance() instanceof RegionObserver) {
319 ctx = ObserverContext.createAndPrepare(env, ctx);
320 try {
321 ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested);
322 } catch (Throwable e) {
323 handleCoprocessorThrowable(env, e);
324 }
325 }
326 }
327 }
328
329
330
331
332
333 public void postClose(boolean abortRequested) {
334 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
335 for (RegionEnvironment env: coprocessors) {
336 if (env.getInstance() instanceof RegionObserver) {
337 ctx = ObserverContext.createAndPrepare(env, ctx);
338 try {
339 ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested);
340 } catch (Throwable e) {
341 handleCoprocessorThrowableNoRethrow(env, e);
342 }
343
344 }
345 shutdown(env);
346 }
347 }
348
349
350
351
352
353 public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
354 ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
355 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
356 InternalScanner s = null;
357 for (RegionEnvironment env: coprocessors) {
358 if (env.getInstance() instanceof RegionObserver) {
359 ctx = ObserverContext.createAndPrepare(env, ctx);
360 try {
361 s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
362 scanType, earliestPutTs, s, request);
363 } catch (Throwable e) {
364 handleCoprocessorThrowable(env,e);
365 }
366 if (ctx.shouldComplete()) {
367 break;
368 }
369 }
370 }
371 return s;
372 }
373
374
375
376
377
378
379
380
381
382
383 public boolean preCompactSelection(Store store, List<StoreFile> candidates,
384 CompactionRequest request) throws IOException {
385 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
386 boolean bypass = false;
387 for (RegionEnvironment env: coprocessors) {
388 if (env.getInstance() instanceof RegionObserver) {
389 ctx = ObserverContext.createAndPrepare(env, ctx);
390 try {
391 ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
392 } catch (Throwable e) {
393 handleCoprocessorThrowable(env,e);
394
395 }
396 bypass |= ctx.shouldBypass();
397 if (ctx.shouldComplete()) {
398 break;
399 }
400 }
401 }
402 return bypass;
403 }
404
405
406
407
408
409
410
411
412 public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
413 CompactionRequest request) {
414 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
415 for (RegionEnvironment env: coprocessors) {
416 if (env.getInstance() instanceof RegionObserver) {
417 ctx = ObserverContext.createAndPrepare(env, ctx);
418 try {
419 ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
420 } catch (Throwable e) {
421 handleCoprocessorThrowableNoRethrow(env,e);
422 }
423 if (ctx.shouldComplete()) {
424 break;
425 }
426 }
427 }
428 }
429
430
431
432
433
434
435
436
437
438 public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
439 CompactionRequest request) throws IOException {
440 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
441 boolean bypass = false;
442 for (RegionEnvironment env: coprocessors) {
443 if (env.getInstance() instanceof RegionObserver) {
444 ctx = ObserverContext.createAndPrepare(env, ctx);
445 try {
446 scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType,
447 request);
448 } catch (Throwable e) {
449 handleCoprocessorThrowable(env,e);
450 }
451 bypass |= ctx.shouldBypass();
452 if (ctx.shouldComplete()) {
453 break;
454 }
455 }
456 }
457 return bypass ? null : scanner;
458 }
459
460
461
462
463
464
465
466
467 public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
468 throws IOException {
469 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
470 for (RegionEnvironment env: coprocessors) {
471 if (env.getInstance() instanceof RegionObserver) {
472 ctx = ObserverContext.createAndPrepare(env, ctx);
473 try {
474 ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
475 } catch (Throwable e) {
476 handleCoprocessorThrowable(env, e);
477 }
478 if (ctx.shouldComplete()) {
479 break;
480 }
481 }
482 }
483 }
484
485
486
487
488
489 public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
490 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
491 boolean bypass = false;
492 for (RegionEnvironment env: coprocessors) {
493 if (env.getInstance() instanceof RegionObserver) {
494 ctx = ObserverContext.createAndPrepare(env, ctx);
495 try {
496 scanner = ((RegionObserver)env.getInstance()).preFlush(
497 ctx, store, scanner);
498 } catch (Throwable e) {
499 handleCoprocessorThrowable(env,e);
500 }
501 bypass |= ctx.shouldBypass();
502 if (ctx.shouldComplete()) {
503 break;
504 }
505 }
506 }
507 return bypass ? null : scanner;
508 }
509
510
511
512
513
514 public void preFlush() throws IOException {
515 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
516 for (RegionEnvironment env: coprocessors) {
517 if (env.getInstance() instanceof RegionObserver) {
518 ctx = ObserverContext.createAndPrepare(env, ctx);
519 try {
520 ((RegionObserver)env.getInstance()).preFlush(ctx);
521 } catch (Throwable e) {
522 handleCoprocessorThrowable(env, e);
523 }
524 if (ctx.shouldComplete()) {
525 break;
526 }
527 }
528 }
529 }
530
531
532
533
534
535
536 public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner)
537 throws IOException {
538 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
539 InternalScanner s = null;
540 for (RegionEnvironment env : coprocessors) {
541 if (env.getInstance() instanceof RegionObserver) {
542 ctx = ObserverContext.createAndPrepare(env, ctx);
543 try {
544 s = ((RegionObserver) env.getInstance())
545 .preFlushScannerOpen(ctx, store, memstoreScanner, s);
546 } catch (Throwable e) {
547 handleCoprocessorThrowable(env, e);
548 }
549 if (ctx.shouldComplete()) {
550 break;
551 }
552 }
553 }
554 return s;
555 }
556
557
558
559
560
561 public void postFlush() throws IOException {
562 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
563 for (RegionEnvironment env: coprocessors) {
564 if (env.getInstance() instanceof RegionObserver) {
565 ctx = ObserverContext.createAndPrepare(env, ctx);
566 try {
567 ((RegionObserver)env.getInstance()).postFlush(ctx);
568 } catch (Throwable e) {
569 handleCoprocessorThrowable(env, e);
570 }
571 if (ctx.shouldComplete()) {
572 break;
573 }
574 }
575 }
576 }
577
578
579
580
581
582 public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
583 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
584 for (RegionEnvironment env: coprocessors) {
585 if (env.getInstance() instanceof RegionObserver) {
586 ctx = ObserverContext.createAndPrepare(env, ctx);
587 try {
588 ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
589 } catch (Throwable e) {
590 handleCoprocessorThrowable(env, e);
591 }
592 if (ctx.shouldComplete()) {
593 break;
594 }
595 }
596 }
597 }
598
599
600
601
602
603 public void preSplit() throws IOException {
604 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
605 for (RegionEnvironment env: coprocessors) {
606 if (env.getInstance() instanceof RegionObserver) {
607 ctx = ObserverContext.createAndPrepare(env, ctx);
608 try {
609 ((RegionObserver)env.getInstance()).preSplit(ctx);
610 } catch (Throwable e) {
611 handleCoprocessorThrowable(env, e);
612 }
613 if (ctx.shouldComplete()) {
614 break;
615 }
616 }
617 }
618 }
619
620
621
622
623
624 public void preSplit(byte[] splitRow) throws IOException {
625 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
626 for (RegionEnvironment env: coprocessors) {
627 if (env.getInstance() instanceof RegionObserver) {
628 ctx = ObserverContext.createAndPrepare(env, ctx);
629 try {
630 ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow);
631 } catch (Throwable e) {
632 handleCoprocessorThrowable(env, e);
633 }
634 if (ctx.shouldComplete()) {
635 break;
636 }
637 }
638 }
639 }
640
641
642
643
644
645
646
647 public void postSplit(HRegion l, HRegion r) throws IOException {
648 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
649 for (RegionEnvironment env: coprocessors) {
650 if (env.getInstance() instanceof RegionObserver) {
651 ctx = ObserverContext.createAndPrepare(env, ctx);
652 try {
653 ((RegionObserver)env.getInstance()).postSplit(ctx, l, r);
654 } catch (Throwable e) {
655 handleCoprocessorThrowable(env, e);
656 }
657 if (ctx.shouldComplete()) {
658 break;
659 }
660 }
661 }
662 }
663
664
665
666
667
668 public void preRollBackSplit() throws IOException {
669 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
670 for (RegionEnvironment env : coprocessors) {
671 if (env.getInstance() instanceof RegionObserver) {
672 ctx = ObserverContext.createAndPrepare(env, ctx);
673 try {
674 ((RegionObserver) env.getInstance()).preRollBackSplit(ctx);
675 } catch (Throwable e) {
676 handleCoprocessorThrowable(env, e);
677 }
678 if (ctx.shouldComplete()) {
679 break;
680 }
681 }
682 }
683 }
684
685
686
687
688
689 public void postRollBackSplit() throws IOException {
690 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
691 for (RegionEnvironment env : coprocessors) {
692 if (env.getInstance() instanceof RegionObserver) {
693 ctx = ObserverContext.createAndPrepare(env, ctx);
694 try {
695 ((RegionObserver) env.getInstance()).postRollBackSplit(ctx);
696 } catch (Throwable e) {
697 handleCoprocessorThrowable(env, e);
698 }
699 if (ctx.shouldComplete()) {
700 break;
701 }
702 }
703 }
704 }
705
706
707
708
709
710 public void postCompleteSplit() throws IOException {
711 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
712 for (RegionEnvironment env : coprocessors) {
713 if (env.getInstance() instanceof RegionObserver) {
714 ctx = ObserverContext.createAndPrepare(env, ctx);
715 try {
716 ((RegionObserver) env.getInstance()).postCompleteSplit(ctx);
717 } catch (Throwable e) {
718 handleCoprocessorThrowable(env, e);
719 }
720 if (ctx.shouldComplete()) {
721 break;
722 }
723 }
724 }
725 }
726
727
728
729
730
731
732
733
734
735 public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
736 final Result result) throws IOException {
737 boolean bypass = false;
738 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
739 for (RegionEnvironment env: coprocessors) {
740 if (env.getInstance() instanceof RegionObserver) {
741 ctx = ObserverContext.createAndPrepare(env, ctx);
742 try {
743 ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row,
744 family, result);
745 } catch (Throwable e) {
746 handleCoprocessorThrowable(env, e);
747 }
748 bypass |= ctx.shouldBypass();
749 if (ctx.shouldComplete()) {
750 break;
751 }
752 }
753 }
754 return bypass;
755 }
756
757
758
759
760
761
762
763 public void postGetClosestRowBefore(final byte[] row, final byte[] family,
764 final Result result) throws IOException {
765 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
766 for (RegionEnvironment env: coprocessors) {
767 if (env.getInstance() instanceof RegionObserver) {
768 ctx = ObserverContext.createAndPrepare(env, ctx);
769 try {
770 ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row,
771 family, result);
772 } catch (Throwable e) {
773 handleCoprocessorThrowable(env, e);
774 }
775 if (ctx.shouldComplete()) {
776 break;
777 }
778 }
779 }
780 }
781
782
783
784
785
786
787 public boolean preGet(final Get get, final List<KeyValue> results)
788 throws IOException {
789 boolean bypass = false;
790 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
791 for (RegionEnvironment env: coprocessors) {
792 if (env.getInstance() instanceof RegionObserver) {
793 ctx = ObserverContext.createAndPrepare(env, ctx);
794 try {
795 ((RegionObserver)env.getInstance()).preGet(ctx, get, results);
796 } catch (Throwable e) {
797 handleCoprocessorThrowable(env, e);
798 }
799 bypass |= ctx.shouldBypass();
800 if (ctx.shouldComplete()) {
801 break;
802 }
803 }
804 }
805 return bypass;
806 }
807
808
809
810
811
812
813 public void postGet(final Get get, final List<KeyValue> results)
814 throws IOException {
815 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
816 for (RegionEnvironment env: coprocessors) {
817 if (env.getInstance() instanceof RegionObserver) {
818 ctx = ObserverContext.createAndPrepare(env, ctx);
819 try {
820 ((RegionObserver)env.getInstance()).postGet(ctx, get, results);
821 } catch (Throwable e) {
822 handleCoprocessorThrowable(env, e);
823 }
824 if (ctx.shouldComplete()) {
825 break;
826 }
827 }
828 }
829 }
830
831
832
833
834
835
836
837 public Boolean preExists(final Get get) throws IOException {
838 boolean bypass = false;
839 boolean exists = false;
840 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
841 for (RegionEnvironment env: coprocessors) {
842 if (env.getInstance() instanceof RegionObserver) {
843 ctx = ObserverContext.createAndPrepare(env, ctx);
844 try {
845 exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists);
846 } catch (Throwable e) {
847 handleCoprocessorThrowable(env, e);
848 }
849 bypass |= ctx.shouldBypass();
850 if (ctx.shouldComplete()) {
851 break;
852 }
853 }
854 }
855 return bypass ? exists : null;
856 }
857
858
859
860
861
862
863
864 public boolean postExists(final Get get, boolean exists)
865 throws IOException {
866 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
867 for (RegionEnvironment env: coprocessors) {
868 if (env.getInstance() instanceof RegionObserver) {
869 ctx = ObserverContext.createAndPrepare(env, ctx);
870 try {
871 exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists);
872 } catch (Throwable e) {
873 handleCoprocessorThrowable(env, e);
874 }
875 if (ctx.shouldComplete()) {
876 break;
877 }
878 }
879 }
880 return exists;
881 }
882
883
884
885
886
887
888
889
890 public boolean prePut(Put put, WALEdit edit,
891 final Durability durability) throws IOException {
892 boolean bypass = false;
893 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
894 for (RegionEnvironment env: coprocessors) {
895 if (env.getInstance() instanceof RegionObserver) {
896 ctx = ObserverContext.createAndPrepare(env, ctx);
897 try {
898 ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability);
899 } catch (Throwable e) {
900 handleCoprocessorThrowable(env, e);
901 }
902 bypass |= ctx.shouldBypass();
903 if (ctx.shouldComplete()) {
904 break;
905 }
906 }
907 }
908 return bypass;
909 }
910
911
912
913
914
915
916
917 public void postPut(Put put, WALEdit edit,
918 final Durability durability) throws IOException {
919 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
920 for (RegionEnvironment env: coprocessors) {
921 if (env.getInstance() instanceof RegionObserver) {
922 ctx = ObserverContext.createAndPrepare(env, ctx);
923 try {
924 ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability);
925 } catch (Throwable e) {
926 handleCoprocessorThrowable(env, e);
927 }
928 if (ctx.shouldComplete()) {
929 break;
930 }
931 }
932 }
933 }
934
935
936
937
938
939
940
941
942 public boolean preDelete(Delete delete, WALEdit edit,
943 final Durability durability) throws IOException {
944 boolean bypass = false;
945 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
946 for (RegionEnvironment env: coprocessors) {
947 if (env.getInstance() instanceof RegionObserver) {
948 ctx = ObserverContext.createAndPrepare(env, ctx);
949 try {
950 ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability);
951 } catch (Throwable e) {
952 handleCoprocessorThrowable(env, e);
953 }
954 bypass |= ctx.shouldBypass();
955 if (ctx.shouldComplete()) {
956 break;
957 }
958 }
959 }
960 return bypass;
961 }
962
963
964
965
966
967
968
969 public void postDelete(Delete delete, WALEdit edit,
970 final Durability durability) throws IOException {
971 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
972 for (RegionEnvironment env: coprocessors) {
973 if (env.getInstance() instanceof RegionObserver) {
974 ctx = ObserverContext.createAndPrepare(env, ctx);
975 try {
976 ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability);
977 } catch (Throwable e) {
978 handleCoprocessorThrowable(env, e);
979 }
980 if (ctx.shouldComplete()) {
981 break;
982 }
983 }
984 }
985 }
986
987
988
989
990
991
992 public boolean preBatchMutate(
993 final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
994 boolean bypass = false;
995 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
996 for (RegionEnvironment env : coprocessors) {
997 if (env.getInstance() instanceof RegionObserver) {
998 ctx = ObserverContext.createAndPrepare(env, ctx);
999 try {
1000 ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
1001 } catch (Throwable e) {
1002 handleCoprocessorThrowable(env, e);
1003 }
1004 bypass |= ctx.shouldBypass();
1005 if (ctx.shouldComplete()) {
1006 break;
1007 }
1008 }
1009 }
1010 return bypass;
1011 }
1012
1013
1014
1015
1016
1017 public void postBatchMutate(
1018 final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
1019 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1020 for (RegionEnvironment env : coprocessors) {
1021 if (env.getInstance() instanceof RegionObserver) {
1022 ctx = ObserverContext.createAndPrepare(env, ctx);
1023 try {
1024 ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
1025 } catch (Throwable e) {
1026 handleCoprocessorThrowable(env, e);
1027 }
1028 if (ctx.shouldComplete()) {
1029 break;
1030 }
1031 }
1032 }
1033 }
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046 public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1047 final byte [] qualifier, final CompareOp compareOp,
1048 final ByteArrayComparable comparator, Put put)
1049 throws IOException {
1050 boolean bypass = false;
1051 boolean result = false;
1052 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1053 for (RegionEnvironment env: coprocessors) {
1054 if (env.getInstance() instanceof RegionObserver) {
1055 ctx = ObserverContext.createAndPrepare(env, ctx);
1056 try {
1057 result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family,
1058 qualifier, compareOp, comparator, put, result);
1059 } catch (Throwable e) {
1060 handleCoprocessorThrowable(env, e);
1061 }
1062
1063
1064 bypass |= ctx.shouldBypass();
1065 if (ctx.shouldComplete()) {
1066 break;
1067 }
1068 }
1069 }
1070 return bypass ? result : null;
1071 }
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082 public boolean postCheckAndPut(final byte [] row, final byte [] family,
1083 final byte [] qualifier, final CompareOp compareOp,
1084 final ByteArrayComparable comparator, final Put put,
1085 boolean result)
1086 throws IOException {
1087 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1088 for (RegionEnvironment env: coprocessors) {
1089 if (env.getInstance() instanceof RegionObserver) {
1090 ctx = ObserverContext.createAndPrepare(env, ctx);
1091 try {
1092 result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row,
1093 family, qualifier, compareOp, comparator, put, result);
1094 } catch (Throwable e) {
1095 handleCoprocessorThrowable(env, e);
1096 }
1097 if (ctx.shouldComplete()) {
1098 break;
1099 }
1100 }
1101 }
1102 return result;
1103 }
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116 public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1117 final byte [] qualifier, final CompareOp compareOp,
1118 final ByteArrayComparable comparator, Delete delete)
1119 throws IOException {
1120 boolean bypass = false;
1121 boolean result = false;
1122 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1123 for (RegionEnvironment env: coprocessors) {
1124 if (env.getInstance() instanceof RegionObserver) {
1125 ctx = ObserverContext.createAndPrepare(env, ctx);
1126 try {
1127 result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row,
1128 family, qualifier, compareOp, comparator, delete, result);
1129 } catch (Throwable e) {
1130 handleCoprocessorThrowable(env, e);
1131 }
1132 bypass |= ctx.shouldBypass();
1133 if (ctx.shouldComplete()) {
1134 break;
1135 }
1136 }
1137 }
1138 return bypass ? result : null;
1139 }
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150 public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1151 final byte [] qualifier, final CompareOp compareOp,
1152 final ByteArrayComparable comparator, final Delete delete,
1153 boolean result)
1154 throws IOException {
1155 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1156 for (RegionEnvironment env: coprocessors) {
1157 if (env.getInstance() instanceof RegionObserver) {
1158 ctx = ObserverContext.createAndPrepare(env, ctx);
1159 try {
1160 result = ((RegionObserver)env.getInstance())
1161 .postCheckAndDelete(ctx, row, family, qualifier, compareOp,
1162 comparator, delete, result);
1163 } catch (Throwable e) {
1164 handleCoprocessorThrowable(env, e);
1165 }
1166 if (ctx.shouldComplete()) {
1167 break;
1168 }
1169 }
1170 }
1171 return result;
1172 }
1173
1174
1175
1176
1177
1178
1179
1180 public Result preAppend(Append append)
1181 throws IOException {
1182 boolean bypass = false;
1183 Result result = null;
1184 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1185 for (RegionEnvironment env: coprocessors) {
1186 if (env.getInstance() instanceof RegionObserver) {
1187 ctx = ObserverContext.createAndPrepare(env, ctx);
1188 try {
1189 result = ((RegionObserver)env.getInstance()).preAppend(ctx, append);
1190 } catch (Throwable e) {
1191 handleCoprocessorThrowable(env, e);
1192 }
1193 bypass |= ctx.shouldBypass();
1194 if (ctx.shouldComplete()) {
1195 break;
1196 }
1197 }
1198 }
1199 return bypass ? result : null;
1200 }
1201
1202
1203
1204
1205
1206
1207
1208 public Result preIncrement(Increment increment)
1209 throws IOException {
1210 boolean bypass = false;
1211 Result result = null;
1212 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1213 for (RegionEnvironment env: coprocessors) {
1214 if (env.getInstance() instanceof RegionObserver) {
1215 ctx = ObserverContext.createAndPrepare(env, ctx);
1216 try {
1217 result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment);
1218 } catch (Throwable e) {
1219 handleCoprocessorThrowable(env, e);
1220 }
1221 bypass |= ctx.shouldBypass();
1222 if (ctx.shouldComplete()) {
1223 break;
1224 }
1225 }
1226 }
1227 return bypass ? result : null;
1228 }
1229
1230
1231
1232
1233
1234
1235 public void postAppend(final Append append, Result result)
1236 throws IOException {
1237 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1238 for (RegionEnvironment env: coprocessors) {
1239 if (env.getInstance() instanceof RegionObserver) {
1240 ctx = ObserverContext.createAndPrepare(env, ctx);
1241 try {
1242 ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
1243 } catch (Throwable e) {
1244 handleCoprocessorThrowable(env, e);
1245 }
1246 if (ctx.shouldComplete()) {
1247 break;
1248 }
1249 }
1250 }
1251 }
1252
1253
1254
1255
1256
1257
1258 public Result postIncrement(final Increment increment, Result result)
1259 throws IOException {
1260 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1261 for (RegionEnvironment env: coprocessors) {
1262 if (env.getInstance() instanceof RegionObserver) {
1263 ctx = ObserverContext.createAndPrepare(env, ctx);
1264 try {
1265 result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result);
1266 } catch (Throwable e) {
1267 handleCoprocessorThrowable(env, e);
1268 }
1269 if (ctx.shouldComplete()) {
1270 break;
1271 }
1272 }
1273 }
1274 return result;
1275 }
1276
1277
1278
1279
1280
1281
1282
1283 public RegionScanner preScannerOpen(Scan scan) throws IOException {
1284 boolean bypass = false;
1285 RegionScanner s = null;
1286 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1287 for (RegionEnvironment env: coprocessors) {
1288 if (env.getInstance() instanceof RegionObserver) {
1289 ctx = ObserverContext.createAndPrepare(env, ctx);
1290 try {
1291 s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s);
1292 } catch (Throwable e) {
1293 handleCoprocessorThrowable(env, e);
1294 }
1295 bypass |= ctx.shouldBypass();
1296 if (ctx.shouldComplete()) {
1297 break;
1298 }
1299 }
1300 }
1301 return bypass ? s : null;
1302 }
1303
1304
1305
1306
1307
1308
1309 public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
1310 final NavigableSet<byte[]> targetCols) throws IOException {
1311 KeyValueScanner s = null;
1312 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1313 for (RegionEnvironment env: coprocessors) {
1314 if (env.getInstance() instanceof RegionObserver) {
1315 ctx = ObserverContext.createAndPrepare(env, ctx);
1316 try {
1317 s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
1318 targetCols, s);
1319 } catch (Throwable e) {
1320 handleCoprocessorThrowable(env, e);
1321 }
1322 if (ctx.shouldComplete()) {
1323 break;
1324 }
1325 }
1326 }
1327 return s;
1328 }
1329
1330
1331
1332
1333
1334
1335
1336 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s)
1337 throws IOException {
1338 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1339 for (RegionEnvironment env: coprocessors) {
1340 if (env.getInstance() instanceof RegionObserver) {
1341 ctx = ObserverContext.createAndPrepare(env, ctx);
1342 try {
1343 s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s);
1344 } catch (Throwable e) {
1345 handleCoprocessorThrowable(env, e);
1346 }
1347 if (ctx.shouldComplete()) {
1348 break;
1349 }
1350 }
1351 }
1352 return s;
1353 }
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363 public Boolean preScannerNext(final InternalScanner s,
1364 final List<Result> results, int limit) throws IOException {
1365 boolean bypass = false;
1366 boolean hasNext = false;
1367 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1368 for (RegionEnvironment env: coprocessors) {
1369 if (env.getInstance() instanceof RegionObserver) {
1370 ctx = ObserverContext.createAndPrepare(env, ctx);
1371 try {
1372 hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results,
1373 limit, hasNext);
1374 } catch (Throwable e) {
1375 handleCoprocessorThrowable(env, e);
1376 }
1377 bypass |= ctx.shouldBypass();
1378 if (ctx.shouldComplete()) {
1379 break;
1380 }
1381 }
1382 }
1383 return bypass ? hasNext : null;
1384 }
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394 public boolean postScannerNext(final InternalScanner s,
1395 final List<Result> results, final int limit, boolean hasMore)
1396 throws IOException {
1397 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1398 for (RegionEnvironment env: coprocessors) {
1399 if (env.getInstance() instanceof RegionObserver) {
1400 ctx = ObserverContext.createAndPrepare(env, ctx);
1401 try {
1402 hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s,
1403 results, limit, hasMore);
1404 } catch (Throwable e) {
1405 handleCoprocessorThrowable(env, e);
1406 }
1407 if (ctx.shouldComplete()) {
1408 break;
1409 }
1410 }
1411 }
1412 return hasMore;
1413 }
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423 public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
1424 throws IOException {
1425 boolean hasMore = true;
1426 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1427 for (RegionEnvironment env : coprocessors) {
1428 if (env.getInstance() instanceof RegionObserver) {
1429 ctx = ObserverContext.createAndPrepare(env, ctx);
1430 try {
1431 hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
1432 hasMore);
1433 } catch (Throwable e) {
1434 handleCoprocessorThrowable(env, e);
1435 }
1436 if (ctx.shouldComplete()) {
1437 break;
1438 }
1439 }
1440 }
1441 return hasMore;
1442 }
1443
1444
1445
1446
1447
1448
1449 public boolean preScannerClose(final InternalScanner s)
1450 throws IOException {
1451 boolean bypass = false;
1452 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1453 for (RegionEnvironment env: coprocessors) {
1454 if (env.getInstance() instanceof RegionObserver) {
1455 ctx = ObserverContext.createAndPrepare(env, ctx);
1456 try {
1457 ((RegionObserver)env.getInstance()).preScannerClose(ctx, s);
1458 } catch (Throwable e) {
1459 handleCoprocessorThrowable(env, e);
1460 }
1461 bypass |= ctx.shouldBypass();
1462 if (ctx.shouldComplete()) {
1463 break;
1464 }
1465 }
1466 }
1467 return bypass;
1468 }
1469
1470
1471
1472
1473
1474 public void postScannerClose(final InternalScanner s)
1475 throws IOException {
1476 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1477 for (RegionEnvironment env: coprocessors) {
1478 if (env.getInstance() instanceof RegionObserver) {
1479 ctx = ObserverContext.createAndPrepare(env, ctx);
1480 try {
1481 ((RegionObserver)env.getInstance()).postScannerClose(ctx, s);
1482 } catch (Throwable e) {
1483 handleCoprocessorThrowable(env, e);
1484 }
1485 if (ctx.shouldComplete()) {
1486 break;
1487 }
1488 }
1489 }
1490 }
1491
1492
1493
1494
1495
1496
1497
1498
1499 public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
1500 WALEdit logEdit) throws IOException {
1501 boolean bypass = false;
1502 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1503 for (RegionEnvironment env: coprocessors) {
1504 if (env.getInstance() instanceof RegionObserver) {
1505 ctx = ObserverContext.createAndPrepare(env, ctx);
1506 try {
1507 ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey,
1508 logEdit);
1509 } catch (Throwable e) {
1510 handleCoprocessorThrowable(env, e);
1511 }
1512 bypass |= ctx.shouldBypass();
1513 if (ctx.shouldComplete()) {
1514 break;
1515 }
1516 }
1517 }
1518 return bypass;
1519 }
1520
1521
1522
1523
1524
1525
1526
1527 public void postWALRestore(HRegionInfo info, HLogKey logKey,
1528 WALEdit logEdit) throws IOException {
1529 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1530 for (RegionEnvironment env: coprocessors) {
1531 if (env.getInstance() instanceof RegionObserver) {
1532 ctx = ObserverContext.createAndPrepare(env, ctx);
1533 try {
1534 ((RegionObserver)env.getInstance()).postWALRestore(ctx, info,
1535 logKey, logEdit);
1536 } catch (Throwable e) {
1537 handleCoprocessorThrowable(env, e);
1538 }
1539 if (ctx.shouldComplete()) {
1540 break;
1541 }
1542 }
1543 }
1544 }
1545
1546
1547
1548
1549
1550
1551 public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
1552 boolean bypass = false;
1553 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1554 for (RegionEnvironment env: coprocessors) {
1555 if (env.getInstance() instanceof RegionObserver) {
1556 ctx = ObserverContext.createAndPrepare(env, ctx);
1557 try {
1558 ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
1559 } catch (Throwable e) {
1560 handleCoprocessorThrowable(env, e);
1561 }
1562 bypass |= ctx.shouldBypass();
1563 if (ctx.shouldComplete()) {
1564 break;
1565 }
1566 }
1567 }
1568
1569 return bypass;
1570 }
1571
1572
1573
1574
1575
1576
1577
1578 public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
1579 throws IOException {
1580 ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1581 for (RegionEnvironment env: coprocessors) {
1582 if (env.getInstance() instanceof RegionObserver) {
1583 ctx = ObserverContext.createAndPrepare(env, ctx);
1584 try {
1585 hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
1586 familyPaths, hasLoaded);
1587 } catch (Throwable e) {
1588 handleCoprocessorThrowable(env, e);
1589 }
1590 if (ctx.shouldComplete()) {
1591 break;
1592 }
1593 }
1594 }
1595
1596 return hasLoaded;
1597 }
1598
1599 }