View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  import java.util.regex.Matcher;
30  
31  import org.apache.commons.collections.map.AbstractReferenceMap;
32  import org.apache.commons.collections.map.ReferenceMap;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.Coprocessor;
40  import org.apache.hadoop.hbase.CoprocessorEnvironment;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.client.Append;
45  import org.apache.hadoop.hbase.client.Delete;
46  import org.apache.hadoop.hbase.client.Durability;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.Increment;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
55  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
56  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
57  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
58  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
59  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
60  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
61  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
62  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
63  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
64  import org.apache.hadoop.hbase.io.Reference;
65  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
67  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
68  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
69  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.hadoop.hbase.util.Pair;
72  
73  import com.google.common.collect.ImmutableList;
74  import com.google.protobuf.Message;
75  import com.google.protobuf.Service;
76  
77  /**
78   * Implements the coprocessor environment and runtime support for coprocessors
79   * loaded within a {@link HRegion}.
80   */
81  public class RegionCoprocessorHost
82      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
83  
84    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
85    // The shared data map
86    private static ReferenceMap sharedDataMap =
87        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
88  
89    /**
90     * Encapsulation of the environment of each coprocessor
91     */
92    static class RegionEnvironment extends CoprocessorHost.Environment
93        implements RegionCoprocessorEnvironment {
94  
95      private HRegion region;
96      private RegionServerServices rsServices;
97      ConcurrentMap<String, Object> sharedData;
98  
99      /**
100      * Constructor
101      * @param impl the coprocessor instance
102      * @param priority chaining priority
103      */
104     public RegionEnvironment(final Coprocessor impl, final int priority,
105         final int seq, final Configuration conf, final HRegion region,
106         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
107       super(impl, priority, seq, conf);
108       this.region = region;
109       this.rsServices = services;
110       this.sharedData = sharedData;
111     }
112 
113     /** @return the region */
114     @Override
115     public HRegion getRegion() {
116       return region;
117     }
118 
119     /** @return reference to the region server services */
120     @Override
121     public RegionServerServices getRegionServerServices() {
122       return rsServices;
123     }
124 
125     public void shutdown() {
126       super.shutdown();
127     }
128 
129     @Override
130     public ConcurrentMap<String, Object> getSharedData() {
131       return sharedData;
132     }
133   }
134 
  /** The region server services; set by the constructor from its {@code rsServices} argument. */
  RegionServerServices rsServices;
  /** The region whose coprocessors this host manages; set by the constructor. */
  HRegion region;
