View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.UUID;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.regex.Matcher;
35  
36  import com.google.common.collect.ImmutableList;
37  import com.google.common.collect.Lists;
38  import com.google.protobuf.Message;
39  import com.google.protobuf.Service;
40  
41  import org.apache.commons.collections.map.AbstractReferenceMap;
42  import org.apache.commons.collections.map.ReferenceMap;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.Coprocessor;
53  import org.apache.hadoop.hbase.CoprocessorEnvironment;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
69  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
70  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
71  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
72  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
75  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
76  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
77  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
78  import org.apache.hadoop.hbase.io.Reference;
79  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
80  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
81  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
82  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
87  import org.apache.hadoop.hbase.util.Pair;
88  
89  /**
90   * Implements the coprocessor environment and runtime support for coprocessors
91   * loaded within a {@link HRegion}.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
94  @InterfaceStability.Evolving
95  public class RegionCoprocessorHost
96      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
97  
98    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
99    // Shared data maps keyed by coprocessor class name (see createEnvironment); keys are hard references, values weak references.
100   private static ReferenceMap sharedDataMap =
101       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
102 
103   /**
104    * Encapsulation of the environment of each coprocessor
105    */
106   static class RegionEnvironment extends CoprocessorHost.Environment
107       implements RegionCoprocessorEnvironment {
108 
109     private HRegion region;
110     private RegionServerServices rsServices;
111     ConcurrentMap<String, Object> sharedData;
112     private static final int LATENCY_BUFFER_SIZE = 100;
113     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
114         LATENCY_BUFFER_SIZE);
115     private final boolean useLegacyPre;
116     private final boolean useLegacyPost;
117 
118     /**
119      * Constructor
120      * @param impl the coprocessor instance
121      * @param priority chaining priority
122      */
123     public RegionEnvironment(final Coprocessor impl, final int priority,
124         final int seq, final Configuration conf, final HRegion region,
125         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
126       super(impl, priority, seq, conf);
127       this.region = region;
128       this.rsServices = services;
129       this.sharedData = sharedData;
130       // Pick which version of the WAL related events we'll call.
131       // This way we avoid calling the new version on older RegionObservers so
132       // we can maintain binary compatibility.
133       // See notes in javadoc for RegionObserver
134       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
135           HRegionInfo.class, WALKey.class, WALEdit.class);
136       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
137           HRegionInfo.class, WALKey.class, WALEdit.class);
138     }
139 
140     /** @return the region */
141     @Override
142     public HRegion getRegion() {
143       return region;
144     }
145 
146     /** @return reference to the region server services */
147     @Override
148     public RegionServerServices getRegionServerServices() {
149       return rsServices;
150     }
151 
152     public void shutdown() {
153       super.shutdown();
154     }
155 
156     @Override
157     public ConcurrentMap<String, Object> getSharedData() {
158       return sharedData;
159     }
160 
161     public void offerExecutionLatency(long latencyNanos) {
162       coprocessorTimeNanos.offer(latencyNanos);
163     }
164 
165     public Collection<Long> getExecutionLatenciesNanos() {
166       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
167       coprocessorTimeNanos.drainTo(latencies);
168       return latencies;
169     }
170 
171     @Override
172     public HRegionInfo getRegionInfo() {
173       return region.getRegionInfo();
174     }
175 
176   }
177 
178   static class TableCoprocessorAttribute {
179     private Path path;
180     private String className;
181     private int priority;
182     private Configuration conf;
183 
184     public TableCoprocessorAttribute(Path path, String className, int priority,
185         Configuration conf) {
186       this.path = path;
187       this.className = className;
188       this.priority = priority;
189       this.conf = conf;
190     }
191 
192     public Path getPath() {
193       return path;
194     }
195 
196     public String getClassName() {
197       return className;
198     }
199 
200     public int getPriority() {
201       return priority;
202     }
203 
204     public Configuration getConf() {
205       return conf;
206     }
207   }
208 
209   /** The region server services; handed to every RegionEnvironment this host creates */
210   RegionServerServices rsServices;
211   /** The region this host serves; coprocessor services are registered on it */
212   HRegion region;
213 
214   /**
215    * Constructor
216    * @param region the region
217    * @param rsServices interface to available region server functionality
218    * @param conf the configuration
219    */
220   public RegionCoprocessorHost(final HRegion region,
221       final RegionServerServices rsServices, final Configuration conf) {
222     super(rsServices);
223     this.conf = conf;
224     this.rsServices = rsServices;
225     this.region = region;
226     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
227 
228     // load system default cp's from configuration.
229     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
230 
231     // load system default cp's for user tables from configuration.
232     if (!region.getRegionInfo().getTable().isSystemTable()) {
233       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
234     }
235 
236     // load Coprocessor From HDFS
237     loadTableCoprocessors(conf);
238   }
239 
240   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
241       HTableDescriptor htd) {
242     List<TableCoprocessorAttribute> result = Lists.newArrayList();
243     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
244       String key = Bytes.toString(e.getKey().get()).trim();
245       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
246         String spec = Bytes.toString(e.getValue().get()).trim();
247         // found one
248         try {
249           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
250           if (matcher.matches()) {
251             // jar file path can be empty if the cp class can be loaded
252             // from class loader.
253             Path path = matcher.group(1).trim().isEmpty() ?
254                 null : new Path(matcher.group(1).trim());
255             String className = matcher.group(2).trim();
256             if (className.isEmpty()) {
257               LOG.error("Malformed table coprocessor specification: key=" +
258                 key + ", spec: " + spec);
259               continue;
260             }
261             int priority = matcher.group(3).trim().isEmpty() ?
262                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
263             String cfgSpec = null;
264             try {
265               cfgSpec = matcher.group(4);
266             } catch (IndexOutOfBoundsException ex) {
267               // ignore
268             }
269             Configuration ourConf;
270             if (cfgSpec != null) {
271               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
272               // do an explicit deep copy of the passed configuration
273               ourConf = new Configuration(false);
274               HBaseConfiguration.merge(ourConf, conf);
275               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
276               while (m.find()) {
277                 ourConf.set(m.group(1), m.group(2));
278               }
279             } else {
280               ourConf = conf;
281             }
282             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
283           } else {
284             LOG.error("Malformed table coprocessor specification: key=" + key +
285               ", spec: " + spec);
286           }
287         } catch (Exception ioe) {
288           LOG.error("Malformed table coprocessor specification: key=" + key +
289             ", spec: " + spec);
290         }
291       }
292     }
293     return result;
294   }
295 
296   /**
297    * Sanity check the table coprocessor attributes of the supplied schema. Will
298    * throw an exception if there is a problem.
299    * @param conf
300    * @param htd
301    * @throws IOException
302    */
303   public static void testTableCoprocessorAttrs(final Configuration conf,
304       final HTableDescriptor htd) throws IOException {
305     String pathPrefix = UUID.randomUUID().toString();
306     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
307       if (attr.getPriority() < 0) {
308         throw new IOException("Priority for coprocessor " + attr.getClassName() +
309           " cannot be less than 0");
310       }
311       ClassLoader old = Thread.currentThread().getContextClassLoader();
312       try {
313         ClassLoader cl;
314         if (attr.getPath() != null) {
315           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
316             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
317         } else {
318           cl = CoprocessorHost.class.getClassLoader();
319         }
320         Thread.currentThread().setContextClassLoader(cl);
321         cl.loadClass(attr.getClassName());
322       } catch (ClassNotFoundException e) {
323         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
324       } finally {
325         Thread.currentThread().setContextClassLoader(old);
326       }
327     }
328   }
329 
330   void loadTableCoprocessors(final Configuration conf) {
331     // scan the table attributes for coprocessor load specifications
332     // initialize the coprocessors
333     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
334     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
335         region.getTableDesc())) {
336       // Load encompasses classloading and coprocessor initialization
337       try {
338         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
339           attr.getConf());
340         configured.add(env);
341         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
342             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
343       } catch (Throwable t) {
344         // Coprocessor failed to load, do we abort on error?
345         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
346           abortServer(attr.getClassName(), t);
347         } else {
348           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
349         }
350       }
351     }
352     // add together to coprocessor set for COW efficiency
353     coprocessors.addAll(configured);
354   }
355 
356   @Override
357   public RegionEnvironment createEnvironment(Class<?> implClass,
358       Coprocessor instance, int priority, int seq, Configuration conf) {
359     // Check if it's an Endpoint.
360     // Due to current dynamic protocol design, Endpoint
361     // uses a different way to be registered and executed.
362     // It uses a visitor pattern to invoke registered Endpoint
363     // method.
364     for (Class<?> c : implClass.getInterfaces()) {
365       if (CoprocessorService.class.isAssignableFrom(c)) {
366         region.registerService( ((CoprocessorService)instance).getService() );
367       }
368     }
369     ConcurrentMap<String, Object> classData;
370     // make sure only one thread can add maps
371     synchronized (sharedDataMap) {
372       // as long as at least one RegionEnvironment holds on to its classData it will
373       // remain in this map
374       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
375       if (classData == null) {
376         classData = new ConcurrentHashMap<String, Object>();
377         sharedDataMap.put(implClass.getName(), classData);
378       }
379     }
380     return new RegionEnvironment(instance, priority, seq, conf, region,
381         rsServices, classData);
382   }
383 
384   /**
385    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
386    *
387    * For example, {@link
388    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
389    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
390    *
391    * See also
392    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
393    *    CoprocessorEnvironment, Throwable)}
394    * @param env The coprocessor that threw the exception.
395    * @param e The exception that was thrown.
396    */
397   private void handleCoprocessorThrowableNoRethrow(
398       final CoprocessorEnvironment env, final Throwable e) {
399     try {
400       handleCoprocessorThrowable(env,e);
401     } catch (IOException ioe) {
402       // We cannot throw exceptions from the caller hook, so ignore.
403       LOG.warn(
404         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
405         e + ". Ignoring.",e);
406     }
407   }
408 
409   /**
410    * Invoked before a region open.
411    *
412    * @throws IOException Signals that an I/O exception has occurred.
413    */
414   public void preOpen() throws IOException {
415     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
416       @Override
417       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
418           throws IOException {
419         oserver.preOpen(ctx);
420       }
421     });
422   }
423 
424   /**
425    * Invoked after a region open
426    */
427   public void postOpen() {
428     try {
429       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
430         @Override
431         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
432             throws IOException {
433           oserver.postOpen(ctx);
434         }
435       });
436     } catch (IOException e) {
437       LOG.warn(e);
438     }
439   }
440 
441   /**
442    * Invoked after log replay on region
443    */
444   public void postLogReplay() {
445     try {
446       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
447         @Override
448         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
449             throws IOException {
450           oserver.postLogReplay(ctx);
451         }
452       });
453     } catch (IOException e) {
454       LOG.warn(e);
455     }
456   }
457 
458   /**
459    * Invoked before a region is closed
460    * @param abortRequested true if the server is aborting
461    */
462   public void preClose(final boolean abortRequested) throws IOException {
463     execOperation(false, new RegionOperation() {
464       @Override
465       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
466           throws IOException {
467         oserver.preClose(ctx, abortRequested);
468       }
469     });
470   }
471 
472   /**
473    * Invoked after a region is closed
474    * @param abortRequested true if the server is aborting
475    */
476   public void postClose(final boolean abortRequested) {
477     try {
478       execOperation(false, new RegionOperation() {
479         @Override
480         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
481             throws IOException {
482           oserver.postClose(ctx, abortRequested);
483         }
484         public void postEnvCall(RegionEnvironment env) {
485           shutdown(env);
486         }
487       });
488     } catch (IOException e) {
489       LOG.warn(e);
490     }
491   }
492 
493   /**
494    * See
495    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
496    */
497   public InternalScanner preCompactScannerOpen(final Store store,
498       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
499       final CompactionRequest request) throws IOException {
500     return execOperationWithResult(null,
501         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
502       @Override
503       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
504           throws IOException {
505         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
506           earliestPutTs, getResult(), request));
507       }
508     });
509   }
510 
511   /**
512    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
513    * available candidates.
514    * @param store The store where compaction is being requested
515    * @param candidates The currently available store files
516    * @param request custom compaction request
517    * @return If {@code true}, skip the normal selection process and use the current list
518    * @throws IOException
519    */
520   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
521       final CompactionRequest request) throws IOException {
522     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
523       @Override
524       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
525           throws IOException {
526         oserver.preCompactSelection(ctx, store, candidates, request);
527       }
528     });
529   }
530 
531   /**
532    * Called after the {@link StoreFile}s to be compacted have been selected from the available
533    * candidates.
534    * @param store The store where compaction is being requested
535    * @param selected The store files selected to compact
536    * @param request custom compaction
537    */
538   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
539       final CompactionRequest request) {
540     try {
541       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
542         @Override
543         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
544             throws IOException {
545           oserver.postCompactSelection(ctx, store, selected, request);
546         }
547       });
548     } catch (IOException e) {
549       LOG.warn(e);
550     }
551   }
552 
553   /**
554    * Called prior to rewriting the store files selected for compaction
555    * @param store the store being compacted
556    * @param scanner the scanner used to read store data during compaction
557    * @param scanType type of Scan
558    * @param request the compaction that will be executed
559    * @throws IOException
560    */
561   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
562       final ScanType scanType, final CompactionRequest request) throws IOException {
563     return execOperationWithResult(false, scanner,
564         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
565       @Override
566       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
567           throws IOException {
568         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
569       }
570     });
571   }
572 
573   /**
574    * Called after the store compaction has completed.
575    * @param store the store being compacted
576    * @param resultFile the new store file written during compaction
577    * @param request the compaction that is being executed
578    * @throws IOException
579    */
580   public void postCompact(final Store store, final StoreFile resultFile,
581       final CompactionRequest request) throws IOException {
582     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
583       @Override
584       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
585           throws IOException {
586         oserver.postCompact(ctx, store, resultFile, request);
587       }
588     });
589   }
590 
591   /**
592    * Invoked before a memstore flush
593    * @throws IOException
594    */
595   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
596       throws IOException {
597     return execOperationWithResult(false, scanner,
598         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
599       @Override
600       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
601           throws IOException {
602         setResult(oserver.preFlush(ctx, store, getResult()));
603       }
604     });
605   }
606 
607   /**
608    * Invoked before a memstore flush
609    * @throws IOException
610    */
611   public void preFlush() throws IOException {
612     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
613       @Override
614       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
615           throws IOException {
616         oserver.preFlush(ctx);
617       }
618     });
619   }
620 
621   /**
622    * See
623    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
624    *    Store, KeyValueScanner, InternalScanner)}
625    */
626   public InternalScanner preFlushScannerOpen(final Store store,
627       final KeyValueScanner memstoreScanner) throws IOException {
628     return execOperationWithResult(null,
629         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
630       @Override
631       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
632           throws IOException {
633         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
634       }
635     });
636   }
637 
638   /**
639    * Invoked after a memstore flush
640    * @throws IOException
641    */
642   public void postFlush() throws IOException {
643     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
644       @Override
645       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
646           throws IOException {
647         oserver.postFlush(ctx);
648       }
649     });
650   }
651 
652   /**
653    * Invoked after a memstore flush
654    * @throws IOException
655    */
656   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
657     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
658       @Override
659       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
660           throws IOException {
661         oserver.postFlush(ctx, store, storeFile);
662       }
663     });
664   }
665 
666   /**
667    * Invoked just before a split
668    * @throws IOException
669    */
670   // TODO: Deprecate this
671   public void preSplit() throws IOException {
672     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
673       @Override
674       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
675           throws IOException {
676         oserver.preSplit(ctx);
677       }
678     });
679   }
680 
681   /**
682    * Invoked just before a split
683    * @throws IOException
684    */
685   public void preSplit(final byte[] splitRow) throws IOException {
686     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
687       @Override
688       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
689           throws IOException {
690         oserver.preSplit(ctx, splitRow);
691       }
692     });
693   }
694 
695   /**
696    * Invoked just after a split
697    * @param l the new left-hand daughter region
698    * @param r the new right-hand daughter region
699    * @throws IOException
700    */
701   public void postSplit(final HRegion l, final HRegion r) throws IOException {
702     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
703       @Override
704       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
705           throws IOException {
706         oserver.postSplit(ctx, l, r);
707       }
708     });
709   }
710 
711   public boolean preSplitBeforePONR(final byte[] splitKey,
712       final List<Mutation> metaEntries) throws IOException {
713     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
714       @Override
715       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
716           throws IOException {
717         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
718       }
719     });
720   }
721 
722   public void preSplitAfterPONR() throws IOException {
723     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
724       @Override
725       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
726           throws IOException {
727         oserver.preSplitAfterPONR(ctx);
728       }
729     });
730   }
731 
732   /**
733    * Invoked just before the rollback of a failed split is started
734    * @throws IOException
735    */
736   public void preRollBackSplit() throws IOException {
737     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
738       @Override
739       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
740           throws IOException {
741         oserver.preRollBackSplit(ctx);
742       }
743     });
744   }
745 
746   /**
747    * Invoked just after the rollback of a failed split is done
748    * @throws IOException
749    */
750   public void postRollBackSplit() throws IOException {
751     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
752       @Override
753       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
754           throws IOException {
755         oserver.postRollBackSplit(ctx);
756       }
757     });
758   }
759 
760   /**
761    * Invoked after a split is completed irrespective of a failure or success.
762    * @throws IOException
763    */
764   public void postCompleteSplit() throws IOException {
765     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
766       @Override
767       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
768           throws IOException {
769         oserver.postCompleteSplit(ctx);
770       }
771     });
772   }
773 
774   // RegionObserver support
775 
776   /**
777    * @param row the row key
778    * @param family the family
779    * @param result the result set from the region
780    * @return true if default processing should be bypassed
781    * @exception IOException Exception
782    */
783   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
784       final Result result) throws IOException {
785     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
786       @Override
787       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
788           throws IOException {
789         oserver.preGetClosestRowBefore(ctx, row, family, result);
790       }
791     });
792   }
793 
794   /**
795    * @param row the row key
796    * @param family the family
797    * @param result the result set from the region
798    * @exception IOException Exception
799    */
800   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
801       final Result result) throws IOException {
802     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
803       @Override
804       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
805           throws IOException {
806         oserver.postGetClosestRowBefore(ctx, row, family, result);
807       }
808     });
809   }
810 
811   /**
812    * @param get the Get request
813    * @return true if default processing should be bypassed
814    * @exception IOException Exception
815    */
816   public boolean preGet(final Get get, final List<Cell> results)
817       throws IOException {
818     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
819       @Override
820       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
821           throws IOException {
822         oserver.preGetOp(ctx, get, results);
823       }
824     });
825   }
826 
827   /**
828    * @param get the Get request
829    * @param results the result sett
830    * @exception IOException Exception
831    */
832   public void postGet(final Get get, final List<Cell> results)
833       throws IOException {
834     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
835       @Override
836       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
837           throws IOException {
838         oserver.postGetOp(ctx, get, results);
839       }
840     });
841   }
842 
843   /**
844    * @param get the Get request
845    * @return true or false to return to client if bypassing normal operation,
846    * or null otherwise
847    * @exception IOException Exception
848    */
849   public Boolean preExists(final Get get) throws IOException {
850     return execOperationWithResult(true, false,
851         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
852       @Override
853       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
854           throws IOException {
855         setResult(oserver.preExists(ctx, get, getResult()));
856       }
857     });
858   }
859 
860   /**
861    * @param get the Get request
862    * @param exists the result returned by the region server
863    * @return the result to return to the client
864    * @exception IOException Exception
865    */
866   public boolean postExists(final Get get, boolean exists)
867       throws IOException {
868     return execOperationWithResult(exists,
869         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
870       @Override
871       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
872           throws IOException {
873         setResult(oserver.postExists(ctx, get, getResult()));
874       }
875     });
876   }
877 
878   /**
879    * @param put The Put object
880    * @param edit The WALEdit object.
881    * @param durability The durability used
882    * @return true if default processing should be bypassed
883    * @exception IOException Exception
884    */
885   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
886       throws IOException {
887     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
888       @Override
889       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
890           throws IOException {
891         oserver.prePut(ctx, put, edit, durability);
892       }
893     });
894   }
895 
896   /**
897    * @param mutation - the current mutation
898    * @param kv - the current cell
899    * @param byteNow - current timestamp in bytes
900    * @param get - the get that could be used
901    * Note that the get only does not specify the family and qualifier that should be used
902    * @return true if default processing should be bypassed
903    * @exception IOException
904    *              Exception
905    */
906   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
907       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
908     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
909       @Override
910       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
911           throws IOException {
912         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
913       }
914     });
915   }
916 
917   /**
918    * @param put The Put object
919    * @param edit The WALEdit object.
920    * @param durability The durability used
921    * @exception IOException Exception
922    */
923   public void postPut(final Put put, final WALEdit edit, final Durability durability)
924       throws IOException {
925     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
926       @Override
927       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
928           throws IOException {
929         oserver.postPut(ctx, put, edit, durability);
930       }
931     });
932   }
933 
934   /**
935    * @param delete The Delete object
936    * @param edit The WALEdit object.
937    * @param durability The durability used
938    * @return true if default processing should be bypassed
939    * @exception IOException Exception
940    */
941   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
942       throws IOException {
943     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
944       @Override
945       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
946           throws IOException {
947         oserver.preDelete(ctx, delete, edit, durability);
948       }
949     });
950   }
951 
952   /**
953    * @param delete The Delete object
954    * @param edit The WALEdit object.
955    * @param durability The durability used
956    * @exception IOException Exception
957    */
958   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
959       throws IOException {
960     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
961       @Override
962       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
963           throws IOException {
964         oserver.postDelete(ctx, delete, edit, durability);
965       }
966     });
967   }
968 
969   /**
970    * @param miniBatchOp
971    * @return true if default processing should be bypassed
972    * @throws IOException
973    */
974   public boolean preBatchMutate(
975       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
976     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
977       @Override
978       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
979           throws IOException {
980         oserver.preBatchMutate(ctx, miniBatchOp);
981       }
982     });
983   }
984 
985   /**
986    * @param miniBatchOp
987    * @throws IOException
988    */
989   public void postBatchMutate(
990       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
991     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
992       @Override
993       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
994           throws IOException {
995         oserver.postBatchMutate(ctx, miniBatchOp);
996       }
997     });
998   }
999 
1000   public void postBatchMutateIndispensably(
1001       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1002       throws IOException {
1003     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1004       @Override
1005       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1006           throws IOException {
1007         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1008       }
1009     });
1010   }
1011 
1012   /**
1013    * @param row row to check
1014    * @param family column family
1015    * @param qualifier column qualifier
1016    * @param compareOp the comparison operation
1017    * @param comparator the comparator
1018    * @param put data to put if check succeeds
1019    * @return true or false to return to client if default processing should
1020    * be bypassed, or null otherwise
1021    * @throws IOException e
1022    */
1023   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1024       final byte [] qualifier, final CompareOp compareOp,
1025       final ByteArrayComparable comparator, final Put put)
1026       throws IOException {
1027     return execOperationWithResult(true, false,
1028         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1029       @Override
1030       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1031           throws IOException {
1032         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1033           compareOp, comparator, put, getResult()));
1034       }
1035     });
1036   }
1037 
1038   /**
1039    * @param row row to check
1040    * @param family column family
1041    * @param qualifier column qualifier
1042    * @param compareOp the comparison operation
1043    * @param comparator the comparator
1044    * @param put data to put if check succeeds
1045    * @return true or false to return to client if default processing should
1046    * be bypassed, or null otherwise
1047    * @throws IOException e
1048    */
1049   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1050       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1051       final Put put) throws IOException {
1052     return execOperationWithResult(true, false,
1053         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1054       @Override
1055       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1056           throws IOException {
1057         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1058           compareOp, comparator, put, getResult()));
1059       }
1060     });
1061   }
1062 
1063   /**
1064    * @param row row to check
1065    * @param family column family
1066    * @param qualifier column qualifier
1067    * @param compareOp the comparison operation
1068    * @param comparator the comparator
1069    * @param put data to put if check succeeds
1070    * @throws IOException e
1071    */
1072   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1073       final byte [] qualifier, final CompareOp compareOp,
1074       final ByteArrayComparable comparator, final Put put,
1075       boolean result) throws IOException {
1076     return execOperationWithResult(result,
1077         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1078       @Override
1079       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1080           throws IOException {
1081         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1082           compareOp, comparator, put, getResult()));
1083       }
1084     });
1085   }
1086 
1087   /**
1088    * @param row row to check
1089    * @param family column family
1090    * @param qualifier column qualifier
1091    * @param compareOp the comparison operation
1092    * @param comparator the comparator
1093    * @param delete delete to commit if check succeeds
1094    * @return true or false to return to client if default processing should
1095    * be bypassed, or null otherwise
1096    * @throws IOException e
1097    */
1098   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1099       final byte [] qualifier, final CompareOp compareOp,
1100       final ByteArrayComparable comparator, final Delete delete)
1101       throws IOException {
1102     return execOperationWithResult(true, false,
1103         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1104       @Override
1105       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1106           throws IOException {
1107         setResult(oserver.preCheckAndDelete(ctx, row, family,
1108             qualifier, compareOp, comparator, delete, getResult()));
1109       }
1110     });
1111   }
1112 
1113   /**
1114    * @param row row to check
1115    * @param family column family
1116    * @param qualifier column qualifier
1117    * @param compareOp the comparison operation
1118    * @param comparator the comparator
1119    * @param delete delete to commit if check succeeds
1120    * @return true or false to return to client if default processing should
1121    * be bypassed, or null otherwise
1122    * @throws IOException e
1123    */
1124   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1125       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1126       final Delete delete) throws IOException {
1127     return execOperationWithResult(true, false,
1128         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1129       @Override
1130       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1131           throws IOException {
1132         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1133               family, qualifier, compareOp, comparator, delete, getResult()));
1134       }
1135     });
1136   }
1137 
1138   /**
1139    * @param row row to check
1140    * @param family column family
1141    * @param qualifier column qualifier
1142    * @param compareOp the comparison operation
1143    * @param comparator the comparator
1144    * @param delete delete to commit if check succeeds
1145    * @throws IOException e
1146    */
1147   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1148       final byte [] qualifier, final CompareOp compareOp,
1149       final ByteArrayComparable comparator, final Delete delete,
1150       boolean result) throws IOException {
1151     return execOperationWithResult(result,
1152         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1153       @Override
1154       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1155           throws IOException {
1156         setResult(oserver.postCheckAndDelete(ctx, row, family,
1157             qualifier, compareOp, comparator, delete, getResult()));
1158       }
1159     });
1160   }
1161 
1162   /**
1163    * @param append append object
1164    * @return result to return to client if default operation should be
1165    * bypassed, null otherwise
1166    * @throws IOException if an error occurred on the coprocessor
1167    */
1168   public Result preAppend(final Append append) throws IOException {
1169     return execOperationWithResult(true, null,
1170         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1171       @Override
1172       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1173           throws IOException {
1174         setResult(oserver.preAppend(ctx, append));
1175       }
1176     });
1177   }
1178 
1179   /**
1180    * @param append append object
1181    * @return result to return to client if default operation should be
1182    * bypassed, null otherwise
1183    * @throws IOException if an error occurred on the coprocessor
1184    */
1185   public Result preAppendAfterRowLock(final Append append) throws IOException {
1186     return execOperationWithResult(true, null,
1187         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1188       @Override
1189       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1190           throws IOException {
1191         setResult(oserver.preAppendAfterRowLock(ctx, append));
1192       }
1193     });
1194   }
1195 
1196   /**
1197    * @param increment increment object
1198    * @return result to return to client if default operation should be
1199    * bypassed, null otherwise
1200    * @throws IOException if an error occurred on the coprocessor
1201    */
1202   public Result preIncrement(final Increment increment) throws IOException {
1203     return execOperationWithResult(true, null,
1204         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1205       @Override
1206       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1207           throws IOException {
1208         setResult(oserver.preIncrement(ctx, increment));
1209       }
1210     });
1211   }
1212 
1213   /**
1214    * @param increment increment object
1215    * @return result to return to client if default operation should be
1216    * bypassed, null otherwise
1217    * @throws IOException if an error occurred on the coprocessor
1218    */
1219   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1220     return execOperationWithResult(true, null,
1221         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1222       @Override
1223       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1224           throws IOException {
1225         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1226       }
1227     });
1228   }
1229 
1230   /**
1231    * @param append Append object
1232    * @param result the result returned by the append
1233    * @throws IOException if an error occurred on the coprocessor
1234    */
1235   public void postAppend(final Append append, final Result result) throws IOException {
1236     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1237       @Override
1238       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1239           throws IOException {
1240         oserver.postAppend(ctx, append, result);
1241       }
1242     });
1243   }
1244 
1245   /**
1246    * @param increment increment object
1247    * @param result the result returned by postIncrement
1248    * @throws IOException if an error occurred on the coprocessor
1249    */
1250   public Result postIncrement(final Increment increment, Result result) throws IOException {
1251     return execOperationWithResult(result,
1252         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1253       @Override
1254       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1255           throws IOException {
1256         setResult(oserver.postIncrement(ctx, increment, getResult()));
1257       }
1258     });
1259   }
1260 
1261   /**
1262    * @param scan the Scan specification
1263    * @return scanner id to return to client if default operation should be
1264    * bypassed, false otherwise
1265    * @exception IOException Exception
1266    */
1267   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1268     return execOperationWithResult(true, null,
1269         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1270       @Override
1271       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1272           throws IOException {
1273         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1274       }
1275     });
1276   }
1277 
1278   /**
1279    * See
1280    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1281    *    Store, Scan, NavigableSet, KeyValueScanner)}
1282    */
1283   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1284       final NavigableSet<byte[]> targetCols) throws IOException {
1285     return execOperationWithResult(null,
1286         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1287       @Override
1288       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1289           throws IOException {
1290         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1291       }
1292     });
1293   }
1294 
1295   /**
1296    * @param scan the Scan specification
1297    * @param s the scanner
1298    * @return the scanner instance to use
1299    * @exception IOException Exception
1300    */
1301   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1302     return execOperationWithResult(s,
1303         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1304       @Override
1305       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1306           throws IOException {
1307         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1308       }
1309     });
1310   }
1311 
1312   /**
1313    * @param s the scanner
1314    * @param results the result set returned by the region server
1315    * @param limit the maximum number of results to return
1316    * @return 'has next' indication to client if bypassing default behavior, or
1317    * null otherwise
1318    * @exception IOException Exception
1319    */
1320   public Boolean preScannerNext(final InternalScanner s,
1321       final List<Result> results, final int limit) throws IOException {
1322     return execOperationWithResult(true, false,
1323         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1324       @Override
1325       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1326           throws IOException {
1327         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1328       }
1329     });
1330   }
1331 
1332   /**
1333    * @param s the scanner
1334    * @param results the result set returned by the region server
1335    * @param limit the maximum number of results to return
1336    * @param hasMore
1337    * @return 'has more' indication to give to client
1338    * @exception IOException Exception
1339    */
1340   public boolean postScannerNext(final InternalScanner s,
1341       final List<Result> results, final int limit, boolean hasMore)
1342       throws IOException {
1343     return execOperationWithResult(hasMore,
1344         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1345       @Override
1346       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1347           throws IOException {
1348         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1349       }
1350     });
1351   }
1352 
1353   /**
1354    * This will be called by the scan flow when the current scanned row is being filtered out by the
1355    * filter.
1356    * @param s the scanner
1357    * @param currentRow The current rowkey which got filtered out
1358    * @param offset offset to rowkey
1359    * @param length length of rowkey
1360    * @return whether more rows are available for the scanner or not
1361    * @throws IOException
1362    */
1363   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1364       final int offset, final short length) throws IOException {
1365     return execOperationWithResult(true,
1366         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1367       @Override
1368       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1369           throws IOException {
1370         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1371       }
1372     });
1373   }
1374 
1375   /**
1376    * @param s the scanner
1377    * @return true if default behavior should be bypassed, false otherwise
1378    * @exception IOException Exception
1379    */
1380   public boolean preScannerClose(final InternalScanner s) throws IOException {
1381     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1382       @Override
1383       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1384           throws IOException {
1385         oserver.preScannerClose(ctx, s);
1386       }
1387     });
1388   }
1389 
1390   /**
1391    * @exception IOException Exception
1392    */
1393   public void postScannerClose(final InternalScanner s) throws IOException {
1394     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1395       @Override
1396       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1397           throws IOException {
1398         oserver.postScannerClose(ctx, s);
1399       }
1400     });
1401   }
1402 
1403   /**
1404    * @param info
1405    * @param logKey
1406    * @param logEdit
1407    * @return true if default behavior should be bypassed, false otherwise
1408    * @throws IOException
1409    */
1410   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1411       final WALEdit logEdit) throws IOException {
1412     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1413       @Override
1414       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1415           throws IOException {
1416         // Once we don't need to support the legacy call, replace RegionOperation with a version
1417         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1418         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1419         if (env.useLegacyPre) {
1420           if (logKey instanceof HLogKey) {
1421             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1422           } else {
1423             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1424           }
1425         } else {
1426           oserver.preWALRestore(ctx, info, logKey, logEdit);
1427         }
1428       }
1429     });
1430   }
1431 
1432   /**
1433    * @return true if default behavior should be bypassed, false otherwise
1434    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1435    */
1436   @Deprecated
1437   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1438       final WALEdit logEdit) throws IOException {
1439     return preWALRestore(info, (WALKey)logKey, logEdit);
1440   }
1441 
1442   /**
1443    * @param info
1444    * @param logKey
1445    * @param logEdit
1446    * @throws IOException
1447    */
1448   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1449       throws IOException {
1450     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1451       @Override
1452       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1453           throws IOException {
1454         // Once we don't need to support the legacy call, replace RegionOperation with a version
1455         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1456         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1457         if (env.useLegacyPost) {
1458           if (logKey instanceof HLogKey) {
1459             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1460           } else {
1461             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1462           }
1463         } else {
1464           oserver.postWALRestore(ctx, info, logKey, logEdit);
1465         }
1466       }
1467     });
1468   }
1469 
1470   /**
1471    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1472    */
1473   @Deprecated
1474   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1475       throws IOException {
1476     postWALRestore(info, (WALKey)logKey, logEdit);
1477   }
1478 
1479   /**
1480    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1481    * @return true if the default operation should be bypassed
1482    * @throws IOException
1483    */
1484   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1485     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1486       @Override
1487       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1488           throws IOException {
1489         oserver.preBulkLoadHFile(ctx, familyPaths);
1490       }
1491     });
1492   }
1493 
1494   /**
1495    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1496    * @param hasLoaded whether load was successful or not
1497    * @return the possibly modified value of hasLoaded
1498    * @throws IOException
1499    */
1500   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1501       boolean hasLoaded) throws IOException {
1502     return execOperationWithResult(hasLoaded,
1503         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1504       @Override
1505       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1506           throws IOException {
1507         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1508       }
1509     });
1510   }
1511 
1512   public void postStartRegionOperation(final Operation op) throws IOException {
1513     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1514       @Override
1515       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1516           throws IOException {
1517         oserver.postStartRegionOperation(ctx, op);
1518       }
1519     });
1520   }
1521 
1522   public void postCloseRegionOperation(final Operation op) throws IOException {
1523     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1524       @Override
1525       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1526           throws IOException {
1527         oserver.postCloseRegionOperation(ctx, op);
1528       }
1529     });
1530   }
1531 
1532   /**
1533    * @param fs fileystem to read from
1534    * @param p path to the file
1535    * @param in {@link FSDataInputStreamWrapper}
1536    * @param size Full size of the file
1537    * @param cacheConf
1538    * @param r original reference file. This will be not null only when reading a split file.
1539    * @return a Reader instance to use instead of the base reader if overriding
1540    * default behavior, null otherwise
1541    * @throws IOException
1542    */
1543   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1544       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1545       final Reference r) throws IOException {
1546     return execOperationWithResult(null,
1547         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1548       @Override
1549       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1550           throws IOException {
1551         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1552       }
1553     });
1554   }
1555 
1556   /**
1557    * @param fs fileystem to read from
1558    * @param p path to the file
1559    * @param in {@link FSDataInputStreamWrapper}
1560    * @param size Full size of the file
1561    * @param cacheConf
1562    * @param r original reference file. This will be not null only when reading a split file.
1563    * @param reader the base reader instance
1564    * @return The reader to use
1565    * @throws IOException
1566    */
1567   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1568       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1569       final Reference r, final StoreFile.Reader reader) throws IOException {
1570     return execOperationWithResult(reader,
1571         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1572       @Override
1573       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1574           throws IOException {
1575         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1576       }
1577     });
1578   }
1579 
1580   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1581       final Cell oldCell, Cell newCell) throws IOException {
1582     return execOperationWithResult(newCell,
1583         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1584       @Override
1585       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1586           throws IOException {
1587         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1588       }
1589     });
1590   }
1591 
1592   public Message preEndpointInvocation(final Service service, final String methodName,
1593       Message request) throws IOException {
1594     return execOperationWithResult(request,
1595         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1596       @Override
1597       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1598           throws IOException {
1599         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1600       }
1601     });
1602   }
1603 
1604   public void postEndpointInvocation(final Service service, final String methodName,
1605       final Message request, final Message.Builder responseBuilder) throws IOException {
1606     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1607       @Override
1608       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1609           throws IOException {
1610         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1611       }
1612     });
1613   }
1614 
1615   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1616     return execOperationWithResult(tracker,
1617         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1618       @Override
1619       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1620           throws IOException {
1621         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1622       }
1623     });
1624   }
1625 
1626   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1627     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1628     for (RegionEnvironment env : coprocessors) {
1629       DescriptiveStatistics ds = new DescriptiveStatistics();
1630       if (env.getInstance() instanceof RegionObserver) {
1631         for (Long time : env.getExecutionLatenciesNanos()) {
1632           ds.addValue(time);
1633         }
1634         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1635         if (ds.getN() == 0) {
1636           ds.addValue(0);
1637         }
1638         results.put(env.getInstance().getClass().getSimpleName(), ds);
1639       }
1640     }
1641     return results;
1642   }
1643 
1644   private static abstract class CoprocessorOperation
1645       extends ObserverContext<RegionCoprocessorEnvironment> {
1646     public abstract void call(Coprocessor observer,
1647         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1648     public abstract boolean hasCall(Coprocessor observer);
1649     public void postEnvCall(RegionEnvironment env) { }
1650   }
1651 
1652   private static abstract class RegionOperation extends CoprocessorOperation {
1653     public abstract void call(RegionObserver observer,
1654         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1655 
1656     public boolean hasCall(Coprocessor observer) {
1657       return observer instanceof RegionObserver;
1658     }
1659 
1660     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1661         throws IOException {
1662       call((RegionObserver)observer, ctx);
1663     }
1664   }
1665 
1666   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1667     private T result = null;
1668     public void setResult(final T result) { this.result = result; }
1669     public T getResult() { return this.result; }
1670   }
1671 
1672   private static abstract class EndpointOperation extends CoprocessorOperation {
1673     public abstract void call(EndpointObserver observer,
1674         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1675 
1676     public boolean hasCall(Coprocessor observer) {
1677       return observer instanceof EndpointObserver;
1678     }
1679 
1680     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1681         throws IOException {
1682       call((EndpointObserver)observer, ctx);
1683     }
1684   }
1685 
1686   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1687     private T result = null;
1688     public void setResult(final T result) { this.result = result; }
1689     public T getResult() { return this.result; }
1690   }
1691 
1692   private boolean execOperation(final CoprocessorOperation ctx)
1693       throws IOException {
1694     return execOperation(true, ctx);
1695   }
1696 
1697   private <T> T execOperationWithResult(final T defaultValue,
1698       final RegionOperationWithResult<T> ctx) throws IOException {
1699     if (ctx == null) return defaultValue;
1700     ctx.setResult(defaultValue);
1701     execOperation(true, ctx);
1702     return ctx.getResult();
1703   }
1704 
1705   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1706       final RegionOperationWithResult<T> ctx) throws IOException {
1707     boolean bypass = false;
1708     T result = defaultValue;
1709     if (ctx != null) {
1710       ctx.setResult(defaultValue);
1711       bypass = execOperation(true, ctx);
1712       result = ctx.getResult();
1713     }
1714     return bypass == ifBypass ? result : null;
1715   }
1716 
1717   private <T> T execOperationWithResult(final T defaultValue,
1718       final EndpointOperationWithResult<T> ctx) throws IOException {
1719     if (ctx == null) return defaultValue;
1720     ctx.setResult(defaultValue);
1721     execOperation(true, ctx);
1722     return ctx.getResult();
1723   }
1724 
1725   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1726       throws IOException {
1727     boolean bypass = false;
1728     for (RegionEnvironment env: coprocessors) {
1729       Coprocessor observer = env.getInstance();
1730       if (ctx.hasCall(observer)) {
1731         long startTime = System.nanoTime();
1732         ctx.prepare(env);
1733         Thread currentThread = Thread.currentThread();
1734         ClassLoader cl = currentThread.getContextClassLoader();
1735         try {
1736           currentThread.setContextClassLoader(env.getClassLoader());
1737           ctx.call(observer, ctx);
1738         } catch (Throwable e) {
1739           handleCoprocessorThrowable(env, e);
1740         } finally {
1741           currentThread.setContextClassLoader(cl);
1742         }
1743         env.offerExecutionLatency(System.nanoTime() - startTime);
1744         bypass |= ctx.shouldBypass();
1745         if (earlyExit && ctx.shouldComplete()) {
1746           break;
1747         }
1748       }
1749 
1750       ctx.postEnvCall(env);
1751     }
1752     return bypass;
1753   }
1754 }