View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   * http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableSet;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.regex.Matcher;
31  
32  import org.apache.commons.collections.map.AbstractReferenceMap;
33  import org.apache.commons.collections.map.ReferenceMap;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Coprocessor;
39  import org.apache.hadoop.hbase.CoprocessorEnvironment;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.client.Append;
46  import org.apache.hadoop.hbase.client.Delete;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.Increment;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
57  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
58  import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
59  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
60  import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
61  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
62  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
63  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.util.Pair;
66  import org.apache.hadoop.util.StringUtils;
67  
68  import com.google.common.collect.ImmutableList;
69  
70  /**
71   * Implements the coprocessor environment and runtime support for coprocessors
72   * loaded within a {@link HRegion}.
73   */
74  public class RegionCoprocessorHost
75      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
76  
77    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
78    // The shared data map
79    private static ReferenceMap sharedDataMap =
80        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
81  
82    /**
83     * Encapsulation of the environment of each coprocessor
84     */
85    static class RegionEnvironment extends CoprocessorHost.Environment
86        implements RegionCoprocessorEnvironment {
87  
88      private HRegion region;
89      private RegionServerServices rsServices;
90      ConcurrentMap<String, Object> sharedData;
91  
92      /**
93       * Constructor
94       * @param impl the coprocessor instance
95       * @param priority chaining priority
96       */
97      public RegionEnvironment(final Coprocessor impl, final int priority,
98          final int seq, final Configuration conf, final HRegion region,
99          final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
100       super(impl, priority, seq, conf);
101       this.region = region;
102       this.rsServices = services;
103       this.sharedData = sharedData;
104     }
105 
106     /** @return the region */
107     @Override
108     public HRegion getRegion() {
109       return region;
110     }
111 
112     /** @return reference to the region server services */
113     @Override
114     public RegionServerServices getRegionServerServices() {
115       return rsServices;
116     }
117 
118     public void shutdown() {
119       super.shutdown();
120     }
121 
122     @Override
123     public ConcurrentMap<String, Object> getSharedData() {
124       return sharedData;
125     }
126   }
127 
  /** The region server services; handed to every {@link RegionEnvironment} this host creates. */
  RegionServerServices rsServices;
  /** The region this host was constructed for; handed to every {@link RegionEnvironment} it creates. */
  HRegion region;