139 
140   /**
141    * Constructor
142    * @param region the region
143    * @param rsServices interface to available region server functionality
144    * @param conf the configuration
145    */
146   public RegionCoprocessorHost(final HRegion region,
147       final RegionServerServices rsServices, final Configuration conf) {
148     super(rsServices);
149     this.conf = conf;
150     this.rsServices = rsServices;
151     this.region = region;
152     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
153 
154     // load system default cp's from configuration.
155     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
156 
157     // load system default cp's for user tables from configuration.
158     if (!region.getRegionInfo().getTable().isSystemTable()) {
159       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
160     }
161 
162     // load Coprocessor From HDFS
163     loadTableCoprocessors(conf);
164   }
165 
166   void loadTableCoprocessors(final Configuration conf) {
167     // scan the table attributes for coprocessor load specifications
168     // initialize the coprocessors
169     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
170     for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
171         region.getTableDesc().getValues().entrySet()) {
172       String key = Bytes.toString(e.getKey().get()).trim();
173       String spec = Bytes.toString(e.getValue().get()).trim();
174       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
175         // found one
176         try {
177           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
178           if (matcher.matches()) {
179             // jar file path can be empty if the cp class can be loaded
180             // from class loader.
181             Path path = matcher.group(1).trim().isEmpty() ?
182                 null : new Path(matcher.group(1).trim());
183             String className = matcher.group(2).trim();
184             int priority = matcher.group(3).trim().isEmpty() ?
185                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
186             String cfgSpec = null;
187             try {
188               cfgSpec = matcher.group(4);
189             } catch (IndexOutOfBoundsException ex) {
190               // ignore
191             }
192             Configuration ourConf;
193             if (cfgSpec != null) {
194               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
195               // do an explicit deep copy of the passed configuration
196               ourConf = new Configuration(false);
197               HBaseConfiguration.merge(ourConf, conf);
198               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
199               while (m.find()) {
200                 ourConf.set(m.group(1), m.group(2));
201               }
202             } else {
203               ourConf = conf;
204             }
205             // Load encompasses classloading and coprocessor initialization
206             try {
207               RegionEnvironment env = load(path, className, priority, ourConf);
208               configured.add(env);
209               LOG.info("Loaded coprocessor " + className + " from HTD of " +
210                 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
211             } catch (Throwable t) {
212               // Coprocessor failed to load, do we abort on error?
213               if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
214                 abortServer(className, t);
215               } else {
216                 LOG.error("Failed to load coprocessor " + className, t);
217               }
218             }
219           } else {
220             LOG.error("Malformed table coprocessor specification: key=" + key +
221               ", spec: " + spec);
222           }
223         } catch (Exception ioe) {
224           LOG.error("Malformed table coprocessor specification: key=" + key +
225             ", spec: " + spec);
226         }
227       }
228     }
229     // add together to coprocessor set for COW efficiency
230     coprocessors.addAll(configured);
231   }
232 
233   @Override
234   public RegionEnvironment createEnvironment(Class<?> implClass,
235       Coprocessor instance, int priority, int seq, Configuration conf) {
236     // Check if it's an Endpoint.
237     // Due to current dynamic protocol design, Endpoint
238     // uses a different way to be registered and executed.
239     // It uses a visitor pattern to invoke registered Endpoint
240     // method.
241     for (Class<?> c : implClass.getInterfaces()) {
242       if (CoprocessorService.class.isAssignableFrom(c)) {
243         region.registerService( ((CoprocessorService)instance).getService() );
244       }
245     }
246     ConcurrentMap<String, Object> classData;
247     // make sure only one thread can add maps
248     synchronized (sharedDataMap) {
249       // as long as at least one RegionEnvironment holds on to its classData it will
250       // remain in this map
251       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
252       if (classData == null) {
253         classData = new ConcurrentHashMap<String, Object>();
254         sharedDataMap.put(implClass.getName(), classData);
255       }
256     }
257     return new RegionEnvironment(instance, priority, seq, conf, region,
258         rsServices, classData);
259   }
260 
261   /**
262    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
263    *
264    * For example, {@link
265    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
266    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
267    *
268    * See also
269    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
270    *    CoprocessorEnvironment, Throwable)}
271    * @param env The coprocessor that threw the exception.
272    * @param e The exception that was thrown.
273    */
274   private void handleCoprocessorThrowableNoRethrow(
275       final CoprocessorEnvironment env, final Throwable e) {
276     try {
277       handleCoprocessorThrowable(env,e);
278     } catch (IOException ioe) {
279       // We cannot throw exceptions from the caller hook, so ignore.
280       LOG.warn(
281         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
282         e + ". Ignoring.",e);
283     }
284   }
285 
286   /**
287    * Invoked before a region open.
288    *
289    * @throws IOException Signals that an I/O exception has occurred.
290    */
291   public void preOpen() throws IOException {
292     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
293     for (RegionEnvironment env: coprocessors) {
294       if (env.getInstance() instanceof RegionObserver) {
295         ctx = ObserverContext.createAndPrepare(env, ctx);
296         Thread currentThread = Thread.currentThread();
297         ClassLoader cl = currentThread.getContextClassLoader();
298         try {
299           currentThread.setContextClassLoader(env.getClassLoader());
300           ((RegionObserver) env.getInstance()).preOpen(ctx);
301         } catch (Throwable e) {
302           handleCoprocessorThrowable(env, e);
303         } finally {
304           currentThread.setContextClassLoader(cl);
305         }
306         if (ctx.shouldComplete()) {
307           break;
308         }
309       }
310     }
311   }
312 
313   /**
314    * Invoked after a region open
315    */
316   public void postOpen() {
317     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
318     for (RegionEnvironment env: coprocessors) {
319       if (env.getInstance() instanceof RegionObserver) {
320         ctx = ObserverContext.createAndPrepare(env, ctx);
321         Thread currentThread = Thread.currentThread();
322         ClassLoader cl = currentThread.getContextClassLoader();
323         try {
324           currentThread.setContextClassLoader(env.getClassLoader());
325           ((RegionObserver) env.getInstance()).postOpen(ctx);
326         } catch (Throwable e) {
327           handleCoprocessorThrowableNoRethrow(env, e);
328         } finally {
329           currentThread.setContextClassLoader(cl);
330         }
331         if (ctx.shouldComplete()) {
332           break;
333         }
334       }
335     }
336   }
337 
338   /**
339    * Invoked after log replay on region
340    */
341   public void postLogReplay() {
342     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
343     for (RegionEnvironment env: coprocessors) {
344       if (env.getInstance() instanceof RegionObserver) {
345         ctx = ObserverContext.createAndPrepare(env, ctx);
346         Thread currentThread = Thread.currentThread();
347         ClassLoader cl = currentThread.getContextClassLoader();
348         try {
349           currentThread.setContextClassLoader(env.getClassLoader());
350           ((RegionObserver) env.getInstance()).postLogReplay(ctx);
351         } catch (Throwable e) {
352           handleCoprocessorThrowableNoRethrow(env, e);
353         } finally {
354           currentThread.setContextClassLoader(cl);
355         }
356         if (ctx.shouldComplete()) {
357           break;
358         }
359       }
360     }
361   }
362 
363   /**
364    * Invoked before a region is closed
365    * @param abortRequested true if the server is aborting
366    */
367   public void preClose(final boolean abortRequested) throws IOException {
368     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
369     for (RegionEnvironment env: coprocessors) {
370       if (env.getInstance() instanceof RegionObserver) {
371         ctx = ObserverContext.createAndPrepare(env, ctx);
372         Thread currentThread = Thread.currentThread();
373         ClassLoader cl = currentThread.getContextClassLoader();
374         try {
375           currentThread.setContextClassLoader(env.getClassLoader());
376           ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested);
377         } catch (Throwable e) {
378           handleCoprocessorThrowable(env, e);
379         } finally {
380           currentThread.setContextClassLoader(cl);
381         }
382       }
383     }
384   }
385 
386   /**
387    * Invoked after a region is closed
388    * @param abortRequested true if the server is aborting
389    */
390   public void postClose(final boolean abortRequested) {
391     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
392     for (RegionEnvironment env: coprocessors) {
393       if (env.getInstance() instanceof RegionObserver) {
394         ctx = ObserverContext.createAndPrepare(env, ctx);
395         Thread currentThread = Thread.currentThread();
396         ClassLoader cl = currentThread.getContextClassLoader();
397         try {
398           currentThread.setContextClassLoader(env.getClassLoader());
399           ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested);
400         } catch (Throwable e) {
401           handleCoprocessorThrowableNoRethrow(env, e);
402         } finally {
403           currentThread.setContextClassLoader(cl);
404         }
405       }
406       shutdown(env);
407     }
408   }
409 
410   /**
411    * See
412    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
413    */
414   public InternalScanner preCompactScannerOpen(final Store store,
415       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
416       final CompactionRequest request) throws IOException {
417     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
418     InternalScanner s = null;
419     for (RegionEnvironment env: coprocessors) {
420       if (env.getInstance() instanceof RegionObserver) {
421         ctx = ObserverContext.createAndPrepare(env, ctx);
422         Thread currentThread = Thread.currentThread();
423         ClassLoader cl = currentThread.getContextClassLoader();
424         try {
425           currentThread.setContextClassLoader(env.getClassLoader());
426           s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store,
427             scanners, scanType, earliestPutTs, s, request);
428         } catch (Throwable e) {
429           handleCoprocessorThrowable(env,e);
430         } finally {
431           currentThread.setContextClassLoader(cl);
432         }
433         if (ctx.shouldComplete()) {
434           break;
435         }
436       }
437     }
438     return s;
439   }
440 
441   /**
442    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
443    * available candidates.
444    * @param store The store where compaction is being requested
445    * @param candidates The currently available store files
446    * @param request custom compaction request
447    * @return If {@code true}, skip the normal selection process and use the current list
448    * @throws IOException
449    */
450   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
451       final CompactionRequest request) throws IOException {
452     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
453     boolean bypass = false;
454     for (RegionEnvironment env: coprocessors) {
455       if (env.getInstance() instanceof RegionObserver) {
456         ctx = ObserverContext.createAndPrepare(env, ctx);
457         Thread currentThread = Thread.currentThread();
458         ClassLoader cl = currentThread.getContextClassLoader();
459         try {
460           currentThread.setContextClassLoader(env.getClassLoader());
461           ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates,
462             request);
463         } catch (Throwable e) {
464           handleCoprocessorThrowable(env,e);
465         } finally {
466           currentThread.setContextClassLoader(cl);
467         }
468         bypass |= ctx.shouldBypass();
469         if (ctx.shouldComplete()) {
470           break;
471         }
472       }
473     }
474     return bypass;
475   }
476 
477   /**
478    * Called after the {@link StoreFile}s to be compacted have been selected from the available
479    * candidates.
480    * @param store The store where compaction is being requested
481    * @param selected The store files selected to compact
482    * @param request custom compaction
483    */
484   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
485       final CompactionRequest request) {
486     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
487     for (RegionEnvironment env: coprocessors) {
488       if (env.getInstance() instanceof RegionObserver) {
489         ctx = ObserverContext.createAndPrepare(env, ctx);
490         Thread currentThread = Thread.currentThread();
491         ClassLoader cl = currentThread.getContextClassLoader();
492         try {
493           currentThread.setContextClassLoader(env.getClassLoader());
494           ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected,
495             request);
496         } catch (Throwable e) {
497           handleCoprocessorThrowableNoRethrow(env,e);
498         } finally {
499           currentThread.setContextClassLoader(cl);
500         }
501         if (ctx.shouldComplete()) {
502           break;
503         }
504       }
505     }
506   }
507 
508   /**
509    * Called prior to rewriting the store files selected for compaction
510    * @param store the store being compacted
511    * @param scanner the scanner used to read store data during compaction
512    * @param scanType type of Scan
513    * @param request the compaction that will be executed
514    * @throws IOException
515    */
516   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
517       final ScanType scanType, final CompactionRequest request) throws IOException {
518     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
519     boolean bypass = false;
520     InternalScanner s = scanner;
521     for (RegionEnvironment env: coprocessors) {
522       if (env.getInstance() instanceof RegionObserver) {
523         ctx = ObserverContext.createAndPrepare(env, ctx);
524         Thread currentThread = Thread.currentThread();
525         ClassLoader cl = currentThread.getContextClassLoader();
526         try {
527           currentThread.setContextClassLoader(env.getClassLoader());
528           s = ((RegionObserver) env.getInstance()).preCompact(ctx, store, s, scanType,
529             request);
530         } catch (Throwable e) {
531           handleCoprocessorThrowable(env,e);
532         } finally {
533           currentThread.setContextClassLoader(cl);
534         }
535         bypass |= ctx.shouldBypass();
536         if (ctx.shouldComplete()) {
537           break;
538         }
539       }
540     }
541     return bypass ? null : s;
542   }
543 
544   /**
545    * Called after the store compaction has completed.
546    * @param store the store being compacted
547    * @param resultFile the new store file written during compaction
548    * @param request the compaction that is being executed
549    * @throws IOException
550    */
551   public void postCompact(final Store store, final StoreFile resultFile,
552       final CompactionRequest request) throws IOException {
553     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
554     for (RegionEnvironment env: coprocessors) {
555       if (env.getInstance() instanceof RegionObserver) {
556         ctx = ObserverContext.createAndPrepare(env, ctx);
557         Thread currentThread = Thread.currentThread();
558         ClassLoader cl = currentThread.getContextClassLoader();
559         try {
560           currentThread.setContextClassLoader(env.getClassLoader());
561           ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
562         } catch (Throwable e) {
563           handleCoprocessorThrowable(env, e);
564         } finally {
565           currentThread.setContextClassLoader(cl);
566         }
567         if (ctx.shouldComplete()) {
568           break;
569         }
570       }
571     }
572   }
573 
574   /**
575    * Invoked before a memstore flush
576    * @throws IOException
577    */
578   public InternalScanner preFlush(final Store store, final InternalScanner scanner) throws IOException {
579     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
580     boolean bypass = false;
581     InternalScanner s = scanner;
582     for (RegionEnvironment env: coprocessors) {
583       if (env.getInstance() instanceof RegionObserver) {
584         ctx = ObserverContext.createAndPrepare(env, ctx);
585         Thread currentThread = Thread.currentThread();
586         ClassLoader cl = currentThread.getContextClassLoader();
587         try {
588           currentThread.setContextClassLoader(env.getClassLoader());
589           s = ((RegionObserver)env.getInstance()).preFlush(ctx, store, s);
590         } catch (Throwable e) {
591           handleCoprocessorThrowable(env,e);
592         } finally {
593           currentThread.setContextClassLoader(cl);
594         }
595         bypass |= ctx.shouldBypass();
596         if (ctx.shouldComplete()) {
597           break;
598         }
599       }
600     }
601     return bypass ? null : s;
602   }
603 
604   /**
605    * Invoked before a memstore flush
606    * @throws IOException
607    */
608   public void preFlush() throws IOException {
609     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
610     for (RegionEnvironment env: coprocessors) {
611       if (env.getInstance() instanceof RegionObserver) {
612         ctx = ObserverContext.createAndPrepare(env, ctx);
613         Thread currentThread = Thread.currentThread();
614         ClassLoader cl = currentThread.getContextClassLoader();
615         try {
616           currentThread.setContextClassLoader(env.getClassLoader());
617           ((RegionObserver)env.getInstance()).preFlush(ctx);
618         } catch (Throwable e) {
619           handleCoprocessorThrowable(env, e);
620         } finally {
621           currentThread.setContextClassLoader(cl);
622         }
623         if (ctx.shouldComplete()) {
624           break;
625         }
626       }
627     }
628   }
629 
630   /**
631    * See
632    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
633    *    Store, KeyValueScanner, InternalScanner)}
634    */
635   public InternalScanner preFlushScannerOpen(final Store store,
636       final KeyValueScanner memstoreScanner) throws IOException {
637     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
638     InternalScanner s = null;
639     for (RegionEnvironment env : coprocessors) {
640       if (env.getInstance() instanceof RegionObserver) {
641         ctx = ObserverContext.createAndPrepare(env, ctx);
642         Thread currentThread = Thread.currentThread();
643         ClassLoader cl = currentThread.getContextClassLoader();
644         try {
645           currentThread.setContextClassLoader(env.getClassLoader());
646           s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store,
647             memstoreScanner, s);
648         } catch (Throwable e) {
649           handleCoprocessorThrowable(env, e);
650         } finally {
651           currentThread.setContextClassLoader(cl);
652         }
653         if (ctx.shouldComplete()) {
654           break;
655         }
656       }
657     }
658     return s;
659   }
660 
661   /**
662    * Invoked after a memstore flush
663    * @throws IOException
664    */
665   public void postFlush() throws IOException {
666     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
667     for (RegionEnvironment env: coprocessors) {
668       if (env.getInstance() instanceof RegionObserver) {
669         ctx = ObserverContext.createAndPrepare(env, ctx);
670         Thread currentThread = Thread.currentThread();
671         ClassLoader cl = currentThread.getContextClassLoader();
672         try {
673           currentThread.setContextClassLoader(env.getClassLoader());
674           ((RegionObserver)env.getInstance()).postFlush(ctx);
675         } catch (Throwable e) {
676           handleCoprocessorThrowable(env, e);
677         } finally {
678           currentThread.setContextClassLoader(cl);
679         }
680         if (ctx.shouldComplete()) {
681           break;
682         }
683       }
684     }
685   }
686 
687   /**
688    * Invoked after a memstore flush
689    * @throws IOException
690    */
691   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
692     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
693     for (RegionEnvironment env: coprocessors) {
694       if (env.getInstance() instanceof RegionObserver) {
695         ctx = ObserverContext.createAndPrepare(env, ctx);
696         Thread currentThread = Thread.currentThread();
697         ClassLoader cl = currentThread.getContextClassLoader();
698         try {
699           currentThread.setContextClassLoader(env.getClassLoader());
700           ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
701         } catch (Throwable e) {
702           handleCoprocessorThrowable(env, e);
703         } finally {
704           currentThread.setContextClassLoader(cl);
705         }
706         if (ctx.shouldComplete()) {
707           break;
708         }
709       }
710     }
711   }
712 
713   /**
714    * Invoked just before a split
715    * @throws IOException
716    */
717   public void preSplit() throws IOException {
718     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
719     for (RegionEnvironment env: coprocessors) {
720       if (env.getInstance() instanceof RegionObserver) {
721         ctx = ObserverContext.createAndPrepare(env, ctx);
722         Thread currentThread = Thread.currentThread();
723         ClassLoader cl = currentThread.getContextClassLoader();
724         try {
725           currentThread.setContextClassLoader(env.getClassLoader());
726           ((RegionObserver)env.getInstance()).preSplit(ctx);
727         } catch (Throwable e) {
728           handleCoprocessorThrowable(env, e);
729         } finally {
730           currentThread.setContextClassLoader(cl);
731         }
732         if (ctx.shouldComplete()) {
733           break;
734         }
735       }
736     }
737   }
738 
739   /**
740    * Invoked just before a split
741    * @throws IOException
742    */
743   public void preSplit(final byte[] splitRow) throws IOException {
744     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
745     for (RegionEnvironment env: coprocessors) {
746       if (env.getInstance() instanceof RegionObserver) {
747         ctx = ObserverContext.createAndPrepare(env, ctx);
748         Thread currentThread = Thread.currentThread();
749         ClassLoader cl = currentThread.getContextClassLoader();
750         try {
751           currentThread.setContextClassLoader(env.getClassLoader());
752           ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow);
753         } catch (Throwable e) {
754           handleCoprocessorThrowable(env, e);
755         } finally {
756           currentThread.setContextClassLoader(cl);
757         }
758         if (ctx.shouldComplete()) {
759           break;
760         }
761       }
762     }
763   }
764 
765   /**
766    * Invoked just after a split
767    * @param l the new left-hand daughter region
768    * @param r the new right-hand daughter region
769    * @throws IOException
770    */
771   public void postSplit(final HRegion l, final HRegion r) throws IOException {
772     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
773     for (RegionEnvironment env: coprocessors) {
774       if (env.getInstance() instanceof RegionObserver) {
775         ctx = ObserverContext.createAndPrepare(env, ctx);
776         Thread currentThread = Thread.currentThread();
777         ClassLoader cl = currentThread.getContextClassLoader();
778         try {
779           currentThread.setContextClassLoader(env.getClassLoader());
780           ((RegionObserver)env.getInstance()).postSplit(ctx, l, r);
781         } catch (Throwable e) {
782           handleCoprocessorThrowable(env, e);
783         } finally {
784           currentThread.setContextClassLoader(cl);
785         }
786         if (ctx.shouldComplete()) {
787           break;
788         }
789       }
790     }
791   }
792 
793   public boolean preSplitBeforePONR(final byte[] splitKey, 
794       final List<Mutation> metaEntries) throws IOException {
795     boolean bypass = false;
796     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
797     for (RegionEnvironment env : coprocessors) {
798       if (env.getInstance() instanceof RegionObserver) {
799         ctx = ObserverContext.createAndPrepare(env, ctx);
800         Thread currentThread = Thread.currentThread();
801         ClassLoader cl = currentThread.getContextClassLoader();
802         try {
803           currentThread.setContextClassLoader(env.getClassLoader());
804           ((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx, splitKey, metaEntries);
805         } catch (Throwable e) {
806           handleCoprocessorThrowable(env, e);
807         } finally {
808           currentThread.setContextClassLoader(cl);
809         }
810         bypass |= ctx.shouldBypass();
811         if (ctx.shouldComplete()) {
812           break;
813         }
814       }
815     }
816     return bypass;
817   }
818 
819   public void preSplitAfterPONR() throws IOException {
820     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
821     for (RegionEnvironment env : coprocessors) {
822       if (env.getInstance() instanceof RegionObserver) {
823         ctx = ObserverContext.createAndPrepare(env, ctx);
824         Thread currentThread = Thread.currentThread();
825         ClassLoader cl = currentThread.getContextClassLoader();
826         try {
827           currentThread.setContextClassLoader(env.getClassLoader());
828           ((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx);
829         } catch (Throwable e) {
830           handleCoprocessorThrowable(env, e);
831         } finally {
832           currentThread.setContextClassLoader(cl);
833         }
834         if (ctx.shouldComplete()) {
835           break;
836         }
837       }
838     }
839   }
840 
841   /**
842    * Invoked just before the rollback of a failed split is started
843    * @throws IOException
844    */
845   public void preRollBackSplit() throws IOException {
846     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
847     for (RegionEnvironment env : coprocessors) {
848       if (env.getInstance() instanceof RegionObserver) {
849         ctx = ObserverContext.createAndPrepare(env, ctx);
850         Thread currentThread = Thread.currentThread();
851         ClassLoader cl = currentThread.getContextClassLoader();
852         try {
853           currentThread.setContextClassLoader(env.getClassLoader());
854           ((RegionObserver) env.getInstance()).preRollBackSplit(ctx);
855         } catch (Throwable e) {
856           handleCoprocessorThrowable(env, e);
857         } finally {
858           currentThread.setContextClassLoader(cl);
859         }
860         if (ctx.shouldComplete()) {
861           break;
862         }
863       }
864     }
865   }
866 
867   /**
868    * Invoked just after the rollback of a failed split is done
869    * @throws IOException
870    */
871   public void postRollBackSplit() throws IOException {
872     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
873     for (RegionEnvironment env : coprocessors) {
874       if (env.getInstance() instanceof RegionObserver) {
875         ctx = ObserverContext.createAndPrepare(env, ctx);
876         Thread currentThread = Thread.currentThread();
877         ClassLoader cl = currentThread.getContextClassLoader();
878         try {
879           currentThread.setContextClassLoader(env.getClassLoader());
880           ((RegionObserver) env.getInstance()).postRollBackSplit(ctx);
881         } catch (Throwable e) {
882           handleCoprocessorThrowable(env, e);
883         } finally {
884           currentThread.setContextClassLoader(cl);
885         }
886         if (ctx.shouldComplete()) {
887           break;
888         }
889       }
890     }
891   }
892 
893   /**
894    * Invoked after a split is completed irrespective of a failure or success.
895    * @throws IOException
896    */
897   public void postCompleteSplit() throws IOException {
898     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
899     for (RegionEnvironment env : coprocessors) {
900       if (env.getInstance() instanceof RegionObserver) {
901         ctx = ObserverContext.createAndPrepare(env, ctx);
902         Thread currentThread = Thread.currentThread();
903         ClassLoader cl = currentThread.getContextClassLoader();
904         try {
905           currentThread.setContextClassLoader(env.getClassLoader());
906           ((RegionObserver) env.getInstance()).postCompleteSplit(ctx);
907         } catch (Throwable e) {
908           handleCoprocessorThrowable(env, e);
909         } finally {
910           currentThread.setContextClassLoader(cl);
911         }
912         if (ctx.shouldComplete()) {
913           break;
914         }
915       }
916     }
917   }
918 
919   // RegionObserver support
920 
921   /**
922    * @param row the row key
923    * @param family the family
924    * @param result the result set from the region
925    * @return true if default processing should be bypassed
926    * @exception IOException Exception
927    */
928   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
929       final Result result) throws IOException {
930     boolean bypass = false;
931     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
932     for (RegionEnvironment env: coprocessors) {
933       if (env.getInstance() instanceof RegionObserver) {
934         ctx = ObserverContext.createAndPrepare(env, ctx);
935         Thread currentThread = Thread.currentThread();
936         ClassLoader cl = currentThread.getContextClassLoader();
937         try {
938           currentThread.setContextClassLoader(env.getClassLoader());
939           ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row, family, result);
940         } catch (Throwable e) {
941           handleCoprocessorThrowable(env, e);
942         } finally {
943           currentThread.setContextClassLoader(cl);
944         }
945         bypass |= ctx.shouldBypass();
946         if (ctx.shouldComplete()) {
947           break;
948         }
949       }
950     }
951     return bypass;
952   }
953 
954   /**
955    * @param row the row key
956    * @param family the family
957    * @param result the result set from the region
958    * @exception IOException Exception
959    */
960   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
961       final Result result) throws IOException {
962     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
963     for (RegionEnvironment env: coprocessors) {
964       if (env.getInstance() instanceof RegionObserver) {
965         ctx = ObserverContext.createAndPrepare(env, ctx);
966         Thread currentThread = Thread.currentThread();
967         ClassLoader cl = currentThread.getContextClassLoader();
968         try {
969           currentThread.setContextClassLoader(env.getClassLoader());
970           ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row, family, result);
971         } catch (Throwable e) {
972           handleCoprocessorThrowable(env, e);
973         } finally {
974           currentThread.setContextClassLoader(cl);
975         }
976         if (ctx.shouldComplete()) {
977           break;
978         }
979       }
980     }
981   }
982 
983   /**
984    * @param get the Get request
985    * @return true if default processing should be bypassed
986    * @exception IOException Exception
987    */
988   public boolean preGet(final Get get, final List<Cell> results)
989       throws IOException {
990     boolean bypass = false;
991     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
992     for (RegionEnvironment env: coprocessors) {
993       if (env.getInstance() instanceof RegionObserver) {
994         ctx = ObserverContext.createAndPrepare(env, ctx);
995         Thread currentThread = Thread.currentThread();
996         ClassLoader cl = currentThread.getContextClassLoader();
997         try {
998           currentThread.setContextClassLoader(env.getClassLoader());
999           ((RegionObserver)env.getInstance()).preGetOp(ctx, get, results);
1000         } catch (Throwable e) {
1001           handleCoprocessorThrowable(env, e);
1002         } finally {
1003           currentThread.setContextClassLoader(cl);
1004         }
1005         bypass |= ctx.shouldBypass();
1006         if (ctx.shouldComplete()) {
1007           break;
1008         }
1009       }
1010     }
1011     return bypass;
1012   }
1013 
1014   /**
1015    * @param get the Get request
1016    * @param results the result sett
1017    * @exception IOException Exception
1018    */
1019   public void postGet(final Get get, final List<Cell> results)
1020   throws IOException {
1021     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1022     for (RegionEnvironment env: coprocessors) {
1023       if (env.getInstance() instanceof RegionObserver) {
1024         ctx = ObserverContext.createAndPrepare(env, ctx);
1025         Thread currentThread = Thread.currentThread();
1026         ClassLoader cl = currentThread.getContextClassLoader();
1027         try {
1028           currentThread.setContextClassLoader(env.getClassLoader());
1029           ((RegionObserver)env.getInstance()).postGetOp(ctx, get, results);
1030         } catch (Throwable e) {
1031           handleCoprocessorThrowable(env, e);
1032         } finally {
1033           currentThread.setContextClassLoader(cl);
1034         }
1035         if (ctx.shouldComplete()) {
1036           break;
1037         }
1038       }
1039     }
1040   }
1041 
1042   /**
1043    * @param get the Get request
1044    * @return true or false to return to client if bypassing normal operation,
1045    * or null otherwise
1046    * @exception IOException Exception
1047    */
1048   public Boolean preExists(final Get get) throws IOException {
1049     boolean bypass = false;
1050     boolean exists = false;
1051     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1052     for (RegionEnvironment env: coprocessors) {
1053       if (env.getInstance() instanceof RegionObserver) {
1054         ctx = ObserverContext.createAndPrepare(env, ctx);
1055         Thread currentThread = Thread.currentThread();
1056         ClassLoader cl = currentThread.getContextClassLoader();
1057         try {
1058           currentThread.setContextClassLoader(env.getClassLoader());
1059           exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists);
1060         } catch (Throwable e) {
1061           handleCoprocessorThrowable(env, e);
1062         } finally {
1063           currentThread.setContextClassLoader(cl);
1064         }
1065         bypass |= ctx.shouldBypass();
1066         if (ctx.shouldComplete()) {
1067           break;
1068         }
1069       }
1070     }
1071     return bypass ? exists : null;
1072   }
1073 
1074   /**
1075    * @param get the Get request
1076    * @param exists the result returned by the region server
1077    * @return the result to return to the client
1078    * @exception IOException Exception
1079    */
1080   public boolean postExists(final Get get, boolean exists)
1081       throws IOException {
1082     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1083     for (RegionEnvironment env: coprocessors) {
1084       if (env.getInstance() instanceof RegionObserver) {
1085         ctx = ObserverContext.createAndPrepare(env, ctx);
1086         Thread currentThread = Thread.currentThread();
1087         ClassLoader cl = currentThread.getContextClassLoader();
1088         try {
1089           currentThread.setContextClassLoader(env.getClassLoader());
1090           exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists);
1091         } catch (Throwable e) {
1092           handleCoprocessorThrowable(env, e);
1093         } finally {
1094           currentThread.setContextClassLoader(cl);
1095         }
1096         if (ctx.shouldComplete()) {
1097           break;
1098         }
1099       }
1100     }
1101     return exists;
1102   }
1103 
1104   /**
1105    * @param put The Put object
1106    * @param edit The WALEdit object.
1107    * @param durability The durability used
1108    * @return true if default processing should be bypassed
1109    * @exception IOException Exception
1110    */
1111   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
1112       throws IOException {
1113     boolean bypass = false;
1114     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1115     for (RegionEnvironment env: coprocessors) {
1116       if (env.getInstance() instanceof RegionObserver) {
1117         ctx = ObserverContext.createAndPrepare(env, ctx);
1118         Thread currentThread = Thread.currentThread();
1119         ClassLoader cl = currentThread.getContextClassLoader();
1120         try {
1121           currentThread.setContextClassLoader(env.getClassLoader());
1122           ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability);
1123         } catch (Throwable e) {
1124           handleCoprocessorThrowable(env, e);
1125         } finally {
1126           currentThread.setContextClassLoader(cl);
1127         }
1128         bypass |= ctx.shouldBypass();
1129         if (ctx.shouldComplete()) {
1130           break;
1131         }
1132       }
1133     }
1134     return bypass;
1135   }
1136 
1137   /**
1138    * @param put The Put object
1139    * @param edit The WALEdit object.
1140    * @param durability The durability used
1141    * @exception IOException Exception
1142    */
1143   public void postPut(final Put put, final WALEdit edit, final Durability durability)
1144       throws IOException {
1145     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1146     for (RegionEnvironment env: coprocessors) {
1147       if (env.getInstance() instanceof RegionObserver) {
1148         ctx = ObserverContext.createAndPrepare(env, ctx);
1149         Thread currentThread = Thread.currentThread();
1150         ClassLoader cl = currentThread.getContextClassLoader();
1151         try {
1152           currentThread.setContextClassLoader(env.getClassLoader());
1153           ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability);
1154         } catch (Throwable e) {
1155           handleCoprocessorThrowable(env, e);
1156         } finally {
1157           currentThread.setContextClassLoader(cl);
1158         }
1159         if (ctx.shouldComplete()) {
1160           break;
1161         }
1162       }
1163     }
1164   }
1165 
1166   /**
1167    * @param delete The Delete object
1168    * @param edit The WALEdit object.
1169    * @param durability The durability used
1170    * @return true if default processing should be bypassed
1171    * @exception IOException Exception
1172    */
1173   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
1174       throws IOException {
1175     boolean bypass = false;
1176     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1177     for (RegionEnvironment env: coprocessors) {
1178       if (env.getInstance() instanceof RegionObserver) {
1179         ctx = ObserverContext.createAndPrepare(env, ctx);
1180         Thread currentThread = Thread.currentThread();
1181         ClassLoader cl = currentThread.getContextClassLoader();
1182         try {
1183           currentThread.setContextClassLoader(env.getClassLoader());
1184           ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability);
1185         } catch (Throwable e) {
1186           handleCoprocessorThrowable(env, e);
1187         } finally {
1188           currentThread.setContextClassLoader(cl);
1189         }
1190         bypass |= ctx.shouldBypass();
1191         if (ctx.shouldComplete()) {
1192           break;
1193         }
1194       }
1195     }
1196     return bypass;
1197   }
1198 
1199   /**
1200    * @param delete The Delete object
1201    * @param edit The WALEdit object.
1202    * @param durability The durability used
1203    * @exception IOException Exception
1204    */
1205   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
1206       throws IOException {
1207     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1208     for (RegionEnvironment env: coprocessors) {
1209       if (env.getInstance() instanceof RegionObserver) {
1210         ctx = ObserverContext.createAndPrepare(env, ctx);
1211         Thread currentThread = Thread.currentThread();
1212         ClassLoader cl = currentThread.getContextClassLoader();
1213         try {
1214           currentThread.setContextClassLoader(env.getClassLoader());
1215           ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability);
1216         } catch (Throwable e) {
1217           handleCoprocessorThrowable(env, e);
1218         } finally {
1219           currentThread.setContextClassLoader(cl);
1220         }
1221         if (ctx.shouldComplete()) {
1222           break;
1223         }
1224       }
1225     }
1226   }
1227   
1228   /**
1229    * @param miniBatchOp
1230    * @return true if default processing should be bypassed
1231    * @throws IOException
1232    */
1233   public boolean preBatchMutate(
1234       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1235     boolean bypass = false;
1236     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1237     for (RegionEnvironment env : coprocessors) {
1238       if (env.getInstance() instanceof RegionObserver) {
1239         ctx = ObserverContext.createAndPrepare(env, ctx);
1240         Thread currentThread = Thread.currentThread();
1241         ClassLoader cl = currentThread.getContextClassLoader();
1242         try {
1243           currentThread.setContextClassLoader(env.getClassLoader());
1244           ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp);
1245         } catch (Throwable e) {
1246           handleCoprocessorThrowable(env, e);
1247         } finally {
1248           currentThread.setContextClassLoader(cl);
1249         }
1250         bypass |= ctx.shouldBypass();
1251         if (ctx.shouldComplete()) {
1252           break;
1253         }
1254       }
1255     }
1256     return bypass;
1257   }
1258 
1259   /**
1260    * @param miniBatchOp
1261    * @throws IOException
1262    */
1263   public void postBatchMutate(
1264       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1265     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1266     for (RegionEnvironment env : coprocessors) {
1267       if (env.getInstance() instanceof RegionObserver) {
1268         ctx = ObserverContext.createAndPrepare(env, ctx);
1269         Thread currentThread = Thread.currentThread();
1270         ClassLoader cl = currentThread.getContextClassLoader();
1271         try {
1272           currentThread.setContextClassLoader(env.getClassLoader());
1273           ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp);
1274         } catch (Throwable e) {
1275           handleCoprocessorThrowable(env, e);
1276         } finally {
1277           currentThread.setContextClassLoader(cl);
1278         }
1279         if (ctx.shouldComplete()) {
1280           break;
1281         }
1282       }
1283     }
1284   }
1285 
1286   public void postBatchMutateIndispensably(
1287       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1288       throws IOException {
1289     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1290     for (RegionEnvironment env : coprocessors) {
1291       if (env.getInstance() instanceof RegionObserver) {
1292         ctx = ObserverContext.createAndPrepare(env, ctx);
1293         Thread currentThread = Thread.currentThread();
1294         ClassLoader cl = currentThread.getContextClassLoader();
1295         try {
1296           currentThread.setContextClassLoader(env.getClassLoader());
1297           ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp,
1298             success);
1299         } catch (Throwable e) {
1300           handleCoprocessorThrowable(env, e);
1301         } finally {
1302           currentThread.setContextClassLoader(cl);
1303         }
1304         if (ctx.shouldComplete()) {
1305           break;
1306         }
1307       }
1308     }
1309   }
1310 
1311   /**
1312    * @param row row to check
1313    * @param family column family
1314    * @param qualifier column qualifier
1315    * @param compareOp the comparison operation
1316    * @param comparator the comparator
1317    * @param put data to put if check succeeds
1318    * @return true or false to return to client if default processing should
1319    * be bypassed, or null otherwise
1320    * @throws IOException e
1321    */
1322   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1323       final byte [] qualifier, final CompareOp compareOp,
1324       final ByteArrayComparable comparator, final Put put)
1325     throws IOException {
1326     boolean bypass = false;
1327     boolean result = false;
1328     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1329     for (RegionEnvironment env: coprocessors) {
1330       if (env.getInstance() instanceof RegionObserver) {
1331         ctx = ObserverContext.createAndPrepare(env, ctx);
1332         Thread currentThread = Thread.currentThread();
1333         ClassLoader cl = currentThread.getContextClassLoader();
1334         try {
1335           currentThread.setContextClassLoader(env.getClassLoader());
1336           result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family, qualifier,
1337             compareOp, comparator, put, result);
1338         } catch (Throwable e) {
1339           handleCoprocessorThrowable(env, e);
1340         } finally {
1341           currentThread.setContextClassLoader(cl);
1342         }
1343         bypass |= ctx.shouldBypass();
1344         if (ctx.shouldComplete()) {
1345           break;
1346         }
1347       }
1348     }
1349     return bypass ? result : null;
1350   }
1351 
1352   /**
1353    * @param row row to check
1354    * @param family column family
1355    * @param qualifier column qualifier
1356    * @param compareOp the comparison operation
1357    * @param comparator the comparator
1358    * @param put data to put if check succeeds
1359    * @throws IOException e
1360    */
1361   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1362       final byte [] qualifier, final CompareOp compareOp,
1363       final ByteArrayComparable comparator, final Put put,
1364       boolean result)
1365     throws IOException {
1366     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1367     for (RegionEnvironment env: coprocessors) {
1368       if (env.getInstance() instanceof RegionObserver) {
1369         ctx = ObserverContext.createAndPrepare(env, ctx);
1370         Thread currentThread = Thread.currentThread();
1371         ClassLoader cl = currentThread.getContextClassLoader();
1372         try {
1373           currentThread.setContextClassLoader(env.getClassLoader());
1374           result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row, family,
1375             qualifier, compareOp, comparator, put, result);
1376         } catch (Throwable e) {
1377           handleCoprocessorThrowable(env, e);
1378         } finally {
1379           currentThread.setContextClassLoader(cl);
1380         }
1381         if (ctx.shouldComplete()) {
1382           break;
1383         }
1384       }
1385     }
1386     return result;
1387   }
1388 
1389   /**
1390    * @param row row to check
1391    * @param family column family
1392    * @param qualifier column qualifier
1393    * @param compareOp the comparison operation
1394    * @param comparator the comparator
1395    * @param delete delete to commit if check succeeds
1396    * @return true or false to return to client if default processing should
1397    * be bypassed, or null otherwise
1398    * @throws IOException e
1399    */
1400   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1401       final byte [] qualifier, final CompareOp compareOp,
1402       final ByteArrayComparable comparator, final Delete delete)
1403       throws IOException {
1404     boolean bypass = false;
1405     boolean result = false;
1406     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1407     for (RegionEnvironment env: coprocessors) {
1408       if (env.getInstance() instanceof RegionObserver) {
1409         ctx = ObserverContext.createAndPrepare(env, ctx);
1410         Thread currentThread = Thread.currentThread();
1411         ClassLoader cl = currentThread.getContextClassLoader();
1412         try {
1413           currentThread.setContextClassLoader(env.getClassLoader());
1414           result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row, family,
1415             qualifier, compareOp, comparator, delete, result);
1416         } catch (Throwable e) {
1417           handleCoprocessorThrowable(env, e);
1418         } finally {
1419           currentThread.setContextClassLoader(cl);
1420         }
1421         bypass |= ctx.shouldBypass();
1422         if (ctx.shouldComplete()) {
1423           break;
1424         }
1425       }
1426     }
1427     return bypass ? result : null;
1428   }
1429 
1430   /**
1431    * @param row row to check
1432    * @param family column family
1433    * @param qualifier column qualifier
1434    * @param compareOp the comparison operation
1435    * @param comparator the comparator
1436    * @param delete delete to commit if check succeeds
1437    * @throws IOException e
1438    */
1439   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1440       final byte [] qualifier, final CompareOp compareOp,
1441       final ByteArrayComparable comparator, final Delete delete,
1442       boolean result) throws IOException {
1443     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1444     for (RegionEnvironment env: coprocessors) {
1445       if (env.getInstance() instanceof RegionObserver) {
1446         ctx = ObserverContext.createAndPrepare(env, ctx);
1447         Thread currentThread = Thread.currentThread();
1448         ClassLoader cl = currentThread.getContextClassLoader();
1449         try {
1450           currentThread.setContextClassLoader(env.getClassLoader());
1451           result = ((RegionObserver)env.getInstance()).postCheckAndDelete(ctx, row, family,
1452             qualifier, compareOp, comparator, delete, result);
1453         } catch (Throwable e) {
1454           handleCoprocessorThrowable(env, e);
1455         } finally {
1456           currentThread.setContextClassLoader(cl);
1457         }
1458         if (ctx.shouldComplete()) {
1459           break;
1460         }
1461       }
1462     }
1463     return result;
1464   }
1465 
1466   /**
1467    * @param append append object
1468    * @return result to return to client if default operation should be
1469    * bypassed, null otherwise
1470    * @throws IOException if an error occurred on the coprocessor
1471    */
1472   public Result preAppend(final Append append) throws IOException {
1473     boolean bypass = false;
1474     Result result = null;
1475     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1476     for (RegionEnvironment env: coprocessors) {
1477       if (env.getInstance() instanceof RegionObserver) {
1478         ctx = ObserverContext.createAndPrepare(env, ctx);
1479         Thread currentThread = Thread.currentThread();
1480         ClassLoader cl = currentThread.getContextClassLoader();
1481         try {
1482           currentThread.setContextClassLoader(env.getClassLoader());
1483           result = ((RegionObserver)env.getInstance()).preAppend(ctx, append);
1484         } catch (Throwable e) {
1485           handleCoprocessorThrowable(env, e);
1486         } finally {
1487           currentThread.setContextClassLoader(cl);
1488         }
1489         bypass |= ctx.shouldBypass();
1490         if (ctx.shouldComplete()) {
1491           break;
1492         }
1493       }
1494     }
1495     return bypass ? result : null;
1496   }
1497 
1498   /**
1499    * @param increment increment object
1500    * @return result to return to client if default operation should be
1501    * bypassed, null otherwise
1502    * @throws IOException if an error occurred on the coprocessor
1503    */
1504   public Result preIncrement(final Increment increment) throws IOException {
1505     boolean bypass = false;
1506     Result result = null;
1507     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1508     for (RegionEnvironment env: coprocessors) {
1509       if (env.getInstance() instanceof RegionObserver) {
1510         ctx = ObserverContext.createAndPrepare(env, ctx);
1511         Thread currentThread = Thread.currentThread();
1512         ClassLoader cl = currentThread.getContextClassLoader();
1513         try {
1514           currentThread.setContextClassLoader(env.getClassLoader());
1515           result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment);
1516         } catch (Throwable e) {
1517           handleCoprocessorThrowable(env, e);
1518         } finally {
1519           currentThread.setContextClassLoader(cl);
1520         }
1521         bypass |= ctx.shouldBypass();
1522         if (ctx.shouldComplete()) {
1523           break;
1524         }
1525       }
1526     }
1527     return bypass ? result : null;
1528   }
1529 
1530   /**
1531    * @param append Append object
1532    * @param result the result returned by the append
1533    * @throws IOException if an error occurred on the coprocessor
1534    */
1535   public void postAppend(final Append append, final Result result) throws IOException {
1536     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1537     for (RegionEnvironment env: coprocessors) {
1538       if (env.getInstance() instanceof RegionObserver) {
1539         ctx = ObserverContext.createAndPrepare(env, ctx);
1540         Thread currentThread = Thread.currentThread();
1541         ClassLoader cl = currentThread.getContextClassLoader();
1542         try {
1543           currentThread.setContextClassLoader(env.getClassLoader());
1544           ((RegionObserver)env.getInstance()).postAppend(ctx, append, result);
1545         } catch (Throwable e) {
1546           handleCoprocessorThrowable(env, e);
1547         } finally {
1548           currentThread.setContextClassLoader(cl);
1549         }
1550         if (ctx.shouldComplete()) {
1551           break;
1552         }
1553       }
1554     }
1555   }
1556 
1557   /**
1558    * @param increment increment object
1559    * @param result the result returned by postIncrement
1560    * @throws IOException if an error occurred on the coprocessor
1561    */
1562   public Result postIncrement(final Increment increment, Result result) throws IOException {
1563     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1564     for (RegionEnvironment env: coprocessors) {
1565       if (env.getInstance() instanceof RegionObserver) {
1566         ctx = ObserverContext.createAndPrepare(env, ctx);
1567         Thread currentThread = Thread.currentThread();
1568         ClassLoader cl = currentThread.getContextClassLoader();
1569         try {
1570           currentThread.setContextClassLoader(env.getClassLoader());
1571           result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result);
1572         } catch (Throwable e) {
1573           handleCoprocessorThrowable(env, e);
1574         } finally {
1575           currentThread.setContextClassLoader(cl);
1576         }
1577         if (ctx.shouldComplete()) {
1578           break;
1579         }
1580       }
1581     }
1582     return result;
1583   }
1584 
1585   /**
1586    * @param scan the Scan specification
1587    * @return scanner instance to return to client if default operation should be
1588    * bypassed, null otherwise
1589    * @exception IOException Exception
1590    */
1591   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1592     boolean bypass = false;
1593     RegionScanner s = null;
1594     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1595     for (RegionEnvironment env: coprocessors) {
1596       if (env.getInstance() instanceof RegionObserver) {
1597         ctx = ObserverContext.createAndPrepare(env, ctx);
1598         Thread currentThread = Thread.currentThread();
1599         ClassLoader cl = currentThread.getContextClassLoader();
1600         try {
1601           currentThread.setContextClassLoader(env.getClassLoader());
1602           s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s);
1603         } catch (Throwable e) {
1604           handleCoprocessorThrowable(env, e);
1605         } finally {
1606           currentThread.setContextClassLoader(cl);
1607         }
1608         bypass |= ctx.shouldBypass();
1609         if (ctx.shouldComplete()) {
1610           break;
1611         }
1612       }
1613     }
1614     return bypass ? s : null;
1615   }
1616 
1617   /**
1618    * See
1619    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1620    *    Store, Scan, NavigableSet, KeyValueScanner)}
1621    */
1622   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1623       final NavigableSet<byte[]> targetCols) throws IOException {
1624     KeyValueScanner s = null;
1625     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1626     for (RegionEnvironment env: coprocessors) {
1627       if (env.getInstance() instanceof RegionObserver) {
1628         ctx = ObserverContext.createAndPrepare(env, ctx);
1629         Thread currentThread = Thread.currentThread();
1630         ClassLoader cl = currentThread.getContextClassLoader();
1631         try {
1632           currentThread.setContextClassLoader(env.getClassLoader());
1633           s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
1634             targetCols, s);
1635         } catch (Throwable e) {
1636           handleCoprocessorThrowable(env, e);
1637         } finally {
1638           currentThread.setContextClassLoader(cl);
1639         }
1640         if (ctx.shouldComplete()) {
1641           break;
1642         }
1643       }
1644     }
1645     return s;
1646   }
1647 
1648   /**
1649    * @param scan the Scan specification
1650    * @param s the scanner
1651    * @return the scanner instance to use
1652    * @exception IOException Exception
1653    */
1654   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1655     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1656     for (RegionEnvironment env: coprocessors) {
1657       if (env.getInstance() instanceof RegionObserver) {
1658         ctx = ObserverContext.createAndPrepare(env, ctx);
1659         Thread currentThread = Thread.currentThread();
1660         ClassLoader cl = currentThread.getContextClassLoader();
1661         try {
1662           currentThread.setContextClassLoader(env.getClassLoader());
1663           s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s);
1664         } catch (Throwable e) {
1665           handleCoprocessorThrowable(env, e);
1666         } finally {
1667           currentThread.setContextClassLoader(cl);
1668         }
1669         if (ctx.shouldComplete()) {
1670           break;
1671         }
1672       }
1673     }
1674     return s;
1675   }
1676 
1677   /**
1678    * @param s the scanner
1679    * @param results the result set returned by the region server
1680    * @param limit the maximum number of results to return
1681    * @return 'has next' indication to client if bypassing default behavior, or
1682    * null otherwise
1683    * @exception IOException Exception
1684    */
1685   public Boolean preScannerNext(final InternalScanner s,
1686       final List<Result> results, final int limit) throws IOException {
1687     boolean bypass = false;
1688     boolean hasNext = false;
1689     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1690     for (RegionEnvironment env: coprocessors) {
1691       if (env.getInstance() instanceof RegionObserver) {
1692         ctx = ObserverContext.createAndPrepare(env, ctx);
1693         Thread currentThread = Thread.currentThread();
1694         ClassLoader cl = currentThread.getContextClassLoader();
1695         try {
1696           currentThread.setContextClassLoader(env.getClassLoader());
1697           hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results, limit,
1698             hasNext);
1699         } catch (Throwable e) {
1700           handleCoprocessorThrowable(env, e);
1701         } finally {
1702           currentThread.setContextClassLoader(cl);
1703         }
1704         bypass |= ctx.shouldBypass();
1705         if (ctx.shouldComplete()) {
1706           break;
1707         }
1708       }
1709     }
1710     return bypass ? hasNext : null;
1711   }
1712 
1713   /**
1714    * @param s the scanner
1715    * @param results the result set returned by the region server
1716    * @param limit the maximum number of results to return
1717    * @param hasMore
1718    * @return 'has more' indication to give to client
1719    * @exception IOException Exception
1720    */
1721   public boolean postScannerNext(final InternalScanner s,
1722       final List<Result> results, final int limit, boolean hasMore)
1723       throws IOException {
1724     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1725     for (RegionEnvironment env: coprocessors) {
1726       if (env.getInstance() instanceof RegionObserver) {
1727         ctx = ObserverContext.createAndPrepare(env, ctx);
1728         Thread currentThread = Thread.currentThread();
1729         ClassLoader cl = currentThread.getContextClassLoader();
1730         try {
1731           currentThread.setContextClassLoader(env.getClassLoader());
1732           hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s, results, limit,
1733             hasMore);
1734         } catch (Throwable e) {
1735           handleCoprocessorThrowable(env, e);
1736         } finally {
1737           currentThread.setContextClassLoader(cl);
1738         }
1739         if (ctx.shouldComplete()) {
1740           break;
1741         }
1742       }
1743     }
1744     return hasMore;
1745   }
1746 
1747   /**
1748    * This will be called by the scan flow when the current scanned row is being filtered out by the
1749    * filter.
1750    * @param s the scanner
1751    * @param currentRow The current rowkey which got filtered out
1752    * @param offset offset to rowkey
1753    * @param length length of rowkey
1754    * @return whether more rows are available for the scanner or not
1755    * @throws IOException
1756    */
1757   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1758       final int offset, final short length) throws IOException {
1759     boolean hasMore = true; // By default assume more rows there.
1760     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1761     for (RegionEnvironment env : coprocessors) {
1762       if (env.getInstance() instanceof RegionObserver) {
1763         ctx = ObserverContext.createAndPrepare(env, ctx);
1764         Thread currentThread = Thread.currentThread();
1765         ClassLoader cl = currentThread.getContextClassLoader();
1766         try {
1767           currentThread.setContextClassLoader(env.getClassLoader());
1768           hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow,
1769             offset, length, hasMore);
1770         } catch (Throwable e) {
1771           handleCoprocessorThrowable(env, e);
1772         } finally {
1773           currentThread.setContextClassLoader(cl);
1774         }
1775         if (ctx.shouldComplete()) {
1776           break;
1777         }
1778       }
1779     }
1780     return hasMore;
1781   }
1782   
1783   /**
1784    * @param s the scanner
1785    * @return true if default behavior should be bypassed, false otherwise
1786    * @exception IOException Exception
1787    */
1788   public boolean preScannerClose(final InternalScanner s) throws IOException {
1789     boolean bypass = false;
1790     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1791     for (RegionEnvironment env: coprocessors) {
1792       if (env.getInstance() instanceof RegionObserver) {
1793         ctx = ObserverContext.createAndPrepare(env, ctx);
1794         Thread currentThread = Thread.currentThread();
1795         ClassLoader cl = currentThread.getContextClassLoader();
1796         try {
1797           currentThread.setContextClassLoader(env.getClassLoader());
1798           ((RegionObserver)env.getInstance()).preScannerClose(ctx, s);
1799         } catch (Throwable e) {
1800           handleCoprocessorThrowable(env, e);
1801         } finally {
1802           currentThread.setContextClassLoader(cl);
1803         }
1804         bypass |= ctx.shouldBypass();
1805         if (ctx.shouldComplete()) {
1806           break;
1807         }
1808       }
1809     }
1810     return bypass;
1811   }
1812 
1813   /**
1814    * @param s the scanner
1815    * @exception IOException Exception
1816    */
1817   public void postScannerClose(final InternalScanner s) throws IOException {
1818     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1819     for (RegionEnvironment env: coprocessors) {
1820       if (env.getInstance() instanceof RegionObserver) {
1821         ctx = ObserverContext.createAndPrepare(env, ctx);
1822         Thread currentThread = Thread.currentThread();
1823         ClassLoader cl = currentThread.getContextClassLoader();
1824         try {
1825           currentThread.setContextClassLoader(env.getClassLoader());
1826           ((RegionObserver)env.getInstance()).postScannerClose(ctx, s);
1827         } catch (Throwable e) {
1828           handleCoprocessorThrowable(env, e);
1829         } finally {
1830           currentThread.setContextClassLoader(cl);
1831         }
1832         if (ctx.shouldComplete()) {
1833           break;
1834         }
1835       }
1836     }
1837   }
1838 
1839   /**
1840    * @param info
1841    * @param logKey
1842    * @param logEdit
1843    * @return true if default behavior should be bypassed, false otherwise
1844    * @throws IOException
1845    */
1846   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1847       final WALEdit logEdit) throws IOException {
1848     boolean bypass = false;
1849     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1850     for (RegionEnvironment env: coprocessors) {
1851       if (env.getInstance() instanceof RegionObserver) {
1852         ctx = ObserverContext.createAndPrepare(env, ctx);
1853         Thread currentThread = Thread.currentThread();
1854         ClassLoader cl = currentThread.getContextClassLoader();
1855         try {
1856           currentThread.setContextClassLoader(env.getClassLoader());
1857           ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit);
1858         } catch (Throwable e) {
1859           handleCoprocessorThrowable(env, e);
1860         } finally {
1861           currentThread.setContextClassLoader(cl);
1862         }
1863         bypass |= ctx.shouldBypass();
1864         if (ctx.shouldComplete()) {
1865           break;
1866         }
1867       }
1868     }
1869     return bypass;
1870   }
1871 
1872   /**
1873    * @param info
1874    * @param logKey
1875    * @param logEdit
1876    * @throws IOException
1877    */
1878   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1879       throws IOException {
1880     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1881     for (RegionEnvironment env: coprocessors) {
1882       if (env.getInstance() instanceof RegionObserver) {
1883         ctx = ObserverContext.createAndPrepare(env, ctx);
1884         Thread currentThread = Thread.currentThread();
1885         ClassLoader cl = currentThread.getContextClassLoader();
1886         try {
1887           currentThread.setContextClassLoader(env.getClassLoader());
1888           ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit);
1889         } catch (Throwable e) {
1890           handleCoprocessorThrowable(env, e);
1891         } finally {
1892           currentThread.setContextClassLoader(cl);
1893         }
1894         if (ctx.shouldComplete()) {
1895           break;
1896         }
1897       }
1898     }
1899   }
1900 
1901   /**
1902    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1903    * @return true if the default operation should be bypassed
1904    * @throws IOException
1905    */
1906   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1907     boolean bypass = false;
1908     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1909     for (RegionEnvironment env: coprocessors) {
1910       if (env.getInstance() instanceof RegionObserver) {
1911         ctx = ObserverContext.createAndPrepare(env, ctx);
1912         Thread currentThread = Thread.currentThread();
1913         ClassLoader cl = currentThread.getContextClassLoader();
1914         try {
1915           currentThread.setContextClassLoader(env.getClassLoader());
1916           ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths);
1917         } catch (Throwable e) {
1918           handleCoprocessorThrowable(env, e);
1919         } finally {
1920           currentThread.setContextClassLoader(cl);
1921         }
1922         bypass |= ctx.shouldBypass();
1923         if (ctx.shouldComplete()) {
1924           break;
1925         }
1926       }
1927     }
1928     return bypass;
1929   }
1930 
1931   /**
1932    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1933    * @param hasLoaded whether load was successful or not
1934    * @return the possibly modified value of hasLoaded
1935    * @throws IOException
1936    */
1937   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1938       boolean hasLoaded) throws IOException {
1939     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1940     for (RegionEnvironment env: coprocessors) {
1941       if (env.getInstance() instanceof RegionObserver) {
1942         ctx = ObserverContext.createAndPrepare(env, ctx);
1943         Thread currentThread = Thread.currentThread();
1944         ClassLoader cl = currentThread.getContextClassLoader();
1945         try {
1946           currentThread.setContextClassLoader(env.getClassLoader());
1947           hasLoaded = ((RegionObserver)env.getInstance()).postBulkLoadHFile(ctx, familyPaths,
1948             hasLoaded);
1949         } catch (Throwable e) {
1950           handleCoprocessorThrowable(env, e);
1951         } finally {
1952           currentThread.setContextClassLoader(cl);
1953         }
1954         if (ctx.shouldComplete()) {
1955           break;
1956         }
1957       }
1958     }
1959     return hasLoaded;
1960   }
1961 
1962   public void postStartRegionOperation(final Operation op) throws IOException {
1963     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1964     for (RegionEnvironment env : coprocessors) {
1965       if (env.getInstance() instanceof RegionObserver) {
1966         ctx = ObserverContext.createAndPrepare(env, ctx);
1967         Thread currentThread = Thread.currentThread();
1968         ClassLoader cl = currentThread.getContextClassLoader();
1969         try {
1970           currentThread.setContextClassLoader(env.getClassLoader());
1971           ((RegionObserver) env.getInstance()).postStartRegionOperation(ctx, op);
1972         } catch (Throwable e) {
1973           handleCoprocessorThrowable(env, e);
1974         } finally {
1975           currentThread.setContextClassLoader(cl);
1976         }
1977         if (ctx.shouldComplete()) {
1978           break;
1979         }
1980       }
1981     }
1982   }
1983 
1984   public void postCloseRegionOperation(final Operation op) throws IOException {
1985     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
1986     for (RegionEnvironment env : coprocessors) {
1987       if (env.getInstance() instanceof RegionObserver) {
1988         ctx = ObserverContext.createAndPrepare(env, ctx);
1989         Thread currentThread = Thread.currentThread();
1990         ClassLoader cl = currentThread.getContextClassLoader();
1991         try {
1992           currentThread.setContextClassLoader(env.getClassLoader());
1993           ((RegionObserver) env.getInstance()).postCloseRegionOperation(ctx, op);
1994         } catch (Throwable e) {
1995           handleCoprocessorThrowable(env, e);
1996         } finally {
1997           currentThread.setContextClassLoader(cl);
1998         }
1999         if (ctx.shouldComplete()) {
2000           break;
2001         }
2002       }
2003     }
2004   }
2005 
2006   /**
2007    * @param fs filesystem to read from
2008    * @param p path to the file
2009    * @param in {@link FSDataInputStreamWrapper}
2010    * @param size Full size of the file
2011    * @param cacheConf
2012    * @param r original reference file. This will be not null only when reading a split file.
2013    * @return a Reader instance to use instead of the base reader if overriding
2014    * default behavior, null otherwise
2015    * @throws IOException
2016    */
2017   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
2018       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
2019       final Reference r) throws IOException {
2020     StoreFile.Reader reader = null;
2021     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
2022     for (RegionEnvironment env : coprocessors) {
2023       if (env.getInstance() instanceof RegionObserver) {
2024         ctx = ObserverContext.createAndPrepare(env, ctx);
2025         Thread currentThread = Thread.currentThread();
2026         ClassLoader cl = currentThread.getContextClassLoader();
2027         try {
2028           currentThread.setContextClassLoader(env.getClassLoader());
2029           reader = ((RegionObserver) env.getInstance()).preStoreFileReaderOpen(ctx, fs, p, in,
2030             size, cacheConf, r, reader);
2031         } catch (Throwable e) {
2032           handleCoprocessorThrowable(env, e);
2033         } finally {
2034           currentThread.setContextClassLoader(cl);
2035         }
2036         if (ctx.shouldComplete()) {
2037           break;
2038         }
2039       }
2040     }
2041     return reader;
2042   }
2043 
2044   /**
2045    * @param fs filesystem to read from
2046    * @param p path to the file
2047    * @param in {@link FSDataInputStreamWrapper}
2048    * @param size Full size of the file
2049    * @param cacheConf
2050    * @param r original reference file. This will be not null only when reading a split file.
2051    * @param reader the base reader instance
2052    * @return The reader to use
2053    * @throws IOException
2054    */
2055   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
2056       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
2057       final Reference r, StoreFile.Reader reader) throws IOException {
2058     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
2059     for (RegionEnvironment env : coprocessors) {
2060       if (env.getInstance() instanceof RegionObserver) {
2061         ctx = ObserverContext.createAndPrepare(env, ctx);
2062         Thread currentThread = Thread.currentThread();
2063         ClassLoader cl = currentThread.getContextClassLoader();
2064         try {
2065           currentThread.setContextClassLoader(env.getClassLoader());
2066           reader = ((RegionObserver) env.getInstance()).postStoreFileReaderOpen(ctx, fs, p, in,
2067             size, cacheConf, r, reader);
2068         } catch (Throwable e) {
2069           handleCoprocessorThrowable(env, e);
2070         } finally {
2071           currentThread.setContextClassLoader(cl);
2072         }
2073         if (ctx.shouldComplete()) {
2074           break;
2075         }
2076       }
2077     }
2078     return reader;
2079   }
2080 
2081   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
2082       final Cell oldCell, Cell newCell) throws IOException {
2083     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
2084     for (RegionEnvironment env : coprocessors) {
2085       if (env.getInstance() instanceof RegionObserver) {
2086         ctx = ObserverContext.createAndPrepare(env, ctx);
2087         Thread currentThread = Thread.currentThread();
2088         ClassLoader cl = currentThread.getContextClassLoader();
2089         try {
2090           currentThread.setContextClassLoader(env.getClassLoader());
2091           newCell = ((RegionObserver) env.getInstance()).postMutationBeforeWAL(ctx, opType,
2092             mutation, oldCell, newCell);
2093         } catch (Throwable e) {
2094           handleCoprocessorThrowable(env, e);
2095         } finally {
2096           currentThread.setContextClassLoader(cl);
2097         }
2098         if (ctx.shouldComplete()) {
2099           break;
2100         }
2101       }
2102     }
2103     return newCell;
2104   }
2105 
2106   public Message preEndpointInvocation(final Service service, final String methodName,
2107       Message request) throws IOException {
2108     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
2109     for (RegionEnvironment env : coprocessors) {
2110       if (env.getInstance() instanceof EndpointObserver) {
2111         ctx = ObserverContext.createAndPrepare(env, ctx);
2112         Thread currentThread = Thread.currentThread();
2113         ClassLoader cl = currentThread.getContextClassLoader();
2114         try {
2115           currentThread.setContextClassLoader(env.getClassLoader());
2116           request = ((EndpointObserver) env.getInstance()).preEndpointInvocation(ctx, service,
2117             methodName, request);
2118         } catch (Throwable e) {
2119           handleCoprocessorThrowable(env, e);
2120         } finally {
2121           currentThread.setContextClassLoader(cl);
2122         }
2123         if (ctx.shouldComplete()) {
2124           break;
2125         }
2126       }
2127     }
2128     return request;
2129   }
2130 
2131   public void postEndpointInvocation(final Service service, final String methodName,
2132       final Message request, final Message.Builder responseBuilder) throws IOException {
2133     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
2134     for (RegionEnvironment env : coprocessors) {
2135       if (env.getInstance() instanceof EndpointObserver) {
2136         ctx = ObserverContext.createAndPrepare(env, ctx);
2137         Thread currentThread = Thread.currentThread();
2138         ClassLoader cl = currentThread.getContextClassLoader();
2139         try {
2140           currentThread.setContextClassLoader(env.getClassLoader());
2141           ((EndpointObserver) env.getInstance()).postEndpointInvocation(ctx, service,
2142             methodName, request, responseBuilder);
2143         } catch (Throwable e) {
2144           handleCoprocessorThrowable(env, e);
2145         } finally {
2146           currentThread.setContextClassLoader(cl);
2147         }
2148         if (ctx.shouldComplete()) {
2149           break;
2150         }
2151       }
2152     }
2153   }
2154 
2155 }