1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  import java.util.regex.Matcher;
30  
31  import org.apache.commons.collections.map.AbstractReferenceMap;
32  import org.apache.commons.collections.map.ReferenceMap;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.Coprocessor;
38  import org.apache.hadoop.hbase.CoprocessorEnvironment;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.client.Append;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.Get;
46  import org.apache.hadoop.hbase.client.Increment;
47  import org.apache.hadoop.hbase.client.Mutation;
48  import org.apache.hadoop.hbase.client.Put;
49  import org.apache.hadoop.hbase.client.Result;
50  import org.apache.hadoop.hbase.client.Scan;
51  import org.apache.hadoop.hbase.client.Durability;
52  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
53  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
54  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
57  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
60  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
61  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
62  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.util.StringUtils;
66  
67  import com.google.common.collect.ImmutableList;
68  
69  /**
70   * Implements the coprocessor environment and runtime support for coprocessors
71   * loaded within a {@link HRegion}.
72   */
73  public class RegionCoprocessorHost
74      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
75  
76    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
77    // The shared data map
78    private static ReferenceMap sharedDataMap =
79        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
80  
81    /**
82     * Encapsulation of the environment of each coprocessor
83     */
84    static class RegionEnvironment extends CoprocessorHost.Environment
85        implements RegionCoprocessorEnvironment {
86  
87      private HRegion region;
88      private RegionServerServices rsServices;
89      ConcurrentMap<String, Object> sharedData;
90  
91      /**
92       * Constructor
93       * @param impl the coprocessor instance
94       * @param priority chaining priority
95       */
96      public RegionEnvironment(final Coprocessor impl, final int priority,
97          final int seq, final Configuration conf, final HRegion region,
98          final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
99        super(impl, priority, seq, conf);
100       this.region = region;
101       this.rsServices = services;
102       this.sharedData = sharedData;
103     }
104 
105     /** @return the region */
106     @Override
107     public HRegion getRegion() {
108       return region;
109     }
110 
111     /** @return reference to the region server services */
112     @Override
113     public RegionServerServices getRegionServerServices() {
114       return rsServices;
115     }
116 
117     public void shutdown() {
118       super.shutdown();
119     }
120 
121     @Override
122     public ConcurrentMap<String, Object> getSharedData() {
123       return sharedData;
124     }
125   }
126 
127   /** The region server services */
128   RegionServerServices rsServices;
129   /** The region */
130   HRegion region;
131 
132   /**
133    * Constructor
134    * @param region the region
135    * @param rsServices interface to available region server functionality
136    * @param conf the configuration
137    */
138   public RegionCoprocessorHost(final HRegion region,
139       final RegionServerServices rsServices, final Configuration conf) {
140     this.conf = conf;
141     this.rsServices = rsServices;
142     this.region = region;
143     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
144 
145     // load system default cp's from configuration.
146     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
147 
148     // load system default cp's for user tables from configuration.
149     if (!HTableDescriptor.isMetaTable(region.getRegionInfo().getTableName())) {
150       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
151     }
152 
153     // load Coprocessor From HDFS
154     loadTableCoprocessors(conf);
155   }
156 
157   void loadTableCoprocessors(final Configuration conf) {
158     // scan the table attributes for coprocessor load specifications
159     // initialize the coprocessors
160     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
161     for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
162         region.getTableDesc().getValues().entrySet()) {
163       String key = Bytes.toString(e.getKey().get()).trim();
164       String spec = Bytes.toString(e.getValue().get()).trim();
165       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
166         // found one
167         try {
168           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
169           if (matcher.matches()) {
170             // jar file path can be empty if the cp class can be loaded
171             // from class loader.
172             Path path = matcher.group(1).trim().isEmpty() ?
173                 null : new Path(matcher.group(1).trim());
174             String className = matcher.group(2).trim();
175             int priority = matcher.group(3).trim().isEmpty() ?
176                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
177             String cfgSpec = null;
178             try {
179               cfgSpec = matcher.group(4);
180             } catch (IndexOutOfBoundsException ex) {
181               // ignore
182             }
183             if (cfgSpec != null) {
184               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
185               Configuration newConf = new Configuration(conf);
186               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
187               while (m.find()) {
188                 newConf.set(m.group(1), m.group(2));
189               }
190               configured.add(load(path, className, priority, newConf));
191             } else {
192               configured.add(load(path, className, priority, conf));
193             }
194             LOG.info("Load coprocessor " + className + " from HTD of " +
195               Bytes.toString(region.getTableDesc().getName()) +
196                 " successfully.");
197           } else {
198             throw new RuntimeException("specification does not match pattern");
199           }
200         } catch (Exception ex) {
201           LOG.warn("attribute '" + key +
202             "' has invalid coprocessor specification '" + spec + "'");
203           LOG.warn(StringUtils.stringifyException(ex));
204         }
205       }
206     }
207     // add together to coprocessor set for COW efficiency
208     coprocessors.addAll(configured);
209   }
210 
211   @Override
212   public RegionEnvironment createEnvironment(Class<?> implClass,
213       Coprocessor instance, int priority, int seq, Configuration conf) {
214     // Check if it's an Endpoint.
215     // Due to current dynamic protocol design, Endpoint
216     // uses a different way to be registered and executed.
217     // It uses a visitor pattern to invoke registered Endpoint
218     // method.
219     for (Class c : implClass.getInterfaces()) {
220       if (CoprocessorService.class.isAssignableFrom(c)) {
221         region.registerService( ((CoprocessorService)instance).getService() );
222       }
223     }
224     ConcurrentMap<String, Object> classData;
225     // make sure only one thread can add maps
226     synchronized (sharedDataMap) {
227       // as long as at least one RegionEnvironment holds on to its classData it will
228       // remain in this map
229       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
230       if (classData == null) {
231         classData = new ConcurrentHashMap<String, Object>();
232         sharedDataMap.put(implClass.getName(), classData);
233       }
234     }
235     return new RegionEnvironment(instance, priority, seq, conf, region,
236         rsServices, classData);
237   }
238 
239   @Override
240   protected void abortServer(final CoprocessorEnvironment env, final Throwable e) {
241     abortServer("regionserver", rsServices, env, e);
242   }
243 
244   /**
245    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
246    *
247    * For example, {@link
248    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
249    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
250    *
251    * See also
252    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
253    *    CoprocessorEnvironment, Throwable)}
254    * @param env The coprocessor that threw the exception.
255    * @param e The exception that was thrown.
256    */
257   private void handleCoprocessorThrowableNoRethrow(
258       final CoprocessorEnvironment env, final Throwable e) {
259     try {
260       handleCoprocessorThrowable(env,e);
261     } catch (IOException ioe) {
262       // We cannot throw exceptions from the caller hook, so ignore.
263       LOG.warn(
264         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
265         e + ". Ignoring.",e);
266     }
267   }
268 
269   /**
270    * Invoked before a region open.
271    *
272    * @throws IOException Signals that an I/O exception has occurred.
273    */
274   public void preOpen() throws IOException {
275     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
276     for (RegionEnvironment env: coprocessors) {
277       if (env.getInstance() instanceof RegionObserver) {
278         ctx = ObserverContext.createAndPrepare(env, ctx);
279          try {
280           ((RegionObserver) env.getInstance()).preOpen(ctx);
281          } catch (Throwable e) {
282            handleCoprocessorThrowable(env, e);
283          }
284         if (ctx.shouldComplete()) {
285           break;
286         }
287       }
288     }
289   }
290 
291   /**
292    * Invoked after a region open
293    */
294   public void postOpen() {
295     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
296     for (RegionEnvironment env: coprocessors) {
297       if (env.getInstance() instanceof RegionObserver) {
298         ctx = ObserverContext.createAndPrepare(env, ctx);
299         try {
300           ((RegionObserver) env.getInstance()).postOpen(ctx);
301         } catch (Throwable e) {
302           handleCoprocessorThrowableNoRethrow(env, e);
303         }
304         if (ctx.shouldComplete()) {
305           break;
306         }
307       }
308     }
309   }
310 
311   /**
312    * Invoked before a region is closed
313    * @param abortRequested true if the server is aborting
314    */
315   public void preClose(boolean abortRequested) throws IOException {
316     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
317     for (RegionEnvironment env: coprocessors) {
318       if (env.getInstance() instanceof RegionObserver) {
319         ctx = ObserverContext.createAndPrepare(env, ctx);
320         try {
321           ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested);
322         } catch (Throwable e) {
323           handleCoprocessorThrowable(env, e);
324         }
325       }
326     }
327   }
328 
329   /**
330    * Invoked after a region is closed
331    * @param abortRequested true if the server is aborting
332    */
333   public void postClose(boolean abortRequested) {
334     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
335     for (RegionEnvironment env: coprocessors) {
336       if (env.getInstance() instanceof RegionObserver) {
337         ctx = ObserverContext.createAndPrepare(env, ctx);
338         try {
339           ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested);
340         } catch (Throwable e) {
341           handleCoprocessorThrowableNoRethrow(env, e);
342         }
343 
344       }
345       shutdown(env);
346     }
347   }
348 
349   /**
350    * See
351    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
352    */
353   public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
354       ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
355     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
356     InternalScanner s = null;
357     for (RegionEnvironment env: coprocessors) {
358       if (env.getInstance() instanceof RegionObserver) {
359         ctx = ObserverContext.createAndPrepare(env, ctx);
360         try {
361           s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
362             scanType, earliestPutTs, s, request);
363         } catch (Throwable e) {
364           handleCoprocessorThrowable(env,e);
365         }
366         if (ctx.shouldComplete()) {
367           break;
368         }
369       }
370     }
371     return s;
372   }
373 
374   /**
375    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
376    * available candidates.
377    * @param store The store where compaction is being requested
378    * @param candidates The currently available store files
379    * @param request custom compaction request
380    * @return If {@code true}, skip the normal selection process and use the current list
381    * @throws IOException
382    */
383   public boolean preCompactSelection(Store store, List<StoreFile> candidates,
384       CompactionRequest request) throws IOException {
385     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
386     boolean bypass = false;
387     for (RegionEnvironment env: coprocessors) {
388       if (env.getInstance() instanceof RegionObserver) {
389         ctx = ObserverContext.createAndPrepare(env, ctx);
390         try {
391           ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
392         } catch (Throwable e) {
393           handleCoprocessorThrowable(env,e);
394 
395         }
396         bypass |= ctx.shouldBypass();
397         if (ctx.shouldComplete()) {
398           break;
399         }
400       }
401     }
402     return bypass;
403   }
404 
405   /**
406    * Called after the {@link StoreFile}s to be compacted have been selected from the available
407    * candidates.
408    * @param store The store where compaction is being requested
409    * @param selected The store files selected to compact
410    * @param request custom compaction
411    */
412   public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
413       CompactionRequest request) {
414     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
415     for (RegionEnvironment env: coprocessors) {
416       if (env.getInstance() instanceof RegionObserver) {
417         ctx = ObserverContext.createAndPrepare(env, ctx);
418         try {
419           ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
420         } catch (Throwable e) {
421           handleCoprocessorThrowableNoRethrow(env,e);
422         }
423         if (ctx.shouldComplete()) {
424           break;
425         }
426       }
427     }
428   }
429 
430   /**
431    * Called prior to rewriting the store files selected for compaction
432    * @param store the store being compacted
433    * @param scanner the scanner used to read store data during compaction
434    * @param scanType type of Scan
435    * @param request the compaction that will be executed
436    * @throws IOException
437    */
438   public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
439       CompactionRequest request) throws IOException {
440     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
441     boolean bypass = false;
442     for (RegionEnvironment env: coprocessors) {
443       if (env.getInstance() instanceof RegionObserver) {
444         ctx = ObserverContext.createAndPrepare(env, ctx);
445         try {
446           scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType,
447             request);
448         } catch (Throwable e) {
449           handleCoprocessorThrowable(env,e);
450         }
451         bypass |= ctx.shouldBypass();
452         if (ctx.shouldComplete()) {
453           break;
454         }
455       }
456     }
457     return bypass ? null : scanner;
458   }
459 
460   /**
461    * Called after the store compaction has completed.
462    * @param store the store being compacted
463    * @param resultFile the new store file written during compaction
464    * @param request the compaction that is being executed
465    * @throws IOException
466    */
467   public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
468       throws IOException {
469     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
470     for (RegionEnvironment env: coprocessors) {
471       if (env.getInstance() instanceof RegionObserver) {
472         ctx = ObserverContext.createAndPrepare(env, ctx);
473         try {
474           ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
475         } catch (Throwable e) {
476           handleCoprocessorThrowable(env, e);
477         }
478         if (ctx.shouldComplete()) {
479           break;
480         }
481       }
482     }
483   }
484 
485   /**
486    * Invoked before a memstore flush
487    * @throws IOException
488    */
489   public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
490     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
491     boolean bypass = false;
492     for (RegionEnvironment env: coprocessors) {
493       if (env.getInstance() instanceof RegionObserver) {
494         ctx = ObserverContext.createAndPrepare(env, ctx);
495         try {
496           scanner = ((RegionObserver)env.getInstance()).preFlush(
497               ctx, store, scanner);
498         } catch (Throwable e) {
499           handleCoprocessorThrowable(env,e);
500         }
501         bypass |= ctx.shouldBypass();
502         if (ctx.shouldComplete()) {
503           break;
504         }
505       }
506     }
507     return bypass ? null : scanner;
508   }
509 
510   /**
511    * Invoked before a memstore flush
512    * @throws IOException
513    */
514   public void preFlush() throws IOException {
515     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
516     for (RegionEnvironment env: coprocessors) {
517       if (env.getInstance() instanceof RegionObserver) {
518         ctx = ObserverContext.createAndPrepare(env, ctx);
519         try {
520           ((RegionObserver)env.getInstance()).preFlush(ctx);
521         } catch (Throwable e) {
522           handleCoprocessorThrowable(env, e);
523         }
524         if (ctx.shouldComplete()) {
525           break;
526         }
527       }
528     }
529   }
530 
531   /**
532    * See
533    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
534    *    Store, KeyValueScanner, InternalScanner)}
535    */
536   public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner)
537       throws IOException {
538     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
539     InternalScanner s = null;
540     for (RegionEnvironment env : coprocessors) {
541       if (env.getInstance() instanceof RegionObserver) {
542         ctx = ObserverContext.createAndPrepare(env, ctx);
543         try {
544           s = ((RegionObserver) env.getInstance())
545             .preFlushScannerOpen(ctx, store, memstoreScanner, s);
546         } catch (Throwable e) {
547           handleCoprocessorThrowable(env, e);
548         }
549         if (ctx.shouldComplete()) {
550           break;
551         }
552       }
553     }
554     return s;
555   }
556 
557   /**
558    * Invoked after a memstore flush
559    * @throws IOException
560    */
561   public void postFlush() throws IOException {
562     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
563     for (RegionEnvironment env: coprocessors) {
564       if (env.getInstance() instanceof RegionObserver) {
565         ctx = ObserverContext.createAndPrepare(env, ctx);
566         try {
567           ((RegionObserver)env.getInstance()).postFlush(ctx);
568         } catch (Throwable e) {
569           handleCoprocessorThrowable(env, e);
570         }
571         if (ctx.shouldComplete()) {
572           break;
573         }
574       }
575     }
576   }
577 
578   /**
579    * Invoked after a memstore flush
580    * @throws IOException
581    */
582   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
583     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
584     for (RegionEnvironment env: coprocessors) {
585       if (env.getInstance() instanceof RegionObserver) {
586         ctx = ObserverContext.createAndPrepare(env, ctx);
587         try {
588           ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
589         } catch (Throwable e) {
590           handleCoprocessorThrowable(env, e);
591         }
592         if (ctx.shouldComplete()) {
593           break;
594         }
595       }
596     }
597   }
598 
599   /**
600    * Invoked just before a split
601    * @throws IOException
602    */
603   public void preSplit() throws IOException {
604     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
605     for (RegionEnvironment env: coprocessors) {
606       if (env.getInstance() instanceof RegionObserver) {
607         ctx = ObserverContext.createAndPrepare(env, ctx);
608         try {
609           ((RegionObserver)env.getInstance()).preSplit(ctx);
610         } catch (Throwable e) {
611           handleCoprocessorThrowable(env, e);
612         }
613         if (ctx.shouldComplete()) {
614           break;
615         }
616       }
617     }
618   }
619 
620   /**
621    * Invoked just before a split
622    * @throws IOException
623    */
624   public void preSplit(byte[] splitRow) throws IOException {
625     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
626     for (RegionEnvironment env: coprocessors) {
627       if (env.getInstance() instanceof RegionObserver) {
628         ctx = ObserverContext.createAndPrepare(env, ctx);
629         try {
630           ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow);
631         } catch (Throwable e) {
632           handleCoprocessorThrowable(env, e);
633         }
634         if (ctx.shouldComplete()) {
635           break;
636         }
637       }
638     }
639   }
640 
641   /**
642    * Invoked just after a split
643    * @param l the new left-hand daughter region
644    * @param r the new right-hand daughter region
645    * @throws IOException
646    */
647   public void postSplit(HRegion l, HRegion r) throws IOException {
648     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
649     for (RegionEnvironment env: coprocessors) {
650       if (env.getInstance() instanceof RegionObserver) {
651         ctx = ObserverContext.createAndPrepare(env, ctx);
652         try {
653           ((RegionObserver)env.getInstance()).postSplit(ctx, l, r);
654         } catch (Throwable e) {
655           handleCoprocessorThrowable(env, e);
656         }
657         if (ctx.shouldComplete()) {
658           break;
659         }
660       }
661     }
662   }
663 
664   /**
665    * Invoked just before the rollback of a failed split is started
666    * @throws IOException
667    */
668   public void preRollBackSplit() throws IOException {
669     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
670     for (RegionEnvironment env : coprocessors) {
671       if (env.getInstance() instanceof RegionObserver) {
672         ctx = ObserverContext.createAndPrepare(env, ctx);
673         try {
674           ((RegionObserver) env.getInstance()).preRollBackSplit(ctx);
675         } catch (Throwable e) {
676           handleCoprocessorThrowable(env, e);
677         }
678         if (ctx.shouldComplete()) {
679           break;
680         }
681       }
682     }
683   }
684 
685   /**
686    * Invoked just after the rollback of a failed split is done
687    * @throws IOException
688    */
689   public void postRollBackSplit() throws IOException {
690     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
691     for (RegionEnvironment env : coprocessors) {
692       if (env.getInstance() instanceof RegionObserver) {
693         ctx = ObserverContext.createAndPrepare(env, ctx);
694         try {
695           ((RegionObserver) env.getInstance()).postRollBackSplit(ctx);
696         } catch (Throwable e) {
697           handleCoprocessorThrowable(env, e);
698         }
699         if (ctx.shouldComplete()) {
700           break;
701         }
702       }
703     }
704   }
705 
706   /**
707    * Invoked after a split is completed irrespective of a failure or success.
708    * @throws IOException
709    */
710   public void postCompleteSplit() throws IOException {
711     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
712     for (RegionEnvironment env : coprocessors) {
713       if (env.getInstance() instanceof RegionObserver) {
714         ctx = ObserverContext.createAndPrepare(env, ctx);
715         try {
716           ((RegionObserver) env.getInstance()).postCompleteSplit(ctx);
717         } catch (Throwable e) {
718           handleCoprocessorThrowable(env, e);
719         }
720         if (ctx.shouldComplete()) {
721           break;
722         }
723       }
724     }
725   }
726   // RegionObserver support
727 
728   /**
729    * @param row the row key
730    * @param family the family
731    * @param result the result set from the region
732    * @return true if default processing should be bypassed
733    * @exception IOException Exception
734    */
735   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
736       final Result result) throws IOException {
737     boolean bypass = false;
738     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
739     for (RegionEnvironment env: coprocessors) {
740       if (env.getInstance() instanceof RegionObserver) {
741         ctx = ObserverContext.createAndPrepare(env, ctx);
742         try {
743           ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row,
744               family, result);
745         } catch (Throwable e) {
746           handleCoprocessorThrowable(env, e);
747         }
748         bypass |= ctx.shouldBypass();
749         if (ctx.shouldComplete()) {
750           break;
751         }
752       }
753     }
754     return bypass;
755   }
756 
757   /**
758    * @param row the row key
759    * @param family the family
760    * @param result the result set from the region
761    * @exception IOException Exception
762    */
763   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
764       final Result result) throws IOException {
765     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
766     for (RegionEnvironment env: coprocessors) {
767       if (env.getInstance() instanceof RegionObserver) {
768         ctx = ObserverContext.createAndPrepare(env, ctx);
769         try {
770           ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row,
771               family, result);
772         } catch (Throwable e) {
773           handleCoprocessorThrowable(env, e);
774         }
775         if (ctx.shouldComplete()) {
776           break;
777         }
778       }
779     }
780   }
781 
782   /**
783    * @param get the Get request
784    * @return true if default processing should be bypassed
785    * @exception IOException Exception
786    */
787   public boolean preGet(final Get get, final List<KeyValue> results)
788       throws IOException {
789     boolean bypass = false;
790     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
791     for (RegionEnvironment env: coprocessors) {
792       if (env.getInstance() instanceof RegionObserver) {
793         ctx = ObserverContext.createAndPrepare(env, ctx);
794         try {
795           ((RegionObserver)env.getInstance()).preGet(ctx, get, results);
796         } catch (Throwable e) {
797           handleCoprocessorThrowable(env, e);
798         }
799         bypass |= ctx.shouldBypass();
800         if (ctx.shouldComplete()) {
801           break;
802         }
803       }
804     }
805     return bypass;
806   }
807 
808   /**
809    * @param get the Get request
810    * @param results the result set
811    * @exception IOException Exception
812    */
813   public void postGet(final Get get, final List<KeyValue> results)
814   throws IOException {
815     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
816     for (RegionEnvironment env: coprocessors) {
817       if (env.getInstance() instanceof RegionObserver) {
818         ctx = ObserverContext.createAndPrepare(env, ctx);
819         try {
820           ((RegionObserver)env.getInstance()).postGet(ctx, get, results);
821         } catch (Throwable e) {
822           handleCoprocessorThrowable(env, e);
823         }
824         if (ctx.shouldComplete()) {
825           break;
826         }
827       }
828     }
829   }
830 
831   /**
832    * @param get the Get request
833    * @return true or false to return to client if bypassing normal operation,
834    * or null otherwise
835    * @exception IOException Exception
836    */
837   public Boolean preExists(final Get get) throws IOException {
838     boolean bypass = false;
839     boolean exists = false;
840     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
841     for (RegionEnvironment env: coprocessors) {
842       if (env.getInstance() instanceof RegionObserver) {
843         ctx = ObserverContext.createAndPrepare(env, ctx);
844         try {
845           exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists);
846         } catch (Throwable e) {
847           handleCoprocessorThrowable(env, e);
848         }
849         bypass |= ctx.shouldBypass();
850         if (ctx.shouldComplete()) {
851           break;
852         }
853       }
854     }
855     return bypass ? exists : null;
856   }
857 
858   /**
859    * @param get the Get request
860    * @param exists the result returned by the region server
861    * @return the result to return to the client
862    * @exception IOException Exception
863    */
864   public boolean postExists(final Get get, boolean exists)
865       throws IOException {
866     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
867     for (RegionEnvironment env: coprocessors) {
868       if (env.getInstance() instanceof RegionObserver) {
869         ctx = ObserverContext.createAndPrepare(env, ctx);
870         try {
871           exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists);
872         } catch (Throwable e) {
873           handleCoprocessorThrowable(env, e);
874         }
875         if (ctx.shouldComplete()) {
876           break;
877         }
878       }
879     }
880     return exists;
881   }
882 
883   /**
884    * @param put The Put object
885    * @param edit The WALEdit object.
886    * @param durability The durability used
887    * @return true if default processing should be bypassed
888    * @exception IOException Exception
889    */
890   public boolean prePut(Put put, WALEdit edit,
891       final Durability durability) throws IOException {
892     boolean bypass = false;
893     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
894     for (RegionEnvironment env: coprocessors) {
895       if (env.getInstance() instanceof RegionObserver) {
896         ctx = ObserverContext.createAndPrepare(env, ctx);
897         try {
898           ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability);
899         } catch (Throwable e) {
900           handleCoprocessorThrowable(env, e);
901         }
902         bypass |= ctx.shouldBypass();
903         if (ctx.shouldComplete()) {
904           break;
905         }
906       }
907     }
908     return bypass;
909   }
910 
911   /**
912    * @param put The Put object
913    * @param edit The WALEdit object.
914    * @param durability The durability used
915    * @exception IOException Exception
916    */
917   public void postPut(Put put, WALEdit edit,
918       final Durability durability) throws IOException {
919     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
920     for (RegionEnvironment env: coprocessors) {
921       if (env.getInstance() instanceof RegionObserver) {
922         ctx = ObserverContext.createAndPrepare(env, ctx);
923         try {
924           ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability);
925         } catch (Throwable e) {
926           handleCoprocessorThrowable(env, e);
927         }
928         if (ctx.shouldComplete()) {
929           break;
930         }
931       }
932     }
933   }
934 
935   /**
936    * @param delete The Delete object
937    * @param edit The WALEdit object.
938    * @param durability The durability used
939    * @return true if default processing should be bypassed
940    * @exception IOException Exception
941    */
942   public boolean preDelete(Delete delete, WALEdit edit,
943       final Durability durability) throws IOException {
944     boolean bypass = false;
945     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
946     for (RegionEnvironment env: coprocessors) {
947       if (env.getInstance() instanceof RegionObserver) {
948         ctx = ObserverContext.createAndPrepare(env, ctx);
949         try {
950           ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability);
951         } catch (Throwable e) {
952           handleCoprocessorThrowable(env, e);
953         }
954         bypass |= ctx.shouldBypass();
955         if (ctx.shouldComplete()) {
956           break;
957         }
958       }
959     }
960     return bypass;
961   }
962 
963   /**
964    * @param delete The Delete object
965    * @param edit The WALEdit object.
966    * @param durability The durability used
967    * @exception IOException Exception
968    */
969   public void postDelete(Delete delete, WALEdit edit,
970       final Durability durability) throws IOException {
971     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
972     for (RegionEnvironment env: coprocessors) {
973       if (env.getInstance() instanceof RegionObserver) {
974         ctx = ObserverContext.createAndPrepare(env, ctx);
975         try {
976           ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability);
977         } catch (Throwable e) {
978           handleCoprocessorThrowable(env, e);
979         }
980         if (ctx.shouldComplete()) {
981           break;
982         }
983       }
984     }
985   }
986   
987   /**
988    * @param miniBatchOp
989    * @return true if default processing should be bypassed
990    * @throws IOException
991    */
992   public boolean preBatchMutate(
993       final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
994     boolean bypass = false;
995     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
996     for (RegionEnvironment env : coprocessors) {
997       if (env.getInstance() instanceof RegionObserver) {
998         ctx = ObserverContext.createAndPrepare(env, ctx);
999         try {
1000           ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
1001         } catch (Throwable e) {
1002           handleCoprocessorThrowable(env, e);
1003         }
1004         bypass |= ctx.shouldBypass();
1005         if (ctx.shouldComplete()) {
1006           break;
1007         }
1008       }
1009     }
1010     return bypass;
1011   }
1012 
1013   /**
1014    * @param miniBatchOp
1015    * @throws IOException
1016    */
1017   public void postBatchMutate(
1018       final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
1019     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1020     for (RegionEnvironment env : coprocessors) {
1021       if (env.getInstance() instanceof RegionObserver) {
1022         ctx = ObserverContext.createAndPrepare(env, ctx);
1023         try {
1024           ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
1025         } catch (Throwable e) {
1026           handleCoprocessorThrowable(env, e);
1027         }
1028         if (ctx.shouldComplete()) {
1029           break;
1030         }
1031       }
1032     }
1033   }
1034 
1035   /**
1036    * @param row row to check
1037    * @param family column family
1038    * @param qualifier column qualifier
1039    * @param compareOp the comparison operation
1040    * @param comparator the comparator
1041    * @param put data to put if check succeeds
1042    * @return true or false to return to client if default processing should
1043    * be bypassed, or null otherwise
1044    * @throws IOException e
1045    */
1046   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1047       final byte [] qualifier, final CompareOp compareOp,
1048       final ByteArrayComparable comparator, Put put)
1049     throws IOException {
1050     boolean bypass = false;
1051     boolean result = false;
1052     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1053     for (RegionEnvironment env: coprocessors) {
1054       if (env.getInstance() instanceof RegionObserver) {
1055         ctx = ObserverContext.createAndPrepare(env, ctx);
1056         try {
1057           result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family,
1058             qualifier, compareOp, comparator, put, result);
1059         } catch (Throwable e) {
1060           handleCoprocessorThrowable(env, e);
1061         }
1062 
1063 
1064         bypass |= ctx.shouldBypass();
1065         if (ctx.shouldComplete()) {
1066           break;
1067         }
1068       }
1069     }
1070     return bypass ? result : null;
1071   }
1072 
1073   /**
1074    * @param row row to check
1075    * @param family column family
1076    * @param qualifier column qualifier
1077    * @param compareOp the comparison operation
1078    * @param comparator the comparator
1079    * @param put data to put if check succeeds
1080    * @throws IOException e
1081    */
1082   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1083       final byte [] qualifier, final CompareOp compareOp,
1084       final ByteArrayComparable comparator, final Put put,
1085       boolean result)
1086     throws IOException {
1087     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1088     for (RegionEnvironment env: coprocessors) {
1089       if (env.getInstance() instanceof RegionObserver) {
1090         ctx = ObserverContext.createAndPrepare(env, ctx);
1091         try {
1092           result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row,
1093             family, qualifier, compareOp, comparator, put, result);
1094         } catch (Throwable e) {
1095           handleCoprocessorThrowable(env, e);
1096         }
1097         if (ctx.shouldComplete()) {
1098           break;
1099         }
1100       }
1101     }
1102     return result;
1103   }
1104 
1105   /**
1106    * @param row row to check
1107    * @param family column family
1108    * @param qualifier column qualifier
1109    * @param compareOp the comparison operation
1110    * @param comparator the comparator
1111    * @param delete delete to commit if check succeeds
1112    * @return true or false to return to client if default processing should
1113    * be bypassed, or null otherwise
1114    * @throws IOException e
1115    */
1116   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1117       final byte [] qualifier, final CompareOp compareOp,
1118       final ByteArrayComparable comparator, Delete delete)
1119       throws IOException {
1120     boolean bypass = false;
1121     boolean result = false;
1122     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1123     for (RegionEnvironment env: coprocessors) {
1124       if (env.getInstance() instanceof RegionObserver) {
1125         ctx = ObserverContext.createAndPrepare(env, ctx);
1126         try {
1127           result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row,
1128             family, qualifier, compareOp, comparator, delete, result);
1129         } catch (Throwable e) {
1130           handleCoprocessorThrowable(env, e);
1131         }
1132         bypass |= ctx.shouldBypass();
1133         if (ctx.shouldComplete()) {
1134           break;
1135         }
1136       }
1137     }
1138     return bypass ? result : null;
1139   }
1140 
1141   /**
1142    * @param row row to check
1143    * @param family column family
1144    * @param qualifier column qualifier
1145    * @param compareOp the comparison operation
1146    * @param comparator the comparator
1147    * @param delete delete to commit if check succeeds
1148    * @throws IOException e
1149    */
1150   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1151       final byte [] qualifier, final CompareOp compareOp,
1152       final ByteArrayComparable comparator, final Delete delete,
1153       boolean result)
1154     throws IOException {
1155     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1156     for (RegionEnvironment env: coprocessors) {
1157       if (env.getInstance() instanceof RegionObserver) {
1158         ctx = ObserverContext.createAndPrepare(env, ctx);
1159         try {
1160           result = ((RegionObserver)env.getInstance())
1161             .postCheckAndDelete(ctx, row, family, qualifier, compareOp,
1162               comparator, delete, result);
1163         } catch (Throwable e) {
1164           handleCoprocessorThrowable(env, e);
1165         }
1166         if (ctx.shouldComplete()) {
1167           break;
1168         }
1169       }
1170     }
1171     return result;
1172   }
1173 
1174   /**
1175    * @param append append object
1176    * @return result to return to client if default operation should be
1177    * bypassed, null otherwise
1178    * @throws IOException if an error occurred on the coprocessor
1179    */
1180   public Result preAppend(Append append)
1181       throws IOException {
1182     boolean bypass = false;
1183     Result result = null;
1184     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1185     for (RegionEnvironment env: coprocessors) {
1186       if (env.getInstance() instanceof RegionObserver) {
1187         ctx = ObserverContext.createAndPrepare(env, ctx);
1188         try {
1189           result = ((RegionObserver)env.getInstance()).preAppend(ctx, append);
1190         } catch (Throwable e) {
1191           handleCoprocessorThrowable(env, e);
1192         }
1193         bypass |= ctx.shouldBypass();
1194         if (ctx.shouldComplete()) {
1195           break;
1196         }
1197       }
1198     }
1199     return bypass ? result : null;
1200   }
1201 
1202   /**
1203    * @param increment increment object
1204    * @return result to return to client if default operation should be
1205    * bypassed, null otherwise
1206    * @throws IOException if an error occurred on the coprocessor
1207    */
1208   public Result preIncrement(Increment increment)
1209       throws IOException {
1210     boolean bypass = false;
1211     Result result = null;
1212     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1213     for (RegionEnvironment env: coprocessors) {
1214       if (env.getInstance() instanceof RegionObserver) {
1215         ctx = ObserverContext.createAndPrepare(env, ctx);
1216         try {
1217           result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment);
1218         } catch (Throwable e) {
1219           handleCoprocessorThrowable(env, e);
1220         }
1221         bypass |= ctx.shouldBypass();
1222         if (ctx.shouldComplete()) {
1223           break;
1224         }
1225       }
1226     }
1227     return bypass ? result : null;
1228   }
1229 
1230   /**
1231    * @param append Append object
1232    * @param result the result returned by postAppend
1233    * @throws IOException if an error occurred on the coprocessor
1234    */
1235   public void postAppend(final Append append, Result result)
1236       throws IOException {
1237     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1238     for (RegionEnvironment env: coprocessors) {
1239       if (env.getInstance() instanceof RegionObserver) {
1240         ctx = ObserverContext.createAndPrepare(env, ctx);
1241         try {
1242           ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
1243         } catch (Throwable e) {
1244           handleCoprocessorThrowable(env, e);
1245         }
1246         if (ctx.shouldComplete()) {
1247           break;
1248         }
1249       }
1250     }
1251   }
1252 
1253   /**
1254    * @param increment increment object
1255    * @param result the result returned by postIncrement
1256    * @throws IOException if an error occurred on the coprocessor
1257    */
1258   public Result postIncrement(final Increment increment, Result result)
1259       throws IOException {
1260     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1261     for (RegionEnvironment env: coprocessors) {
1262       if (env.getInstance() instanceof RegionObserver) {
1263         ctx = ObserverContext.createAndPrepare(env, ctx);
1264         try {
1265           result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result);
1266         } catch (Throwable e) {
1267           handleCoprocessorThrowable(env, e);
1268         }
1269         if (ctx.shouldComplete()) {
1270           break;
1271         }
1272       }
1273     }
1274     return result;
1275   }
1276 
1277   /**
1278    * @param scan the Scan specification
1279    * @return scanner id to return to client if default operation should be
1280    * bypassed, false otherwise
1281    * @exception IOException Exception
1282    */
1283   public RegionScanner preScannerOpen(Scan scan) throws IOException {
1284     boolean bypass = false;
1285     RegionScanner s = null;
1286     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1287     for (RegionEnvironment env: coprocessors) {
1288       if (env.getInstance() instanceof RegionObserver) {
1289         ctx = ObserverContext.createAndPrepare(env, ctx);
1290         try {
1291           s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s);
1292         } catch (Throwable e) {
1293           handleCoprocessorThrowable(env, e);
1294         }
1295         bypass |= ctx.shouldBypass();
1296         if (ctx.shouldComplete()) {
1297           break;
1298         }
1299       }
1300     }
1301     return bypass ? s : null;
1302   }
1303 
1304   /**
1305    * See
1306    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1307    *    Store, Scan, NavigableSet, KeyValueScanner)}
1308    */
1309   public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
1310       final NavigableSet<byte[]> targetCols) throws IOException {
1311     KeyValueScanner s = null;
1312     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1313     for (RegionEnvironment env: coprocessors) {
1314       if (env.getInstance() instanceof RegionObserver) {
1315         ctx = ObserverContext.createAndPrepare(env, ctx);
1316         try {
1317           s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
1318               targetCols, s);
1319         } catch (Throwable e) {
1320           handleCoprocessorThrowable(env, e);
1321         }
1322         if (ctx.shouldComplete()) {
1323           break;
1324         }
1325       }
1326     }
1327     return s;
1328   }
1329 
1330   /**
1331    * @param scan the Scan specification
1332    * @param s the scanner
1333    * @return the scanner instance to use
1334    * @exception IOException Exception
1335    */
1336   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s)
1337       throws IOException {
1338     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1339     for (RegionEnvironment env: coprocessors) {
1340       if (env.getInstance() instanceof RegionObserver) {
1341         ctx = ObserverContext.createAndPrepare(env, ctx);
1342         try {
1343           s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s);
1344         } catch (Throwable e) {
1345           handleCoprocessorThrowable(env, e);
1346         }
1347         if (ctx.shouldComplete()) {
1348           break;
1349         }
1350       }
1351     }
1352     return s;
1353   }
1354 
1355   /**
1356    * @param s the scanner
1357    * @param results the result set returned by the region server
1358    * @param limit the maximum number of results to return
1359    * @return 'has next' indication to client if bypassing default behavior, or
1360    * null otherwise
1361    * @exception IOException Exception
1362    */
1363   public Boolean preScannerNext(final InternalScanner s,
1364       final List<Result> results, int limit) throws IOException {
1365     boolean bypass = false;
1366     boolean hasNext = false;
1367     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1368     for (RegionEnvironment env: coprocessors) {
1369       if (env.getInstance() instanceof RegionObserver) {
1370         ctx = ObserverContext.createAndPrepare(env, ctx);
1371         try {
1372           hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results,
1373             limit, hasNext);
1374         } catch (Throwable e) {
1375           handleCoprocessorThrowable(env, e);
1376         }
1377         bypass |= ctx.shouldBypass();
1378         if (ctx.shouldComplete()) {
1379           break;
1380         }
1381       }
1382     }
1383     return bypass ? hasNext : null;
1384   }
1385 
1386   /**
1387    * @param s the scanner
1388    * @param results the result set returned by the region server
1389    * @param limit the maximum number of results to return
1390    * @param hasMore
1391    * @return 'has more' indication to give to client
1392    * @exception IOException Exception
1393    */
1394   public boolean postScannerNext(final InternalScanner s,
1395       final List<Result> results, final int limit, boolean hasMore)
1396       throws IOException {
1397     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1398     for (RegionEnvironment env: coprocessors) {
1399       if (env.getInstance() instanceof RegionObserver) {
1400         ctx = ObserverContext.createAndPrepare(env, ctx);
1401         try {
1402           hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s,
1403             results, limit, hasMore);
1404         } catch (Throwable e) {
1405           handleCoprocessorThrowable(env, e);
1406         }
1407         if (ctx.shouldComplete()) {
1408           break;
1409         }
1410       }
1411     }
1412     return hasMore;
1413   }
1414 
1415   /**
1416    * This will be called by the scan flow when the current scanned row is being filtered out by the
1417    * filter.
1418    * @param s the scanner
1419    * @param currentRow The current rowkey which got filtered out
1420    * @return whether more rows are available for the scanner or not
1421    * @throws IOException
1422    */
1423   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow)
1424       throws IOException {
1425     boolean hasMore = true; // By default assume more rows there.
1426     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1427     for (RegionEnvironment env : coprocessors) {
1428       if (env.getInstance() instanceof RegionObserver) {
1429         ctx = ObserverContext.createAndPrepare(env, ctx);
1430         try {
1431           hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
1432               hasMore);
1433         } catch (Throwable e) {
1434           handleCoprocessorThrowable(env, e);
1435         }
1436         if (ctx.shouldComplete()) {
1437           break;
1438         }
1439       }
1440     }
1441     return hasMore;
1442   }
1443   
1444   /**
1445    * @param s the scanner
1446    * @return true if default behavior should be bypassed, false otherwise
1447    * @exception IOException Exception
1448    */
1449   public boolean preScannerClose(final InternalScanner s)
1450       throws IOException {
1451     boolean bypass = false;
1452     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1453     for (RegionEnvironment env: coprocessors) {
1454       if (env.getInstance() instanceof RegionObserver) {
1455         ctx = ObserverContext.createAndPrepare(env, ctx);
1456         try {
1457           ((RegionObserver)env.getInstance()).preScannerClose(ctx, s);
1458         } catch (Throwable e) {
1459           handleCoprocessorThrowable(env, e);
1460         }
1461         bypass |= ctx.shouldBypass();
1462         if (ctx.shouldComplete()) {
1463           break;
1464         }
1465       }
1466     }
1467     return bypass;
1468   }
1469 
1470   /**
1471    * @param s the scanner
1472    * @exception IOException Exception
1473    */
1474   public void postScannerClose(final InternalScanner s)
1475       throws IOException {
1476     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1477     for (RegionEnvironment env: coprocessors) {
1478       if (env.getInstance() instanceof RegionObserver) {
1479         ctx = ObserverContext.createAndPrepare(env, ctx);
1480         try {
1481           ((RegionObserver)env.getInstance()).postScannerClose(ctx, s);
1482         } catch (Throwable e) {
1483           handleCoprocessorThrowable(env, e);
1484         }
1485         if (ctx.shouldComplete()) {
1486           break;
1487         }
1488       }
1489     }
1490   }
1491 
1492   /**
1493    * @param info
1494    * @param logKey
1495    * @param logEdit
1496    * @return true if default behavior should be bypassed, false otherwise
1497    * @throws IOException
1498    */
1499   public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
1500       WALEdit logEdit) throws IOException {
1501     boolean bypass = false;
1502     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1503     for (RegionEnvironment env: coprocessors) {
1504       if (env.getInstance() instanceof RegionObserver) {
1505         ctx = ObserverContext.createAndPrepare(env, ctx);
1506         try {
1507           ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey,
1508               logEdit);
1509         } catch (Throwable e) {
1510           handleCoprocessorThrowable(env, e);
1511         }
1512         bypass |= ctx.shouldBypass();
1513         if (ctx.shouldComplete()) {
1514           break;
1515         }
1516       }
1517     }
1518     return bypass;
1519   }
1520 
1521   /**
1522    * @param info
1523    * @param logKey
1524    * @param logEdit
1525    * @throws IOException
1526    */
1527   public void postWALRestore(HRegionInfo info, HLogKey logKey,
1528       WALEdit logEdit) throws IOException {
1529     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1530     for (RegionEnvironment env: coprocessors) {
1531       if (env.getInstance() instanceof RegionObserver) {
1532         ctx = ObserverContext.createAndPrepare(env, ctx);
1533         try {
1534           ((RegionObserver)env.getInstance()).postWALRestore(ctx, info,
1535               logKey, logEdit);
1536         } catch (Throwable e) {
1537           handleCoprocessorThrowable(env, e);
1538         }
1539         if (ctx.shouldComplete()) {
1540           break;
1541         }
1542       }
1543     }
1544   }
1545 
1546   /**
1547    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1548    * @return true if the default operation should be bypassed
1549    * @throws IOException
1550    */
1551   public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
1552     boolean bypass = false;
1553     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1554     for (RegionEnvironment env: coprocessors) {
1555       if (env.getInstance() instanceof RegionObserver) {
1556         ctx = ObserverContext.createAndPrepare(env, ctx);
1557         try {
1558           ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
1559         } catch (Throwable e) {
1560           handleCoprocessorThrowable(env, e);
1561         }
1562         bypass |= ctx.shouldBypass();
1563         if (ctx.shouldComplete()) {
1564           break;
1565         }
1566       }
1567     }
1568 
1569     return bypass;
1570   }
1571 
1572   /**
1573    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1574    * @param hasLoaded whether load was successful or not
1575    * @return the possibly modified value of hasLoaded
1576    * @throws IOException
1577    */
1578   public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
1579       throws IOException {
1580     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1581     for (RegionEnvironment env: coprocessors) {
1582       if (env.getInstance() instanceof RegionObserver) {
1583         ctx = ObserverContext.createAndPrepare(env, ctx);
1584         try {
1585           hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
1586             familyPaths, hasLoaded);
1587         } catch (Throwable e) {
1588           handleCoprocessorThrowable(env, e);
1589         }
1590         if (ctx.shouldComplete()) {
1591           break;
1592         }
1593       }
1594     }
1595 
1596     return hasLoaded;
1597   }
1598 
1599 }