132 
133   /**
134    * Constructor
135    * @param region the region
136    * @param rsServices interface to available region server functionality
137    * @param conf the configuration
138    */
139   public RegionCoprocessorHost(final HRegion region,
140       final RegionServerServices rsServices, final Configuration conf) {
141     this.conf = conf;
142     this.rsServices = rsServices;
143     this.region = region;
144     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
145 
146     // load system default cp's from configuration.
147     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
148 
149     // load system default cp's for user tables from configuration.
150     if (!HTableDescriptor.isMetaTable(region.getRegionInfo().getTableName())) {
151       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
152     }
153 
154     // load Coprocessor From HDFS
155     loadTableCoprocessors(conf);
156   }
157 
158   void loadTableCoprocessors(final Configuration conf) {
159     // scan the table attributes for coprocessor load specifications
160     // initialize the coprocessors
161     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
162     for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
163         region.getTableDesc().getValues().entrySet()) {
164       String key = Bytes.toString(e.getKey().get()).trim();
165       String spec = Bytes.toString(e.getValue().get()).trim();
166       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
167         // found one
168         try {
169           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
170           if (matcher.matches()) {
171             // jar file path can be empty if the cp class can be loaded
172             // from class loader.
173             Path path = matcher.group(1).trim().isEmpty() ?
174                 null : new Path(matcher.group(1).trim());
175             String className = matcher.group(2).trim();
176             int priority = matcher.group(3).trim().isEmpty() ?
177                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
178             String cfgSpec = null;
179             try {
180               cfgSpec = matcher.group(4);
181             } catch (IndexOutOfBoundsException ex) {
182               // ignore
183             }
184             if (cfgSpec != null) {
185               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
186               // do an explicit deep copy of the passed configuration
187               Configuration newConf = new Configuration(false);
188               HBaseConfiguration.merge(newConf, conf);
189               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
190               while (m.find()) {
191                 newConf.set(m.group(1), m.group(2));
192               }
193               configured.add(load(path, className, priority, newConf));
194             } else {
195               configured.add(load(path, className, priority, conf));
196             }
197             LOG.info("Load coprocessor " + className + " from HTD of " +
198               Bytes.toString(region.getTableDesc().getName()) +
199                 " successfully.");
200           } else {
201             throw new RuntimeException("specification does not match pattern");
202           }
203         } catch (Exception ex) {
204           LOG.warn("attribute '" + key +
205             "' has invalid coprocessor specification '" + spec + "'");
206           LOG.warn(StringUtils.stringifyException(ex));
207         }
208       }
209     }
210     // add together to coprocessor set for COW efficiency
211     coprocessors.addAll(configured);
212   }
213 
214   @Override
215   public RegionEnvironment createEnvironment(Class<?> implClass,
216       Coprocessor instance, int priority, int seq, Configuration conf) {
217     // Check if it's an Endpoint.
218     // Due to current dynamic protocol design, Endpoint
219     // uses a different way to be registered and executed.
220     // It uses a visitor pattern to invoke registered Endpoint
221     // method.
222     for (Class c : implClass.getInterfaces()) {
223       if (CoprocessorProtocol.class.isAssignableFrom(c)) {
224         region.registerProtocol(c, (CoprocessorProtocol)instance);
225         break;
226       }
227     }
228     ConcurrentMap<String, Object> classData;
229     // make sure only one thread can add maps
230     synchronized (sharedDataMap) {
231       // as long as at least one RegionEnvironment holds on to its classData it will
232       // remain in this map
233       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
234       if (classData == null) {
235         classData = new ConcurrentHashMap<String, Object>();
236         sharedDataMap.put(implClass.getName(), classData);
237       }
238     }
239     return new RegionEnvironment(instance, priority, seq, conf, region,
240         rsServices, classData);
241   }
242 
243   @Override
244   protected void abortServer(final CoprocessorEnvironment env, final Throwable e) {
245     abortServer("regionserver", rsServices, env, e);
246   }
247 
248   /**
249    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
250    *
251    * For example, {@link
252    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
253    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
254    *
255    * See also {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable()}
256    * @param env The coprocessor that threw the exception.
257    * @param e The exception that was thrown.
258    */
259   private void handleCoprocessorThrowableNoRethrow(
260       final CoprocessorEnvironment env, final Throwable e) {
261     try {
262       handleCoprocessorThrowable(env,e);
263     } catch (IOException ioe) {
264       // We cannot throw exceptions from the caller hook, so ignore.
265       LOG.warn("handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " + e
266         + ". Ignoring.",e);
267     }
268   }
269 
270   /**
271    * Invoked before a region open
272    */
273   public void preOpen(){
274     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
275     for (RegionEnvironment env: coprocessors) {
276       if (env.getInstance() instanceof RegionObserver) {
277         ctx = ObserverContext.createAndPrepare(env, ctx);
278          try {
279           ((RegionObserver)env.getInstance()).preOpen(ctx);
280          } catch (Throwable e) {
281            handleCoprocessorThrowableNoRethrow(env, e);
282          }
283         if (ctx.shouldComplete()) {
284           break;
285         }
286       }
287     }
288   }
289 
290   /**
291    * Invoked after a region open
292    */
293   public void postOpen(){
294     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
295     for (RegionEnvironment env: coprocessors) {
296       if (env.getInstance() instanceof RegionObserver) {
297         ctx = ObserverContext.createAndPrepare(env, ctx);
298         try {
299           ((RegionObserver)env.getInstance()).postOpen(ctx);
300         } catch (Throwable e) {
301           handleCoprocessorThrowableNoRethrow(env, e);
302         }
303         if (ctx.shouldComplete()) {
304           break;
305         }
306       }
307     }
308   }
309 
310   /**
311    * Invoked before a region is closed
312    * @param abortRequested true if the server is aborting
313    */
314   public void preClose(boolean abortRequested) throws IOException {
315     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
316     for (RegionEnvironment env: coprocessors) {
317       if (env.getInstance() instanceof RegionObserver) {
318         ctx = ObserverContext.createAndPrepare(env, ctx);
319         try {
320           ((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
321         } catch (Throwable e) {
322           handleCoprocessorThrowable(env, e);
323         }
324       }
325     }
326   }
327 
328   /**
329    * Invoked after a region is closed
330    * @param abortRequested true if the server is aborting
331    */
332   public void postClose(boolean abortRequested){
333     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
334     for (RegionEnvironment env: coprocessors) {
335       if (env.getInstance() instanceof RegionObserver) {
336         ctx = ObserverContext.createAndPrepare(env, ctx);
337         try {
338           ((RegionObserver)env.getInstance()).postClose(ctx, abortRequested);
339         } catch (Throwable e) {
340           handleCoprocessorThrowableNoRethrow(env, e);
341         }
342 
343       }
344       shutdown(env);
345     }
346   }
347 
348   /**
349    * See
350    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
351    */
352   public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
353       ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
354     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
355     InternalScanner s = null;
356     for (RegionEnvironment env: coprocessors) {
357       if (env.getInstance() instanceof RegionObserver) {
358         ctx = ObserverContext.createAndPrepare(env, ctx);
359         try {
360           s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
361             scanType, earliestPutTs, s, request);
362         } catch (Throwable e) {
363           handleCoprocessorThrowable(env,e);
364         }
365         if (ctx.shouldComplete()) {
366           break;
367         }
368       }
369     }
370     return s;
371   }
372 
373   /**
374    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
375    * available candidates.
376    * @param store The store where compaction is being requested
377    * @param candidates The currently available store files
378    * @param request custom compaction request
379    * @return If {@code true}, skip the normal selection process and use the current list
380    * @throws IOException
381    */
382   public boolean preCompactSelection(Store store, List<StoreFile> candidates,
383       CompactionRequest request) throws IOException {
384     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
385     boolean bypass = false;
386     for (RegionEnvironment env: coprocessors) {
387       if (env.getInstance() instanceof RegionObserver) {
388         ctx = ObserverContext.createAndPrepare(env, ctx);
389         try {
390           ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
391         } catch (Throwable e) {
392           handleCoprocessorThrowable(env,e);
393         }
394         bypass |= ctx.shouldBypass();
395         if (ctx.shouldComplete()) {
396           break;
397         }
398       }
399     }
400     return bypass;
401   }
402 
403   /**
404    * Called after the {@link StoreFile}s to be compacted have been selected from the available
405    * candidates.
406    * @param store The store where compaction is being requested
407    * @param selected The store files selected to compact
408    * @param request custom compaction
409    */
410   public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
411       CompactionRequest request) {
412     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
413     for (RegionEnvironment env: coprocessors) {
414       if (env.getInstance() instanceof RegionObserver) {
415         ctx = ObserverContext.createAndPrepare(env, ctx);
416         try {
417           ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
418         } catch (Throwable e) {
419           handleCoprocessorThrowableNoRethrow(env,e);
420         }
421         if (ctx.shouldComplete()) {
422           break;
423         }
424       }
425     }
426   }
427 
428   /**
429    * Called prior to rewriting the store files selected for compaction
430    * @param store the store being compacted
431    * @param scanner the scanner used to read store data during compaction
432    * @param request the compaction that will be executed
433    * @throws IOException
434    */
435   public InternalScanner preCompact(Store store, InternalScanner scanner, 
436       CompactionRequest request) throws IOException {
437     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
438     boolean bypass = false;
439     for (RegionEnvironment env : coprocessors) {
440       if (env.getInstance() instanceof RegionObserver) {
441         ctx = ObserverContext.createAndPrepare(env, ctx);
442         try {
443           scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, request);
444         } catch (Throwable e) {
445           handleCoprocessorThrowable(env, e);
446         }
447         bypass |= ctx.shouldBypass();
448         if (ctx.shouldComplete()) {
449           break;
450         }
451       }
452     }
453     return bypass ? null : scanner;
454   }
455 
456   /**
457    * Called after the store compaction has completed.
458    * @param store the store being compacted
459    * @param resultFile the new store file written during compaction
460    * @param request the compaction that is being executed
461    * @throws IOException
462    */
463   public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
464       throws IOException {
465     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
466     for (RegionEnvironment env: coprocessors) {
467       if (env.getInstance() instanceof RegionObserver) {
468         ctx = ObserverContext.createAndPrepare(env, ctx);
469         try {
470           ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
471         } catch (Throwable e) {
472           handleCoprocessorThrowable(env, e);
473         }
474         if (ctx.shouldComplete()) {
475           break;
476         }
477       }
478     }
479   }
480 
481   /**
482    * Invoked before a memstore flush
483    * @throws IOException
484    */
485   public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
486     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
487     boolean bypass = false;
488     for (RegionEnvironment env: coprocessors) {
489       if (env.getInstance() instanceof RegionObserver) {
490         ctx = ObserverContext.createAndPrepare(env, ctx);
491         try {
492           scanner = ((RegionObserver)env.getInstance()).preFlush(
493               ctx, store, scanner);
494         } catch (Throwable e) {
495           handleCoprocessorThrowable(env,e);
496         }
497         bypass |= ctx.shouldBypass();
498         if (ctx.shouldComplete()) {
499           break;
500         }
501       }
502     }
503     return bypass ? null : scanner;
504   }
505 
506   /**
507    * Invoked before a memstore flush
508    * @throws IOException 
509    */
510   public void preFlush() throws IOException {
511     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
512     for (RegionEnvironment env: coprocessors) {
513       if (env.getInstance() instanceof RegionObserver) {
514         ctx = ObserverContext.createAndPrepare(env, ctx);
515         try {
516           ((RegionObserver)env.getInstance()).preFlush(ctx);
517         } catch (Throwable e) {
518           handleCoprocessorThrowable(env, e);
519         }
520         if (ctx.shouldComplete()) {
521           break;
522         }
523       }
524     }
525   }
526 
527   /**
528    * See
529    * {@link RegionObserver#preFlush(ObserverContext, Store, KeyValueScanner)}
530    */
531   public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner) throws IOException {
532     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
533     InternalScanner s = null;
534     for (RegionEnvironment env : coprocessors) {
535       if (env.getInstance() instanceof RegionObserver) {
536         ctx = ObserverContext.createAndPrepare(env, ctx);
537         try {
538           s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, memstoreScanner, s);
539         } catch (Throwable e) {
540           handleCoprocessorThrowable(env, e);
541         }
542         if (ctx.shouldComplete()) {
543           break;
544         }
545       }
546     }
547     return s;
548   }
549 
550   /**
551    * Invoked after a memstore flush
552    * @throws IOException
553    */
554   public void postFlush() throws IOException {
555     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
556     for (RegionEnvironment env: coprocessors) {
557       if (env.getInstance() instanceof RegionObserver) {
558         ctx = ObserverContext.createAndPrepare(env, ctx);
559         try {
560           ((RegionObserver)env.getInstance()).postFlush(ctx);
561         } catch (Throwable e) {
562           handleCoprocessorThrowable(env, e);
563         }
564         if (ctx.shouldComplete()) {
565           break;
566         }
567       }
568     }
569   }
570 
571   /**
572    * Invoked after a memstore flush
573    * @throws IOException
574    */
575   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
576     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
577     for (RegionEnvironment env: coprocessors) {
578       if (env.getInstance() instanceof RegionObserver) {
579         ctx = ObserverContext.createAndPrepare(env, ctx);
580         try {
581           ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
582         } catch (Throwable e) {
583           handleCoprocessorThrowable(env, e);
584         }
585         if (ctx.shouldComplete()) {
586           break;
587         }
588       }
589     }
590   }
591 
592   /**
593    * Invoked just before a split
594    * @throws IOException
595    */
596   public void preSplit() throws IOException {
597     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
598     for (RegionEnvironment env: coprocessors) {
599       if (env.getInstance() instanceof RegionObserver) {
600         ctx = ObserverContext.createAndPrepare(env, ctx);
601         try {
602           ((RegionObserver)env.getInstance()).preSplit(ctx);
603         } catch (Throwable e) {
604           handleCoprocessorThrowable(env, e);
605         }
606         if (ctx.shouldComplete()) {
607           break;
608         }
609       }
610     }
611   }
612 
613   /**
614    * Invoked just after a split
615    * @param l the new left-hand daughter region
616    * @param r the new right-hand daughter region
617    * @throws IOException 
618    */
619   public void postSplit(HRegion l, HRegion r) throws IOException {
620     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
621     for (RegionEnvironment env: coprocessors) {
622       if (env.getInstance() instanceof RegionObserver) {
623         ctx = ObserverContext.createAndPrepare(env, ctx);
624         try {
625           ((RegionObserver)env.getInstance()).postSplit(ctx, l, r);
626         } catch (Throwable e) {
627           handleCoprocessorThrowable(env, e);
628         }
629         if (ctx.shouldComplete()) {
630           break;
631         }
632       }
633     }
634   }
635 
636   // RegionObserver support
637 
638   /**
639    * @param row the row key
640    * @param family the family
641    * @param result the result set from the region
642    * @return true if default processing should be bypassed
643    * @exception IOException Exception
644    */
645   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
646       final Result result) throws IOException {
647     boolean bypass = false;
648     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
649     for (RegionEnvironment env: coprocessors) {
650       if (env.getInstance() instanceof RegionObserver) {
651         ctx = ObserverContext.createAndPrepare(env, ctx);
652         try {
653           ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row,
654               family, result);
655         } catch (Throwable e) {
656           handleCoprocessorThrowable(env, e);
657         }
658         bypass |= ctx.shouldBypass();
659         if (ctx.shouldComplete()) {
660           break;
661         }
662       }
663     }
664     return bypass;
665   }
666 
667   /**
668    * @param row the row key
669    * @param family the family
670    * @param result the result set from the region
671    * @exception IOException Exception
672    */
673   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
674       final Result result) throws IOException {
675     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
676     for (RegionEnvironment env: coprocessors) {
677       if (env.getInstance() instanceof RegionObserver) {
678         ctx = ObserverContext.createAndPrepare(env, ctx);
679         try {
680           ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row,
681               family, result);
682         } catch (Throwable e) {
683           handleCoprocessorThrowable(env, e);
684         }
685         if (ctx.shouldComplete()) {
686           break;
687         }
688       }
689     }
690   }
691 
692   /**
693    * @param get the Get request
694    * @return true if default processing should be bypassed
695    * @exception IOException Exception
696    */
697   public boolean preGet(final Get get, final List<KeyValue> results)
698       throws IOException {
699     boolean bypass = false;
700     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
701     for (RegionEnvironment env: coprocessors) {
702       if (env.getInstance() instanceof RegionObserver) {
703         ctx = ObserverContext.createAndPrepare(env, ctx);
704         try {
705           ((RegionObserver)env.getInstance()).preGet(ctx, get, results);
706         } catch (Throwable e) {
707           handleCoprocessorThrowable(env, e);
708         }
709         bypass |= ctx.shouldBypass();
710         if (ctx.shouldComplete()) {
711           break;
712         }
713       }
714     }
715     return bypass;
716   }
717 
718   /**
719    * @param get the Get request
720    * @param results the result set
721    * @exception IOException Exception
722    */
723   public void postGet(final Get get, final List<KeyValue> results)
724   throws IOException {
725     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
726     for (RegionEnvironment env: coprocessors) {
727       if (env.getInstance() instanceof RegionObserver) {
728         ctx = ObserverContext.createAndPrepare(env, ctx);
729         try {
730           ((RegionObserver)env.getInstance()).postGet(ctx, get, results);
731         } catch (Throwable e) {
732           handleCoprocessorThrowable(env, e);
733         }
734         if (ctx.shouldComplete()) {
735           break;
736         }
737       }
738     }
739   }
740 
741   /**
742    * @param get the Get request
743    * @return true or false to return to client if bypassing normal operation,
744    * or null otherwise
745    * @exception IOException Exception
746    */
747   public Boolean preExists(final Get get) throws IOException {
748     boolean bypass = false;
749     boolean exists = false;
750     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
751     for (RegionEnvironment env: coprocessors) {
752       if (env.getInstance() instanceof RegionObserver) {
753         ctx = ObserverContext.createAndPrepare(env, ctx);
754         try {
755           exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists);
756         } catch (Throwable e) {
757           handleCoprocessorThrowable(env, e);
758         }
759         bypass |= ctx.shouldBypass();
760         if (ctx.shouldComplete()) {
761           break;
762         }
763       }
764     }
765     return bypass ? exists : null;
766   }
767 
768   /**
769    * @param get the Get request
770    * @param exists the result returned by the region server
771    * @return the result to return to the client
772    * @exception IOException Exception
773    */
774   public boolean postExists(final Get get, boolean exists)
775       throws IOException {
776     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
777     for (RegionEnvironment env: coprocessors) {
778       if (env.getInstance() instanceof RegionObserver) {
779         ctx = ObserverContext.createAndPrepare(env, ctx);
780         try {
781           exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists);
782         } catch (Throwable e) {
783           handleCoprocessorThrowable(env, e);
784         }
785         if (ctx.shouldComplete()) {
786           break;
787         }
788       }
789     }
790     return exists;
791   }
792 
793   /**
794    * @param put The Put object
795    * @param edit The WALEdit object.
796    * @param writeToWAL true if the change should be written to the WAL
797    * @return true if default processing should be bypassed
798    * @exception IOException Exception
799    */
800   public boolean prePut(Put put, WALEdit edit,
801       final boolean writeToWAL) throws IOException {
802     boolean bypass = false;
803     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
804     for (RegionEnvironment env: coprocessors) {
805       if (env.getInstance() instanceof RegionObserver) {
806         ctx = ObserverContext.createAndPrepare(env, ctx);
807         try {
808           ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, writeToWAL);
809         } catch (Throwable e) {
810           handleCoprocessorThrowable(env, e);
811         }
812         bypass |= ctx.shouldBypass();
813         if (ctx.shouldComplete()) {
814           break;
815         }
816       }
817     }
818     return bypass;
819   }
820 
821   /**
822    * @param put The Put object
823    * @param edit The WALEdit object.
824    * @param writeToWAL true if the change should be written to the WAL
825    * @exception IOException Exception
826    */
827   public void postPut(Put put, WALEdit edit,
828       final boolean writeToWAL) throws IOException {
829     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
830     for (RegionEnvironment env: coprocessors) {
831       if (env.getInstance() instanceof RegionObserver) {
832         ctx = ObserverContext.createAndPrepare(env, ctx);
833         try {
834           ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, writeToWAL);
835         } catch (Throwable e) {
836           handleCoprocessorThrowable(env, e);
837         }
838         if (ctx.shouldComplete()) {
839           break;
840         }
841       }
842     }
843   }
844 
845   /**
846    * @param delete The Delete object
847    * @param edit The WALEdit object.
848    * @param writeToWAL true if the change should be written to the WAL
849    * @return true if default processing should be bypassed
850    * @exception IOException Exception
851    */
852   public boolean preDelete(Delete delete, WALEdit edit,
853       final boolean writeToWAL) throws IOException {
854     boolean bypass = false;
855     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
856     for (RegionEnvironment env: coprocessors) {
857       if (env.getInstance() instanceof RegionObserver) {
858         ctx = ObserverContext.createAndPrepare(env, ctx);
859         try {
860           ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, writeToWAL);
861         } catch (Throwable e) {
862           handleCoprocessorThrowable(env, e);
863         }
864         bypass |= ctx.shouldBypass();
865         if (ctx.shouldComplete()) {
866           break;
867         }
868       }
869     }
870     return bypass;
871   }
872 
873   /**
874    * @param delete The Delete object
875    * @param edit The WALEdit object.
876    * @param writeToWAL true if the change should be written to the WAL
877    * @exception IOException Exception
878    */
879   public void postDelete(Delete delete, WALEdit edit,
880       final boolean writeToWAL) throws IOException {
881     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
882     for (RegionEnvironment env: coprocessors) {
883       if (env.getInstance() instanceof RegionObserver) {
884         ctx = ObserverContext.createAndPrepare(env, ctx);
885         try {
886           ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, writeToWAL);
887         } catch (Throwable e) {
888           handleCoprocessorThrowable(env, e);
889         }
890         if (ctx.shouldComplete()) {
891           break;
892         }
893       }
894     }
895   }
896   
897   /**
898    * @param miniBatchOp
899    * @return true if default processing should be bypassed
900    * @throws IOException
901    */
902   public boolean preBatchMutate(
903       final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
904     boolean bypass = false;
905     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
906     for (RegionEnvironment env : coprocessors) {
907       if (env.getInstance() instanceof RegionObserver) {
908         ctx = ObserverContext.createAndPrepare(env, ctx);
909         try {
910           ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
911         } catch (Throwable e) {
912           handleCoprocessorThrowable(env, e);
913         }
914         bypass |= ctx.shouldBypass();
915         if (ctx.shouldComplete()) {
916           break;
917         }
918       }
919     }
920     return bypass;
921   }
922 
923   /**
924    * @param miniBatchOp
925    * @throws IOException
926    */
927   public void postBatchMutate(
928       final MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
929     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
930     for (RegionEnvironment env : coprocessors) {
931       if (env.getInstance() instanceof RegionObserver) {
932         ctx = ObserverContext.createAndPrepare(env, ctx);
933         try {
934           ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
935         } catch (Throwable e) {
936           handleCoprocessorThrowable(env, e);
937         }
938         if (ctx.shouldComplete()) {
939           break;
940         }
941       }
942     }
943   }
944 
945   /**
946    * @param row row to check
947    * @param family column family
948    * @param qualifier column qualifier
949    * @param compareOp the comparison operation
950    * @param comparator the comparator
951    * @param put data to put if check succeeds
952    * @return true or false to return to client if default processing should
953    * be bypassed, or null otherwise
954    * @throws IOException e
955    */
956   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
957       final byte [] qualifier, final CompareOp compareOp,
958       final WritableByteArrayComparable comparator, Put put)
959     throws IOException {
960     boolean bypass = false;
961     boolean result = false;
962     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
963     for (RegionEnvironment env: coprocessors) {
964       if (env.getInstance() instanceof RegionObserver) {
965         ctx = ObserverContext.createAndPrepare(env, ctx);
966         try {
967           result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family,
968             qualifier, compareOp, comparator, put, result);
969         } catch (Throwable e) {
970           handleCoprocessorThrowable(env, e);
971         }
972 
973 
974         bypass |= ctx.shouldBypass();
975         if (ctx.shouldComplete()) {
976           break;
977         }
978       }
979     }
980     return bypass ? result : null;
981   }
982 
983   /**
984    * @param row row to check
985    * @param family column family
986    * @param qualifier column qualifier
987    * @param compareOp the comparison operation
988    * @param comparator the comparator
989    * @param put data to put if check succeeds
990    * @throws IOException e
991    */
992   public boolean postCheckAndPut(final byte [] row, final byte [] family,
993       final byte [] qualifier, final CompareOp compareOp,
994       final WritableByteArrayComparable comparator, final Put put,
995       boolean result)
996     throws IOException {
997     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
998     for (RegionEnvironment env: coprocessors) {
999       if (env.getInstance() instanceof RegionObserver) {
1000         ctx = ObserverContext.createAndPrepare(env, ctx);
1001         try {
1002           result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row,
1003             family, qualifier, compareOp, comparator, put, result);
1004         } catch (Throwable e) {
1005           handleCoprocessorThrowable(env, e);
1006         }
1007         if (ctx.shouldComplete()) {
1008           break;
1009         }
1010       }
1011     }
1012     return result;
1013   }
1014 
1015   /**
1016    * @param row row to check
1017    * @param family column family
1018    * @param qualifier column qualifier
1019    * @param compareOp the comparison operation
1020    * @param comparator the comparator
1021    * @param delete delete to commit if check succeeds
1022    * @return true or false to return to client if default processing should
1023    * be bypassed, or null otherwise
1024    * @throws IOException e
1025    */
1026   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1027       final byte [] qualifier, final CompareOp compareOp,
1028       final WritableByteArrayComparable comparator, Delete delete)
1029       throws IOException {
1030     boolean bypass = false;
1031     boolean result = false;
1032     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1033     for (RegionEnvironment env: coprocessors) {
1034       if (env.getInstance() instanceof RegionObserver) {
1035         ctx = ObserverContext.createAndPrepare(env, ctx);
1036         try {
1037           result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row,
1038             family, qualifier, compareOp, comparator, delete, result);
1039         } catch (Throwable e) {
1040           handleCoprocessorThrowable(env, e);
1041         }
1042         bypass |= ctx.shouldBypass();
1043         if (ctx.shouldComplete()) {
1044           break;
1045         }
1046       }
1047     }
1048     return bypass ? result : null;
1049   }
1050 
1051   /**
1052    * @param row row to check
1053    * @param family column family
1054    * @param qualifier column qualifier
1055    * @param compareOp the comparison operation
1056    * @param comparator the comparator
1057    * @param delete delete to commit if check succeeds
1058    * @throws IOException e
1059    */
1060   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1061       final byte [] qualifier, final CompareOp compareOp,
1062       final WritableByteArrayComparable comparator, final Delete delete,
1063       boolean result)
1064     throws IOException {
1065     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1066     for (RegionEnvironment env: coprocessors) {
1067       if (env.getInstance() instanceof RegionObserver) {
1068         ctx = ObserverContext.createAndPrepare(env, ctx);
1069         try {
1070           result = ((RegionObserver)env.getInstance())
1071             .postCheckAndDelete(ctx, row, family, qualifier, compareOp,
1072               comparator, delete, result);
1073         } catch (Throwable e) {
1074           handleCoprocessorThrowable(env, e);
1075         }
1076         if (ctx.shouldComplete()) {
1077           break;
1078         }
1079       }
1080     }
1081     return result;
1082   }
1083 
1084   /**
1085    * @param row row to check
1086    * @param family column family
1087    * @param qualifier column qualifier
1088    * @param amount long amount to increment
1089    * @param writeToWAL true if the change should be written to the WAL
1090    * @return return value for client if default operation should be bypassed,
1091    * or null otherwise
1092    * @throws IOException if an error occurred on the coprocessor
1093    */
1094   public Long preIncrementColumnValue(final byte [] row, final byte [] family,
1095       final byte [] qualifier, long amount, final boolean writeToWAL)
1096       throws IOException {
1097     boolean bypass = false;
1098     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1099     for (RegionEnvironment env: coprocessors) {
1100       if (env.getInstance() instanceof RegionObserver) {
1101         ctx = ObserverContext.createAndPrepare(env, ctx);
1102         try {
1103           amount = ((RegionObserver)env.getInstance()).preIncrementColumnValue(ctx,
1104               row, family, qualifier, amount, writeToWAL);
1105         } catch (Throwable e) {
1106           handleCoprocessorThrowable(env, e);
1107         }
1108         bypass |= ctx.shouldBypass();
1109         if (ctx.shouldComplete()) {
1110           break;
1111         }
1112       }
1113     }
1114     return bypass ? amount : null;
1115   }
1116 
1117   /**
1118    * @param row row to check
1119    * @param family column family
1120    * @param qualifier column qualifier
1121    * @param amount long amount to increment
1122    * @param writeToWAL true if the change should be written to the WAL
1123    * @param result the result returned by incrementColumnValue
1124    * @return the result to return to the client
1125    * @throws IOException if an error occurred on the coprocessor
1126    */
1127   public long postIncrementColumnValue(final byte [] row, final byte [] family,
1128       final byte [] qualifier, final long amount, final boolean writeToWAL,
1129       long result) throws IOException {
1130     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1131     for (RegionEnvironment env: coprocessors) {
1132       if (env.getInstance() instanceof RegionObserver) {
1133         ctx = ObserverContext.createAndPrepare(env, ctx);
1134         try {
1135           result = ((RegionObserver)env.getInstance()).postIncrementColumnValue(ctx,
1136               row, family, qualifier, amount, writeToWAL, result);
1137         } catch (Throwable e) {
1138           handleCoprocessorThrowable(env, e);
1139         }
1140         if (ctx.shouldComplete()) {
1141           break;
1142         }
1143       }
1144     }
1145     return result;
1146   }
1147 
1148   /**
1149    * @param append append object
1150    * @return result to return to client if default operation should be
1151    * bypassed, null otherwise
1152    * @throws IOException if an error occurred on the coprocessor
1153    */
1154   public Result preAppend(Append append)
1155       throws IOException {
1156     boolean bypass = false;
1157     Result result = null;
1158     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1159     for (RegionEnvironment env: coprocessors) {
1160       if (env.getInstance() instanceof RegionObserver) {
1161         ctx = ObserverContext.createAndPrepare(env, ctx);
1162         try {
1163           result = ((RegionObserver)env.getInstance()).preAppend(ctx, append);
1164         } catch (Throwable e) {
1165           handleCoprocessorThrowable(env, e);
1166         }
1167         bypass |= ctx.shouldBypass();
1168         if (ctx.shouldComplete()) {
1169           break;
1170         }
1171       }
1172     }
1173     return bypass ? result : null;
1174   }
1175 
1176   /**
1177    * @param increment increment object
1178    * @return result to return to client if default operation should be
1179    * bypassed, null otherwise
1180    * @throws IOException if an error occurred on the coprocessor
1181    */
1182   public Result preIncrement(Increment increment)
1183       throws IOException {
1184     boolean bypass = false;
1185     Result result = null;
1186     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1187     for (RegionEnvironment env: coprocessors) {
1188       if (env.getInstance() instanceof RegionObserver) {
1189         ctx = ObserverContext.createAndPrepare(env, ctx);
1190         try {
1191           result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment);
1192         } catch (Throwable e) {
1193           handleCoprocessorThrowable(env, e);
1194         }
1195         bypass |= ctx.shouldBypass();
1196         if (ctx.shouldComplete()) {
1197           break;
1198         }
1199       }
1200     }
1201     return bypass ? result : null;
1202   }
1203 
1204   /**
1205    * @param append Append object
1206    * @param result the result returned by postAppend
1207    * @throws IOException if an error occurred on the coprocessor
1208    */
1209   public void postAppend(final Append append, Result result)
1210       throws IOException {
1211     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1212     for (RegionEnvironment env: coprocessors) {
1213       if (env.getInstance() instanceof RegionObserver) {
1214         ctx = ObserverContext.createAndPrepare(env, ctx);
1215         try {
1216           ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
1217         } catch (Throwable e) {
1218           handleCoprocessorThrowable(env, e);
1219         }
1220         if (ctx.shouldComplete()) {
1221           break;
1222         }
1223       }
1224     }
1225   }
1226 
1227   /**
1228    * @param increment increment object
1229    * @param result the result returned by postIncrement
1230    * @throws IOException if an error occurred on the coprocessor
1231    */
1232   public Result postIncrement(final Increment increment, Result result)
1233       throws IOException {
1234     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1235     for (RegionEnvironment env: coprocessors) {
1236       if (env.getInstance() instanceof RegionObserver) {
1237         ctx = ObserverContext.createAndPrepare(env, ctx);
1238         try {
1239           result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result);
1240         } catch (Throwable e) {
1241           handleCoprocessorThrowable(env, e);
1242         }
1243         if (ctx.shouldComplete()) {
1244           break;
1245         }
1246       }
1247     }
1248     return result;
1249   }
1250 
1251   /**
1252    * @param scan the Scan specification
1253    * @return scanner id to return to client if default operation should be
1254    * bypassed, false otherwise
1255    * @exception IOException Exception
1256    */
1257   public RegionScanner preScannerOpen(Scan scan) throws IOException {
1258     boolean bypass = false;
1259     RegionScanner s = null;
1260     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1261     for (RegionEnvironment env: coprocessors) {
1262       if (env.getInstance() instanceof RegionObserver) {
1263         ctx = ObserverContext.createAndPrepare(env, ctx);
1264         try {
1265           s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s);
1266         } catch (Throwable e) {
1267           handleCoprocessorThrowable(env, e);
1268         }
1269         bypass |= ctx.shouldBypass();
1270         if (ctx.shouldComplete()) {
1271           break;
1272         }
1273       }
1274     }
1275     return bypass ? s : null;
1276   }
1277 
1278   /**
1279    * See
1280    * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner)}
1281    */
1282   public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
1283       final NavigableSet<byte[]> targetCols) throws IOException {
1284     KeyValueScanner s = null;
1285     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1286     for (RegionEnvironment env: coprocessors) {
1287       if (env.getInstance() instanceof RegionObserver) {
1288         ctx = ObserverContext.createAndPrepare(env, ctx);
1289         try {
1290           s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
1291               targetCols, s);
1292         } catch (Throwable e) {
1293           handleCoprocessorThrowable(env, e);
1294         }
1295         if (ctx.shouldComplete()) {
1296           break;
1297         }
1298       }
1299     }
1300     return s;
1301   }
1302 
1303   /**
1304    * @param scan the Scan specification
1305    * @param s the scanner
1306    * @return the scanner instance to use
1307    * @exception IOException Exception
1308    */
1309   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s)
1310       throws IOException {
1311     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1312     for (RegionEnvironment env: coprocessors) {
1313       if (env.getInstance() instanceof RegionObserver) {
1314         ctx = ObserverContext.createAndPrepare(env, ctx);
1315         try {
1316           s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s);
1317         } catch (Throwable e) {
1318           handleCoprocessorThrowable(env, e);
1319         }
1320         if (ctx.shouldComplete()) {
1321           break;
1322         }
1323       }
1324     }
1325     return s;
1326   }
1327 
1328   /**
1329    * @param s the scanner
1330    * @param results the result set returned by the region server
1331    * @param limit the maximum number of results to return
1332    * @return 'has next' indication to client if bypassing default behavior, or
1333    * null otherwise
1334    * @exception IOException Exception
1335    */
1336   public Boolean preScannerNext(final InternalScanner s,
1337       final List<Result> results, int limit) throws IOException {
1338     boolean bypass = false;
1339     boolean hasNext = false;
1340     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1341     for (RegionEnvironment env: coprocessors) {
1342       if (env.getInstance() instanceof RegionObserver) {
1343         ctx = ObserverContext.createAndPrepare(env, ctx);
1344         try {
1345           hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results,
1346             limit, hasNext);
1347         } catch (Throwable e) {
1348           handleCoprocessorThrowable(env, e);
1349         }
1350         bypass |= ctx.shouldBypass();
1351         if (ctx.shouldComplete()) {
1352           break;
1353         }
1354       }
1355     }
1356     return bypass ? hasNext : null;
1357   }
1358 
1359   /**
1360    * @param s the scanner
1361    * @param results the result set returned by the region server
1362    * @param limit the maximum number of results to return
1363    * @param hasMore
1364    * @return 'has more' indication to give to client
1365    * @exception IOException Exception
1366    */
1367   public boolean postScannerNext(final InternalScanner s,
1368       final List<Result> results, final int limit, boolean hasMore)
1369       throws IOException {
1370     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1371     for (RegionEnvironment env: coprocessors) {
1372       if (env.getInstance() instanceof RegionObserver) {
1373         ctx = ObserverContext.createAndPrepare(env, ctx);
1374         try {
1375           hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s,
1376             results, limit, hasMore);
1377         } catch (Throwable e) {
1378           handleCoprocessorThrowable(env, e);
1379         }
1380         if (ctx.shouldComplete()) {
1381           break;
1382         }
1383       }
1384     }
1385     return hasMore;
1386   }
1387 
1388   /**
1389    * This will be called by the scan flow when the current scanned row is being filtered out by the
1390    * filter.
1391    * @param s the scanner
1392    * @param currentRow The current rowkey which got filtered out
1393    * @param offset offset to rowkey
1394    * @param length length of rowkey
1395    * @return whether more rows are available for the scanner or not
1396    * @throws IOException
1397    */
1398   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow, int offset,
1399       short length) throws IOException {
1400     boolean hasMore = true; // By default assume more rows there.
1401     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1402     for (RegionEnvironment env : coprocessors) {
1403       if (env.getInstance() instanceof RegionObserver) {
1404         ctx = ObserverContext.createAndPrepare(env, ctx);
1405         try {
1406           hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
1407               offset, length, hasMore);
1408         } catch (Throwable e) {
1409           handleCoprocessorThrowable(env, e);
1410         }
1411         if (ctx.shouldComplete()) {
1412           break;
1413         }
1414       }
1415     }
1416     return hasMore;
1417   }
1418  
1419   /**
1420    * @param s the scanner
1421    * @return true if default behavior should be bypassed, false otherwise
1422    * @exception IOException Exception
1423    */
1424   public boolean preScannerClose(final InternalScanner s)
1425       throws IOException {
1426     boolean bypass = false;
1427     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1428     for (RegionEnvironment env: coprocessors) {
1429       if (env.getInstance() instanceof RegionObserver) {
1430         ctx = ObserverContext.createAndPrepare(env, ctx);
1431         try {
1432           ((RegionObserver)env.getInstance()).preScannerClose(ctx, s);
1433         } catch (Throwable e) {
1434           handleCoprocessorThrowable(env, e);
1435         }
1436         bypass |= ctx.shouldBypass();
1437         if (ctx.shouldComplete()) {
1438           break;
1439         }
1440       }
1441     }
1442     return bypass;
1443   }
1444 
1445   /**
1446    * @param s the scanner
1447    * @exception IOException Exception
1448    */
1449   public void postScannerClose(final InternalScanner s)
1450       throws IOException {
1451     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1452     for (RegionEnvironment env: coprocessors) {
1453       if (env.getInstance() instanceof RegionObserver) {
1454         ctx = ObserverContext.createAndPrepare(env, ctx);
1455         try {
1456           ((RegionObserver)env.getInstance()).postScannerClose(ctx, s);
1457         } catch (Throwable e) {
1458           handleCoprocessorThrowable(env, e);
1459         }
1460         if (ctx.shouldComplete()) {
1461           break;
1462         }
1463       }
1464     }
1465   }
1466 
1467   /**
1468    * @param info
1469    * @param logKey
1470    * @param logEdit
1471    * @return true if default behavior should be bypassed, false otherwise
1472    * @throws IOException
1473    */
1474   public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
1475       WALEdit logEdit) throws IOException {
1476     boolean bypass = false;
1477     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1478     for (RegionEnvironment env: coprocessors) {
1479       if (env.getInstance() instanceof RegionObserver) {
1480         ctx = ObserverContext.createAndPrepare(env, ctx);
1481         try {
1482           ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey,
1483               logEdit);
1484         } catch (Throwable e) {
1485           handleCoprocessorThrowable(env, e);
1486         }
1487         bypass |= ctx.shouldBypass();
1488         if (ctx.shouldComplete()) {
1489           break;
1490         }
1491       }
1492     }
1493     return bypass;
1494   }
1495 
1496   /**
1497    * @param info
1498    * @param logKey
1499    * @param logEdit
1500    * @throws IOException
1501    */
1502   public void postWALRestore(HRegionInfo info, HLogKey logKey,
1503       WALEdit logEdit) throws IOException {
1504     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1505     for (RegionEnvironment env: coprocessors) {
1506       if (env.getInstance() instanceof RegionObserver) {
1507         ctx = ObserverContext.createAndPrepare(env, ctx);
1508         try {
1509           ((RegionObserver)env.getInstance()).postWALRestore(ctx, info,
1510               logKey, logEdit);
1511         } catch (Throwable e) {
1512           handleCoprocessorThrowable(env, e);
1513         }
1514         if (ctx.shouldComplete()) {
1515           break;
1516         }
1517       }
1518     }
1519   }
1520 
1521   /**
1522    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1523    * @return true if the default operation should be bypassed
1524    * @throws IOException
1525    */
1526   public boolean preBulkLoadHFile(List<Pair<byte[], String>> familyPaths) throws IOException {
1527     boolean bypass = false;
1528     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1529     for (RegionEnvironment env: coprocessors) {
1530       if (env.getInstance() instanceof RegionObserver) {
1531         ctx = ObserverContext.createAndPrepare(env, ctx);
1532         try {
1533           ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
1534         } catch (Throwable e) {
1535           handleCoprocessorThrowable(env, e);
1536         }
1537         bypass |= ctx.shouldBypass();
1538         if (ctx.shouldComplete()) {
1539           break;
1540         }
1541       }
1542     }
1543 
1544     return bypass;
1545   }
1546 
1547   /**
1548    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1549    * @param hasLoaded whether load was successful or not
1550    * @return the possibly modified value of hasLoaded
1551    * @throws IOException
1552    */
1553   public boolean postBulkLoadHFile(List<Pair<byte[], String>> familyPaths, boolean hasLoaded)
1554       throws IOException {
1555     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1556     for (RegionEnvironment env: coprocessors) {
1557       if (env.getInstance() instanceof RegionObserver) {
1558         ctx = ObserverContext.createAndPrepare(env, ctx);
1559         try {
1560           hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx,
1561             familyPaths, hasLoaded);
1562         } catch (Throwable e) {
1563           handleCoprocessorThrowable(env, e);
1564         }
1565         if (ctx.shouldComplete()) {
1566           break;
1567         }
1568       }
1569     }
1570 
1571     return hasLoaded;
1572   }
1573   
1574   public void preLockRow(byte[] regionName, byte[] row) throws IOException {
1575     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1576     for (RegionEnvironment env : coprocessors) {
1577       if (env.getInstance() instanceof RegionObserver) {
1578         ctx = ObserverContext.createAndPrepare(env, ctx);
1579         ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row);
1580         if (ctx.shouldComplete()) {
1581           break;
1582         }
1583       }
1584     }
1585   }
1586 
1587   public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
1588     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1589     for (RegionEnvironment env : coprocessors) {
1590       if (env.getInstance() instanceof RegionObserver) {
1591         ctx = ObserverContext.createAndPrepare(env, ctx);
1592         ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId);
1593         if (ctx.shouldComplete()) {
1594           break;
1595         }
1596       }
1597     }
1598   }
1599 
1600 }