View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.UUID;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.regex.Matcher;
35  
36  import com.google.common.collect.ImmutableList;
37  import com.google.common.collect.Lists;
38  import com.google.protobuf.Message;
39  import com.google.protobuf.Service;
40  
41  import org.apache.commons.collections.map.AbstractReferenceMap;
42  import org.apache.commons.collections.map.ReferenceMap;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.Coprocessor;
53  import org.apache.hadoop.hbase.CoprocessorEnvironment;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
69  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
70  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
71  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
72  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
75  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
76  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
77  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
78  import org.apache.hadoop.hbase.io.Reference;
79  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
80  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
81  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
82  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
87  import org.apache.hadoop.hbase.util.Pair;
88  
89  /**
90   * Implements the coprocessor environment and runtime support for coprocessors
91   * loaded within a {@link HRegion}.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
94  @InterfaceStability.Evolving
95  public class RegionCoprocessorHost
96      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
97  
98    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
99    // The shared data map
100   private static ReferenceMap sharedDataMap =
101       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
102 
103   /**
104    * Encapsulation of the environment of each coprocessor
105    */
106   static class RegionEnvironment extends CoprocessorHost.Environment
107       implements RegionCoprocessorEnvironment {
108 
109     private HRegion region;
110     private RegionServerServices rsServices;
111     ConcurrentMap<String, Object> sharedData;
112     private static final int LATENCY_BUFFER_SIZE = 100;
113     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
114         LATENCY_BUFFER_SIZE);
115     private final boolean useLegacyPre;
116     private final boolean useLegacyPost;
117 
118     /**
119      * Constructor
120      * @param impl the coprocessor instance
121      * @param priority chaining priority
122      */
123     public RegionEnvironment(final Coprocessor impl, final int priority,
124         final int seq, final Configuration conf, final HRegion region,
125         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
126       super(impl, priority, seq, conf);
127       this.region = region;
128       this.rsServices = services;
129       this.sharedData = sharedData;
130       // Pick which version of the WAL related events we'll call.
131       // This way we avoid calling the new version on older RegionObservers so
132       // we can maintain binary compatibility.
133       // See notes in javadoc for RegionObserver
134       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
135           HRegionInfo.class, WALKey.class, WALEdit.class);
136       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
137           HRegionInfo.class, WALKey.class, WALEdit.class);
138     }
139 
140     /** @return the region */
141     @Override
142     public HRegion getRegion() {
143       return region;
144     }
145 
146     /** @return reference to the region server services */
147     @Override
148     public RegionServerServices getRegionServerServices() {
149       return rsServices;
150     }
151 
152     public void shutdown() {
153       super.shutdown();
154     }
155 
156     @Override
157     public ConcurrentMap<String, Object> getSharedData() {
158       return sharedData;
159     }
160 
161     public void offerExecutionLatency(long latencyNanos) {
162       coprocessorTimeNanos.offer(latencyNanos);
163     }
164 
165     public Collection<Long> getExecutionLatenciesNanos() {
166       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
167       coprocessorTimeNanos.drainTo(latencies);
168       return latencies;
169     }
170 
171   }
172 
173   static class TableCoprocessorAttribute {
174     private Path path;
175     private String className;
176     private int priority;
177     private Configuration conf;
178 
179     public TableCoprocessorAttribute(Path path, String className, int priority,
180         Configuration conf) {
181       this.path = path;
182       this.className = className;
183       this.priority = priority;
184       this.conf = conf;
185     }
186 
187     public Path getPath() {
188       return path;
189     }
190 
191     public String getClassName() {
192       return className;
193     }
194 
195     public int getPriority() {
196       return priority;
197     }
198 
199     public Configuration getConf() {
200       return conf;
201     }
202   }
203 
204   /** The region server services */
205   RegionServerServices rsServices;
206   /** The region */
207   HRegion region;
208 
209   /**
210    * Constructor
211    * @param region the region
212    * @param rsServices interface to available region server functionality
213    * @param conf the configuration
214    */
215   public RegionCoprocessorHost(final HRegion region,
216       final RegionServerServices rsServices, final Configuration conf) {
217     super(rsServices);
218     this.conf = conf;
219     this.rsServices = rsServices;
220     this.region = region;
221     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
222 
223     // load system default cp's from configuration.
224     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
225 
226     // load system default cp's for user tables from configuration.
227     if (!region.getRegionInfo().getTable().isSystemTable()) {
228       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
229     }
230 
231     // load Coprocessor From HDFS
232     loadTableCoprocessors(conf);
233   }
234 
235   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
236       HTableDescriptor htd) {
237     List<TableCoprocessorAttribute> result = Lists.newArrayList();
238     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
239       String key = Bytes.toString(e.getKey().get()).trim();
240       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
241         String spec = Bytes.toString(e.getValue().get()).trim();
242         // found one
243         try {
244           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
245           if (matcher.matches()) {
246             // jar file path can be empty if the cp class can be loaded
247             // from class loader.
248             Path path = matcher.group(1).trim().isEmpty() ?
249                 null : new Path(matcher.group(1).trim());
250             String className = matcher.group(2).trim();
251             if (className.isEmpty()) {
252               LOG.error("Malformed table coprocessor specification: key=" +
253                 key + ", spec: " + spec);
254               continue;
255             }
256             int priority = matcher.group(3).trim().isEmpty() ?
257                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
258             String cfgSpec = null;
259             try {
260               cfgSpec = matcher.group(4);
261             } catch (IndexOutOfBoundsException ex) {
262               // ignore
263             }
264             Configuration ourConf;
265             if (cfgSpec != null) {
266               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
267               // do an explicit deep copy of the passed configuration
268               ourConf = new Configuration(false);
269               HBaseConfiguration.merge(ourConf, conf);
270               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
271               while (m.find()) {
272                 ourConf.set(m.group(1), m.group(2));
273               }
274             } else {
275               ourConf = conf;
276             }
277             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
278           } else {
279             LOG.error("Malformed table coprocessor specification: key=" + key +
280               ", spec: " + spec);
281           }
282         } catch (Exception ioe) {
283           LOG.error("Malformed table coprocessor specification: key=" + key +
284             ", spec: " + spec);
285         }
286       }
287     }
288     return result;
289   }
290 
291   /**
292    * Sanity check the table coprocessor attributes of the supplied schema. Will
293    * throw an exception if there is a problem.
294    * @param conf
295    * @param htd
296    * @throws IOException
297    */
298   public static void testTableCoprocessorAttrs(final Configuration conf,
299       final HTableDescriptor htd) throws IOException {
300     String pathPrefix = UUID.randomUUID().toString();
301     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
302       if (attr.getPriority() < 0) {
303         throw new IOException("Priority for coprocessor " + attr.getClassName() +
304           " cannot be less than 0");
305       }
306       ClassLoader old = Thread.currentThread().getContextClassLoader();
307       try {
308         ClassLoader cl;
309         if (attr.getPath() != null) {
310           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
311             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
312         } else {
313           cl = CoprocessorHost.class.getClassLoader();
314         }
315         Thread.currentThread().setContextClassLoader(cl);
316         cl.loadClass(attr.getClassName());
317       } catch (ClassNotFoundException e) {
318         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
319       } finally {
320         Thread.currentThread().setContextClassLoader(old);
321       }
322     }
323   }
324 
325   void loadTableCoprocessors(final Configuration conf) {
326     // scan the table attributes for coprocessor load specifications
327     // initialize the coprocessors
328     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
329     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
330         region.getTableDesc())) {
331       // Load encompasses classloading and coprocessor initialization
332       try {
333         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
334           attr.getConf());
335         configured.add(env);
336         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
337             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
338       } catch (Throwable t) {
339         // Coprocessor failed to load, do we abort on error?
340         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
341           abortServer(attr.getClassName(), t);
342         } else {
343           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
344         }
345       }
346     }
347     // add together to coprocessor set for COW efficiency
348     coprocessors.addAll(configured);
349   }
350 
351   @Override
352   public RegionEnvironment createEnvironment(Class<?> implClass,
353       Coprocessor instance, int priority, int seq, Configuration conf) {
354     // Check if it's an Endpoint.
355     // Due to current dynamic protocol design, Endpoint
356     // uses a different way to be registered and executed.
357     // It uses a visitor pattern to invoke registered Endpoint
358     // method.
359     for (Class<?> c : implClass.getInterfaces()) {
360       if (CoprocessorService.class.isAssignableFrom(c)) {
361         region.registerService( ((CoprocessorService)instance).getService() );
362       }
363     }
364     ConcurrentMap<String, Object> classData;
365     // make sure only one thread can add maps
366     synchronized (sharedDataMap) {
367       // as long as at least one RegionEnvironment holds on to its classData it will
368       // remain in this map
369       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
370       if (classData == null) {
371         classData = new ConcurrentHashMap<String, Object>();
372         sharedDataMap.put(implClass.getName(), classData);
373       }
374     }
375     return new RegionEnvironment(instance, priority, seq, conf, region,
376         rsServices, classData);
377   }
378 
379   /**
380    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
381    *
382    * For example, {@link
383    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
384    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
385    *
386    * See also
387    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
388    *    CoprocessorEnvironment, Throwable)}
389    * @param env The coprocessor that threw the exception.
390    * @param e The exception that was thrown.
391    */
392   private void handleCoprocessorThrowableNoRethrow(
393       final CoprocessorEnvironment env, final Throwable e) {
394     try {
395       handleCoprocessorThrowable(env,e);
396     } catch (IOException ioe) {
397       // We cannot throw exceptions from the caller hook, so ignore.
398       LOG.warn(
399         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
400         e + ". Ignoring.",e);
401     }
402   }
403 
404   /**
405    * Invoked before a region open.
406    *
407    * @throws IOException Signals that an I/O exception has occurred.
408    */
409   public void preOpen() throws IOException {
410     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
411       @Override
412       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
413           throws IOException {
414         oserver.preOpen(ctx);
415       }
416     });
417   }
418 
419   /**
420    * Invoked after a region open
421    */
422   public void postOpen() {
423     try {
424       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
425         @Override
426         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
427             throws IOException {
428           oserver.postOpen(ctx);
429         }
430       });
431     } catch (IOException e) {
432       LOG.warn(e);
433     }
434   }
435 
436   /**
437    * Invoked after log replay on region
438    */
439   public void postLogReplay() {
440     try {
441       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
442         @Override
443         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
444             throws IOException {
445           oserver.postLogReplay(ctx);
446         }
447       });
448     } catch (IOException e) {
449       LOG.warn(e);
450     }
451   }
452 
453   /**
454    * Invoked before a region is closed
455    * @param abortRequested true if the server is aborting
456    */
457   public void preClose(final boolean abortRequested) throws IOException {
458     execOperation(false, new RegionOperation() {
459       @Override
460       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
461           throws IOException {
462         oserver.preClose(ctx, abortRequested);
463       }
464     });
465   }
466 
467   /**
468    * Invoked after a region is closed
469    * @param abortRequested true if the server is aborting
470    */
471   public void postClose(final boolean abortRequested) {
472     try {
473       execOperation(false, new RegionOperation() {
474         @Override
475         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
476             throws IOException {
477           oserver.postClose(ctx, abortRequested);
478         }
479         public void postEnvCall(RegionEnvironment env) {
480           shutdown(env);
481         }
482       });
483     } catch (IOException e) {
484       LOG.warn(e);
485     }
486   }
487 
488   /**
489    * See
490    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
491    */
492   public InternalScanner preCompactScannerOpen(final Store store,
493       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
494       final CompactionRequest request) throws IOException {
495     return execOperationWithResult(null,
496         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
497       @Override
498       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
499           throws IOException {
500         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
501           earliestPutTs, getResult(), request));
502       }
503     });
504   }
505 
506   /**
507    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
508    * available candidates.
509    * @param store The store where compaction is being requested
510    * @param candidates The currently available store files
511    * @param request custom compaction request
512    * @return If {@code true}, skip the normal selection process and use the current list
513    * @throws IOException
514    */
515   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
516       final CompactionRequest request) throws IOException {
517     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
518       @Override
519       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
520           throws IOException {
521         oserver.preCompactSelection(ctx, store, candidates, request);
522       }
523     });
524   }
525 
526   /**
527    * Called after the {@link StoreFile}s to be compacted have been selected from the available
528    * candidates.
529    * @param store The store where compaction is being requested
530    * @param selected The store files selected to compact
531    * @param request custom compaction
532    */
533   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
534       final CompactionRequest request) {
535     try {
536       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
537         @Override
538         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
539             throws IOException {
540           oserver.postCompactSelection(ctx, store, selected, request);
541         }
542       });
543     } catch (IOException e) {
544       LOG.warn(e);
545     }
546   }
547 
548   /**
549    * Called prior to rewriting the store files selected for compaction
550    * @param store the store being compacted
551    * @param scanner the scanner used to read store data during compaction
552    * @param scanType type of Scan
553    * @param request the compaction that will be executed
554    * @throws IOException
555    */
556   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
557       final ScanType scanType, final CompactionRequest request) throws IOException {
558     return execOperationWithResult(false, scanner,
559         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
560       @Override
561       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
562           throws IOException {
563         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
564       }
565     });
566   }
567 
568   /**
569    * Called after the store compaction has completed.
570    * @param store the store being compacted
571    * @param resultFile the new store file written during compaction
572    * @param request the compaction that is being executed
573    * @throws IOException
574    */
575   public void postCompact(final Store store, final StoreFile resultFile,
576       final CompactionRequest request) throws IOException {
577     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
578       @Override
579       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
580           throws IOException {
581         oserver.postCompact(ctx, store, resultFile, request);
582       }
583     });
584   }
585 
586   /**
587    * Invoked before a memstore flush
588    * @throws IOException
589    */
590   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
591       throws IOException {
592     return execOperationWithResult(false, scanner,
593         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
594       @Override
595       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
596           throws IOException {
597         setResult(oserver.preFlush(ctx, store, getResult()));
598       }
599     });
600   }
601 
602   /**
603    * Invoked before a memstore flush
604    * @throws IOException
605    */
606   public void preFlush() throws IOException {
607     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
608       @Override
609       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
610           throws IOException {
611         oserver.preFlush(ctx);
612       }
613     });
614   }
615 
616   /**
617    * See
618    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
619    *    Store, KeyValueScanner, InternalScanner)}
620    */
621   public InternalScanner preFlushScannerOpen(final Store store,
622       final KeyValueScanner memstoreScanner) throws IOException {
623     return execOperationWithResult(null,
624         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
625       @Override
626       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
627           throws IOException {
628         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
629       }
630     });
631   }
632 
633   /**
634    * Invoked after a memstore flush
635    * @throws IOException
636    */
637   public void postFlush() throws IOException {
638     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
639       @Override
640       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
641           throws IOException {
642         oserver.postFlush(ctx);
643       }
644     });
645   }
646 
647   /**
648    * Invoked after a memstore flush
649    * @throws IOException
650    */
651   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
652     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
653       @Override
654       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
655           throws IOException {
656         oserver.postFlush(ctx, store, storeFile);
657       }
658     });
659   }
660 
661   /**
662    * Invoked just before a split
663    * @throws IOException
664    */
665   // TODO: Deprecate this
666   public void preSplit() throws IOException {
667     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
668       @Override
669       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
670           throws IOException {
671         oserver.preSplit(ctx);
672       }
673     });
674   }
675 
676   /**
677    * Invoked just before a split
678    * @throws IOException
679    */
680   public void preSplit(final byte[] splitRow) throws IOException {
681     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
682       @Override
683       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
684           throws IOException {
685         oserver.preSplit(ctx, splitRow);
686       }
687     });
688   }
689 
690   /**
691    * Invoked just after a split
692    * @param l the new left-hand daughter region
693    * @param r the new right-hand daughter region
694    * @throws IOException
695    */
696   public void postSplit(final HRegion l, final HRegion r) throws IOException {
697     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
698       @Override
699       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
700           throws IOException {
701         oserver.postSplit(ctx, l, r);
702       }
703     });
704   }
705 
706   public boolean preSplitBeforePONR(final byte[] splitKey,
707       final List<Mutation> metaEntries) throws IOException {
708     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
709       @Override
710       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
711           throws IOException {
712         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
713       }
714     });
715   }
716 
717   public void preSplitAfterPONR() throws IOException {
718     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
719       @Override
720       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
721           throws IOException {
722         oserver.preSplitAfterPONR(ctx);
723       }
724     });
725   }
726 
727   /**
728    * Invoked just before the rollback of a failed split is started
729    * @throws IOException
730    */
731   public void preRollBackSplit() throws IOException {
732     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
733       @Override
734       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
735           throws IOException {
736         oserver.preRollBackSplit(ctx);
737       }
738     });
739   }
740 
741   /**
742    * Invoked just after the rollback of a failed split is done
743    * @throws IOException
744    */
745   public void postRollBackSplit() throws IOException {
746     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
747       @Override
748       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
749           throws IOException {
750         oserver.postRollBackSplit(ctx);
751       }
752     });
753   }
754 
755   /**
756    * Invoked after a split is completed irrespective of a failure or success.
757    * @throws IOException
758    */
759   public void postCompleteSplit() throws IOException {
760     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
761       @Override
762       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
763           throws IOException {
764         oserver.postCompleteSplit(ctx);
765       }
766     });
767   }
768 
769   // RegionObserver support
770 
771   /**
772    * @param row the row key
773    * @param family the family
774    * @param result the result set from the region
775    * @return true if default processing should be bypassed
776    * @exception IOException Exception
777    */
778   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
779       final Result result) throws IOException {
780     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
781       @Override
782       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
783           throws IOException {
784         oserver.preGetClosestRowBefore(ctx, row, family, result);
785       }
786     });
787   }
788 
789   /**
790    * @param row the row key
791    * @param family the family
792    * @param result the result set from the region
793    * @exception IOException Exception
794    */
795   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
796       final Result result) throws IOException {
797     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
798       @Override
799       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
800           throws IOException {
801         oserver.postGetClosestRowBefore(ctx, row, family, result);
802       }
803     });
804   }
805 
806   /**
807    * @param get the Get request
808    * @return true if default processing should be bypassed
809    * @exception IOException Exception
810    */
811   public boolean preGet(final Get get, final List<Cell> results)
812       throws IOException {
813     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
814       @Override
815       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
816           throws IOException {
817         oserver.preGetOp(ctx, get, results);
818       }
819     });
820   }
821 
822   /**
823    * @param get the Get request
824    * @param results the result sett
825    * @exception IOException Exception
826    */
827   public void postGet(final Get get, final List<Cell> results)
828       throws IOException {
829     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
830       @Override
831       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
832           throws IOException {
833         oserver.postGetOp(ctx, get, results);
834       }
835     });
836   }
837 
838   /**
839    * @param get the Get request
840    * @return true or false to return to client if bypassing normal operation,
841    * or null otherwise
842    * @exception IOException Exception
843    */
844   public Boolean preExists(final Get get) throws IOException {
845     return execOperationWithResult(true, false,
846         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
847       @Override
848       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
849           throws IOException {
850         setResult(oserver.preExists(ctx, get, getResult()));
851       }
852     });
853   }
854 
855   /**
856    * @param get the Get request
857    * @param exists the result returned by the region server
858    * @return the result to return to the client
859    * @exception IOException Exception
860    */
861   public boolean postExists(final Get get, boolean exists)
862       throws IOException {
863     return execOperationWithResult(exists,
864         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
865       @Override
866       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
867           throws IOException {
868         setResult(oserver.postExists(ctx, get, getResult()));
869       }
870     });
871   }
872 
873   /**
874    * @param put The Put object
875    * @param edit The WALEdit object.
876    * @param durability The durability used
877    * @return true if default processing should be bypassed
878    * @exception IOException Exception
879    */
880   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
881       throws IOException {
882     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
883       @Override
884       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
885           throws IOException {
886         oserver.prePut(ctx, put, edit, durability);
887       }
888     });
889   }
890 
891   /**
892    * @param mutation - the current mutation
893    * @param kv - the current cell
894    * @param byteNow - current timestamp in bytes
895    * @param get - the get that could be used
896    * Note that the get only does not specify the family and qualifier that should be used
897    * @return true if default processing should be bypassed
898    * @exception IOException
899    *              Exception
900    */
901   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
902       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
903     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
904       @Override
905       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
906           throws IOException {
907         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
908       }
909     });
910   }
911 
912   /**
913    * @param put The Put object
914    * @param edit The WALEdit object.
915    * @param durability The durability used
916    * @exception IOException Exception
917    */
918   public void postPut(final Put put, final WALEdit edit, final Durability durability)
919       throws IOException {
920     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
921       @Override
922       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
923           throws IOException {
924         oserver.postPut(ctx, put, edit, durability);
925       }
926     });
927   }
928 
929   /**
930    * @param delete The Delete object
931    * @param edit The WALEdit object.
932    * @param durability The durability used
933    * @return true if default processing should be bypassed
934    * @exception IOException Exception
935    */
936   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
937       throws IOException {
938     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
939       @Override
940       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
941           throws IOException {
942         oserver.preDelete(ctx, delete, edit, durability);
943       }
944     });
945   }
946 
947   /**
948    * @param delete The Delete object
949    * @param edit The WALEdit object.
950    * @param durability The durability used
951    * @exception IOException Exception
952    */
953   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
954       throws IOException {
955     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
956       @Override
957       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
958           throws IOException {
959         oserver.postDelete(ctx, delete, edit, durability);
960       }
961     });
962   }
963 
964   /**
965    * @param miniBatchOp
966    * @return true if default processing should be bypassed
967    * @throws IOException
968    */
969   public boolean preBatchMutate(
970       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
971     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
972       @Override
973       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
974           throws IOException {
975         oserver.preBatchMutate(ctx, miniBatchOp);
976       }
977     });
978   }
979 
980   /**
981    * @param miniBatchOp
982    * @throws IOException
983    */
984   public void postBatchMutate(
985       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
986     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
987       @Override
988       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
989           throws IOException {
990         oserver.postBatchMutate(ctx, miniBatchOp);
991       }
992     });
993   }
994 
995   public void postBatchMutateIndispensably(
996       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
997       throws IOException {
998     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
999       @Override
1000       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1001           throws IOException {
1002         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1003       }
1004     });
1005   }
1006 
1007   /**
1008    * @param row row to check
1009    * @param family column family
1010    * @param qualifier column qualifier
1011    * @param compareOp the comparison operation
1012    * @param comparator the comparator
1013    * @param put data to put if check succeeds
1014    * @return true or false to return to client if default processing should
1015    * be bypassed, or null otherwise
1016    * @throws IOException e
1017    */
1018   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1019       final byte [] qualifier, final CompareOp compareOp,
1020       final ByteArrayComparable comparator, final Put put)
1021       throws IOException {
1022     return execOperationWithResult(true, false,
1023         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1024       @Override
1025       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1026           throws IOException {
1027         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1028           compareOp, comparator, put, getResult()));
1029       }
1030     });
1031   }
1032 
1033   /**
1034    * @param row row to check
1035    * @param family column family
1036    * @param qualifier column qualifier
1037    * @param compareOp the comparison operation
1038    * @param comparator the comparator
1039    * @param put data to put if check succeeds
1040    * @return true or false to return to client if default processing should
1041    * be bypassed, or null otherwise
1042    * @throws IOException e
1043    */
1044   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1045       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1046       final Put put) throws IOException {
1047     return execOperationWithResult(true, false,
1048         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1049       @Override
1050       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1051           throws IOException {
1052         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1053           compareOp, comparator, put, getResult()));
1054       }
1055     });
1056   }
1057 
1058   /**
1059    * @param row row to check
1060    * @param family column family
1061    * @param qualifier column qualifier
1062    * @param compareOp the comparison operation
1063    * @param comparator the comparator
1064    * @param put data to put if check succeeds
1065    * @throws IOException e
1066    */
1067   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1068       final byte [] qualifier, final CompareOp compareOp,
1069       final ByteArrayComparable comparator, final Put put,
1070       boolean result) throws IOException {
1071     return execOperationWithResult(result,
1072         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1073       @Override
1074       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1075           throws IOException {
1076         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1077           compareOp, comparator, put, getResult()));
1078       }
1079     });
1080   }
1081 
1082   /**
1083    * @param row row to check
1084    * @param family column family
1085    * @param qualifier column qualifier
1086    * @param compareOp the comparison operation
1087    * @param comparator the comparator
1088    * @param delete delete to commit if check succeeds
1089    * @return true or false to return to client if default processing should
1090    * be bypassed, or null otherwise
1091    * @throws IOException e
1092    */
1093   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1094       final byte [] qualifier, final CompareOp compareOp,
1095       final ByteArrayComparable comparator, final Delete delete)
1096       throws IOException {
1097     return execOperationWithResult(true, false,
1098         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1099       @Override
1100       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1101           throws IOException {
1102         setResult(oserver.preCheckAndDelete(ctx, row, family,
1103             qualifier, compareOp, comparator, delete, getResult()));
1104       }
1105     });
1106   }
1107 
1108   /**
1109    * @param row row to check
1110    * @param family column family
1111    * @param qualifier column qualifier
1112    * @param compareOp the comparison operation
1113    * @param comparator the comparator
1114    * @param delete delete to commit if check succeeds
1115    * @return true or false to return to client if default processing should
1116    * be bypassed, or null otherwise
1117    * @throws IOException e
1118    */
1119   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1120       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1121       final Delete delete) throws IOException {
1122     return execOperationWithResult(true, false,
1123         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1124       @Override
1125       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1126           throws IOException {
1127         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1128               family, qualifier, compareOp, comparator, delete, getResult()));
1129       }
1130     });
1131   }
1132 
1133   /**
1134    * @param row row to check
1135    * @param family column family
1136    * @param qualifier column qualifier
1137    * @param compareOp the comparison operation
1138    * @param comparator the comparator
1139    * @param delete delete to commit if check succeeds
1140    * @throws IOException e
1141    */
1142   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1143       final byte [] qualifier, final CompareOp compareOp,
1144       final ByteArrayComparable comparator, final Delete delete,
1145       boolean result) throws IOException {
1146     return execOperationWithResult(result,
1147         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1148       @Override
1149       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1150           throws IOException {
1151         setResult(oserver.postCheckAndDelete(ctx, row, family,
1152             qualifier, compareOp, comparator, delete, getResult()));
1153       }
1154     });
1155   }
1156 
1157   /**
1158    * @param append append object
1159    * @return result to return to client if default operation should be
1160    * bypassed, null otherwise
1161    * @throws IOException if an error occurred on the coprocessor
1162    */
1163   public Result preAppend(final Append append) throws IOException {
1164     return execOperationWithResult(true, null,
1165         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1166       @Override
1167       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1168           throws IOException {
1169         setResult(oserver.preAppend(ctx, append));
1170       }
1171     });
1172   }
1173 
1174   /**
1175    * @param append append object
1176    * @return result to return to client if default operation should be
1177    * bypassed, null otherwise
1178    * @throws IOException if an error occurred on the coprocessor
1179    */
1180   public Result preAppendAfterRowLock(final Append append) throws IOException {
1181     return execOperationWithResult(true, null,
1182         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1183       @Override
1184       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1185           throws IOException {
1186         setResult(oserver.preAppendAfterRowLock(ctx, append));
1187       }
1188     });
1189   }
1190 
1191   /**
1192    * @param increment increment object
1193    * @return result to return to client if default operation should be
1194    * bypassed, null otherwise
1195    * @throws IOException if an error occurred on the coprocessor
1196    */
1197   public Result preIncrement(final Increment increment) throws IOException {
1198     return execOperationWithResult(true, null,
1199         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1200       @Override
1201       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1202           throws IOException {
1203         setResult(oserver.preIncrement(ctx, increment));
1204       }
1205     });
1206   }
1207 
1208   /**
1209    * @param increment increment object
1210    * @return result to return to client if default operation should be
1211    * bypassed, null otherwise
1212    * @throws IOException if an error occurred on the coprocessor
1213    */
1214   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1215     return execOperationWithResult(true, null,
1216         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1217       @Override
1218       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1219           throws IOException {
1220         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1221       }
1222     });
1223   }
1224 
1225   /**
1226    * @param append Append object
1227    * @param result the result returned by the append
1228    * @throws IOException if an error occurred on the coprocessor
1229    */
1230   public void postAppend(final Append append, final Result result) throws IOException {
1231     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1232       @Override
1233       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1234           throws IOException {
1235         oserver.postAppend(ctx, append, result);
1236       }
1237     });
1238   }
1239 
1240   /**
1241    * @param increment increment object
1242    * @param result the result returned by postIncrement
1243    * @throws IOException if an error occurred on the coprocessor
1244    */
1245   public Result postIncrement(final Increment increment, Result result) throws IOException {
1246     return execOperationWithResult(result,
1247         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1248       @Override
1249       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1250           throws IOException {
1251         setResult(oserver.postIncrement(ctx, increment, getResult()));
1252       }
1253     });
1254   }
1255 
1256   /**
1257    * @param scan the Scan specification
1258    * @return scanner id to return to client if default operation should be
1259    * bypassed, false otherwise
1260    * @exception IOException Exception
1261    */
1262   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1263     return execOperationWithResult(true, null,
1264         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1265       @Override
1266       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1267           throws IOException {
1268         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1269       }
1270     });
1271   }
1272 
1273   /**
1274    * See
1275    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1276    *    Store, Scan, NavigableSet, KeyValueScanner)}
1277    */
1278   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1279       final NavigableSet<byte[]> targetCols) throws IOException {
1280     return execOperationWithResult(null,
1281         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1282       @Override
1283       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1284           throws IOException {
1285         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1286       }
1287     });
1288   }
1289 
1290   /**
1291    * @param scan the Scan specification
1292    * @param s the scanner
1293    * @return the scanner instance to use
1294    * @exception IOException Exception
1295    */
1296   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1297     return execOperationWithResult(s,
1298         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1299       @Override
1300       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1301           throws IOException {
1302         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1303       }
1304     });
1305   }
1306 
1307   /**
1308    * @param s the scanner
1309    * @param results the result set returned by the region server
1310    * @param limit the maximum number of results to return
1311    * @return 'has next' indication to client if bypassing default behavior, or
1312    * null otherwise
1313    * @exception IOException Exception
1314    */
1315   public Boolean preScannerNext(final InternalScanner s,
1316       final List<Result> results, final int limit) throws IOException {
1317     return execOperationWithResult(true, false,
1318         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1319       @Override
1320       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1321           throws IOException {
1322         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1323       }
1324     });
1325   }
1326 
1327   /**
1328    * @param s the scanner
1329    * @param results the result set returned by the region server
1330    * @param limit the maximum number of results to return
1331    * @param hasMore
1332    * @return 'has more' indication to give to client
1333    * @exception IOException Exception
1334    */
1335   public boolean postScannerNext(final InternalScanner s,
1336       final List<Result> results, final int limit, boolean hasMore)
1337       throws IOException {
1338     return execOperationWithResult(hasMore,
1339         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1340       @Override
1341       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1342           throws IOException {
1343         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1344       }
1345     });
1346   }
1347 
1348   /**
1349    * This will be called by the scan flow when the current scanned row is being filtered out by the
1350    * filter.
1351    * @param s the scanner
1352    * @param currentRow The current rowkey which got filtered out
1353    * @param offset offset to rowkey
1354    * @param length length of rowkey
1355    * @return whether more rows are available for the scanner or not
1356    * @throws IOException
1357    */
1358   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1359       final int offset, final short length) throws IOException {
1360     return execOperationWithResult(true,
1361         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1362       @Override
1363       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1364           throws IOException {
1365         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1366       }
1367     });
1368   }
1369 
1370   /**
1371    * @param s the scanner
1372    * @return true if default behavior should be bypassed, false otherwise
1373    * @exception IOException Exception
1374    */
1375   public boolean preScannerClose(final InternalScanner s) throws IOException {
1376     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1377       @Override
1378       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1379           throws IOException {
1380         oserver.preScannerClose(ctx, s);
1381       }
1382     });
1383   }
1384 
1385   /**
1386    * @exception IOException Exception
1387    */
1388   public void postScannerClose(final InternalScanner s) throws IOException {
1389     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1390       @Override
1391       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1392           throws IOException {
1393         oserver.postScannerClose(ctx, s);
1394       }
1395     });
1396   }
1397 
1398   /**
1399    * @param info
1400    * @param logKey
1401    * @param logEdit
1402    * @return true if default behavior should be bypassed, false otherwise
1403    * @throws IOException
1404    */
1405   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1406       final WALEdit logEdit) throws IOException {
1407     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1408       @Override
1409       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1410           throws IOException {
1411         // Once we don't need to support the legacy call, replace RegionOperation with a version
1412         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1413         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1414         if (env.useLegacyPre) {
1415           if (logKey instanceof HLogKey) {
1416             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1417           } else {
1418             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1419           }
1420         } else {
1421           oserver.preWALRestore(ctx, info, logKey, logEdit);
1422         }
1423       }
1424     });
1425   }
1426 
1427   /**
1428    * @return true if default behavior should be bypassed, false otherwise
1429    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1430    */
1431   @Deprecated
1432   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1433       final WALEdit logEdit) throws IOException {
1434     return preWALRestore(info, (WALKey)logKey, logEdit);
1435   }
1436 
1437   /**
1438    * @param info
1439    * @param logKey
1440    * @param logEdit
1441    * @throws IOException
1442    */
1443   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1444       throws IOException {
1445     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1446       @Override
1447       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1448           throws IOException {
1449         // Once we don't need to support the legacy call, replace RegionOperation with a version
1450         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1451         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1452         if (env.useLegacyPost) {
1453           if (logKey instanceof HLogKey) {
1454             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1455           } else {
1456             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1457           }
1458         } else {
1459           oserver.postWALRestore(ctx, info, logKey, logEdit);
1460         }
1461       }
1462     });
1463   }
1464 
1465   /**
1466    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1467    */
1468   @Deprecated
1469   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1470       throws IOException {
1471     postWALRestore(info, (WALKey)logKey, logEdit);
1472   }
1473 
1474   /**
1475    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1476    * @return true if the default operation should be bypassed
1477    * @throws IOException
1478    */
1479   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1480     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1481       @Override
1482       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1483           throws IOException {
1484         oserver.preBulkLoadHFile(ctx, familyPaths);
1485       }
1486     });
1487   }
1488 
1489   /**
1490    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1491    * @param hasLoaded whether load was successful or not
1492    * @return the possibly modified value of hasLoaded
1493    * @throws IOException
1494    */
1495   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1496       boolean hasLoaded) throws IOException {
1497     return execOperationWithResult(hasLoaded,
1498         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1499       @Override
1500       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1501           throws IOException {
1502         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1503       }
1504     });
1505   }
1506 
1507   public void postStartRegionOperation(final Operation op) throws IOException {
1508     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1509       @Override
1510       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1511           throws IOException {
1512         oserver.postStartRegionOperation(ctx, op);
1513       }
1514     });
1515   }
1516 
1517   public void postCloseRegionOperation(final Operation op) throws IOException {
1518     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1519       @Override
1520       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1521           throws IOException {
1522         oserver.postCloseRegionOperation(ctx, op);
1523       }
1524     });
1525   }
1526 
1527   /**
1528    * @param fs fileystem to read from
1529    * @param p path to the file
1530    * @param in {@link FSDataInputStreamWrapper}
1531    * @param size Full size of the file
1532    * @param cacheConf
1533    * @param r original reference file. This will be not null only when reading a split file.
1534    * @return a Reader instance to use instead of the base reader if overriding
1535    * default behavior, null otherwise
1536    * @throws IOException
1537    */
1538   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1539       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1540       final Reference r) throws IOException {
1541     return execOperationWithResult(null,
1542         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1543       @Override
1544       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1545           throws IOException {
1546         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1547       }
1548     });
1549   }
1550 
1551   /**
1552    * @param fs fileystem to read from
1553    * @param p path to the file
1554    * @param in {@link FSDataInputStreamWrapper}
1555    * @param size Full size of the file
1556    * @param cacheConf
1557    * @param r original reference file. This will be not null only when reading a split file.
1558    * @param reader the base reader instance
1559    * @return The reader to use
1560    * @throws IOException
1561    */
1562   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1563       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1564       final Reference r, final StoreFile.Reader reader) throws IOException {
1565     return execOperationWithResult(reader,
1566         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1567       @Override
1568       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1569           throws IOException {
1570         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1571       }
1572     });
1573   }
1574 
1575   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1576       final Cell oldCell, Cell newCell) throws IOException {
1577     return execOperationWithResult(newCell,
1578         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1579       @Override
1580       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1581           throws IOException {
1582         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1583       }
1584     });
1585   }
1586 
1587   public Message preEndpointInvocation(final Service service, final String methodName,
1588       Message request) throws IOException {
1589     return execOperationWithResult(request,
1590         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1591       @Override
1592       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1593           throws IOException {
1594         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1595       }
1596     });
1597   }
1598 
1599   public void postEndpointInvocation(final Service service, final String methodName,
1600       final Message request, final Message.Builder responseBuilder) throws IOException {
1601     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1602       @Override
1603       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1604           throws IOException {
1605         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1606       }
1607     });
1608   }
1609 
1610   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1611     return execOperationWithResult(tracker,
1612         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1613       @Override
1614       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1615           throws IOException {
1616         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1617       }
1618     });
1619   }
1620 
1621   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1622     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1623     for (RegionEnvironment env : coprocessors) {
1624       DescriptiveStatistics ds = new DescriptiveStatistics();
1625       if (env.getInstance() instanceof RegionObserver) {
1626         for (Long time : env.getExecutionLatenciesNanos()) {
1627           ds.addValue(time);
1628         }
1629         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1630         if (ds.getN() == 0) {
1631           ds.addValue(0);
1632         }
1633         results.put(env.getInstance().getClass().getSimpleName(), ds);
1634       }
1635     }
1636     return results;
1637   }
1638 
1639   private static abstract class CoprocessorOperation
1640       extends ObserverContext<RegionCoprocessorEnvironment> {
1641     public abstract void call(Coprocessor observer,
1642         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1643     public abstract boolean hasCall(Coprocessor observer);
1644     public void postEnvCall(RegionEnvironment env) { }
1645   }
1646 
1647   private static abstract class RegionOperation extends CoprocessorOperation {
1648     public abstract void call(RegionObserver observer,
1649         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1650 
1651     public boolean hasCall(Coprocessor observer) {
1652       return observer instanceof RegionObserver;
1653     }
1654 
1655     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1656         throws IOException {
1657       call((RegionObserver)observer, ctx);
1658     }
1659   }
1660 
1661   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1662     private T result = null;
1663     public void setResult(final T result) { this.result = result; }
1664     public T getResult() { return this.result; }
1665   }
1666 
1667   private static abstract class EndpointOperation extends CoprocessorOperation {
1668     public abstract void call(EndpointObserver observer,
1669         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1670 
1671     public boolean hasCall(Coprocessor observer) {
1672       return observer instanceof EndpointObserver;
1673     }
1674 
1675     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1676         throws IOException {
1677       call((EndpointObserver)observer, ctx);
1678     }
1679   }
1680 
1681   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1682     private T result = null;
1683     public void setResult(final T result) { this.result = result; }
1684     public T getResult() { return this.result; }
1685   }
1686 
1687   private boolean execOperation(final CoprocessorOperation ctx)
1688       throws IOException {
1689     return execOperation(true, ctx);
1690   }
1691 
1692   private <T> T execOperationWithResult(final T defaultValue,
1693       final RegionOperationWithResult<T> ctx) throws IOException {
1694     if (ctx == null) return defaultValue;
1695     ctx.setResult(defaultValue);
1696     execOperation(true, ctx);
1697     return ctx.getResult();
1698   }
1699 
1700   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1701       final RegionOperationWithResult<T> ctx) throws IOException {
1702     boolean bypass = false;
1703     T result = defaultValue;
1704     if (ctx != null) {
1705       ctx.setResult(defaultValue);
1706       bypass = execOperation(true, ctx);
1707       result = ctx.getResult();
1708     }
1709     return bypass == ifBypass ? result : null;
1710   }
1711 
1712   private <T> T execOperationWithResult(final T defaultValue,
1713       final EndpointOperationWithResult<T> ctx) throws IOException {
1714     if (ctx == null) return defaultValue;
1715     ctx.setResult(defaultValue);
1716     execOperation(true, ctx);
1717     return ctx.getResult();
1718   }
1719 
1720   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1721       throws IOException {
1722     boolean bypass = false;
1723     for (RegionEnvironment env: coprocessors) {
1724       Coprocessor observer = env.getInstance();
1725       if (ctx.hasCall(observer)) {
1726         long startTime = System.nanoTime();
1727         ctx.prepare(env);
1728         Thread currentThread = Thread.currentThread();
1729         ClassLoader cl = currentThread.getContextClassLoader();
1730         try {
1731           currentThread.setContextClassLoader(env.getClassLoader());
1732           ctx.call(observer, ctx);
1733         } catch (Throwable e) {
1734           handleCoprocessorThrowable(env, e);
1735         } finally {
1736           currentThread.setContextClassLoader(cl);
1737         }
1738         env.offerExecutionLatency(System.nanoTime() - startTime);
1739         bypass |= ctx.shouldBypass();
1740         if (earlyExit && ctx.shouldComplete()) {
1741           break;
1742         }
1743       }
1744 
1745       ctx.postEnvCall(env);
1746     }
1747     return bypass;
1748   }
1749 }