View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.UUID;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  import java.util.regex.Matcher;
33  
34  import org.apache.commons.collections.map.AbstractReferenceMap;
35  import org.apache.commons.collections.map.ReferenceMap;
36  import org.apache.commons.lang.ClassUtils;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.classification.InterfaceStability;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.Coprocessor;
47  import org.apache.hadoop.hbase.CoprocessorEnvironment;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Durability;
56  import org.apache.hadoop.hbase.client.Get;
57  import org.apache.hadoop.hbase.client.Increment;
58  import org.apache.hadoop.hbase.client.Mutation;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.client.Scan;
62  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
63  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
64  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
65  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
66  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
67  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
68  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
69  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
70  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
71  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
72  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
73  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
74  import org.apache.hadoop.hbase.io.Reference;
75  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
76  import org.apache.hadoop.hbase.regionserver.Region.Operation;
77  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
78  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
79  import org.apache.hadoop.hbase.wal.WALKey;
80  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
81  import org.apache.hadoop.hbase.util.BoundedConcurrentLinkedQueue;
82  import org.apache.hadoop.hbase.util.Bytes;
83  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
84  import org.apache.hadoop.hbase.util.Pair;
85  
86  import com.google.common.collect.ImmutableList;
87  import com.google.common.collect.Lists;
88  import com.google.protobuf.Message;
89  import com.google.protobuf.Service;
90  
91  /**
92   * Implements the coprocessor environment and runtime support for coprocessors
93   * loaded within a {@link Region}.
94   */
95  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
96  @InterfaceStability.Evolving
97  public class RegionCoprocessorHost
98      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
99  
100   private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
101   // The shared data map
102   private static ReferenceMap sharedDataMap =
103       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
104 
105   // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
106   private final boolean hasCustomPostScannerFilterRow;
107 
108   /**
109    * 
110    * Encapsulation of the environment of each coprocessor
111    */
112   static class RegionEnvironment extends CoprocessorHost.Environment
113       implements RegionCoprocessorEnvironment {
114 
115     private Region region;
116     private RegionServerServices rsServices;
117     ConcurrentMap<String, Object> sharedData;
118     private static final int LATENCY_BUFFER_SIZE = 100;
119     private final BoundedConcurrentLinkedQueue<Long> coprocessorTimeNanos =
120         new BoundedConcurrentLinkedQueue<Long>(LATENCY_BUFFER_SIZE);
121     private final boolean useLegacyPre;
122     private final boolean useLegacyPost;
123 
124     /**
125      * Constructor
126      * @param impl the coprocessor instance
127      * @param priority chaining priority
128      */
129     public RegionEnvironment(final Coprocessor impl, final int priority,
130         final int seq, final Configuration conf, final Region region,
131         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
132       super(impl, priority, seq, conf);
133       this.region = region;
134       this.rsServices = services;
135       this.sharedData = sharedData;
136       // Pick which version of the WAL related events we'll call.
137       // This way we avoid calling the new version on older RegionObservers so
138       // we can maintain binary compatibility.
139       // See notes in javadoc for RegionObserver
140       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
141           HRegionInfo.class, WALKey.class, WALEdit.class);
142       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
143           HRegionInfo.class, WALKey.class, WALEdit.class);
144     }
145 
146     /** @return the region */
147     @Override
148     public Region getRegion() {
149       return region;
150     }
151 
152     /** @return reference to the region server services */
153     @Override
154     public RegionServerServices getRegionServerServices() {
155       return rsServices;
156     }
157 
158     public void shutdown() {
159       super.shutdown();
160     }
161 
162     @Override
163     public ConcurrentMap<String, Object> getSharedData() {
164       return sharedData;
165     }
166 
167     public void offerExecutionLatency(long latencyNanos) {
168       coprocessorTimeNanos.offer(latencyNanos);
169     }
170 
171     public Collection<Long> getExecutionLatenciesNanos() {
172       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
173       coprocessorTimeNanos.drainTo(latencies);
174       return latencies;
175     }
176 
177     @Override
178     public HRegionInfo getRegionInfo() {
179       return region.getRegionInfo();
180     }
181 
182   }
183 
184   static class TableCoprocessorAttribute {
185     private Path path;
186     private String className;
187     private int priority;
188     private Configuration conf;
189 
190     public TableCoprocessorAttribute(Path path, String className, int priority,
191         Configuration conf) {
192       this.path = path;
193       this.className = className;
194       this.priority = priority;
195       this.conf = conf;
196     }
197 
198     public Path getPath() {
199       return path;
200     }
201 
202     public String getClassName() {
203       return className;
204     }
205 
206     public int getPriority() {
207       return priority;
208     }
209 
210     public Configuration getConf() {
211       return conf;
212     }
213   }
214 
215   /** The region server services */
216   RegionServerServices rsServices;
217   /** The region */
218   Region region;
219 
220   /**
221    * Constructor
222    * @param region the region
223    * @param rsServices interface to available region server functionality
224    * @param conf the configuration
225    */
226   public RegionCoprocessorHost(final Region region,
227       final RegionServerServices rsServices, final Configuration conf) {
228     super(rsServices);
229     this.conf = conf;
230     this.rsServices = rsServices;
231     this.region = region;
232     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
233 
234     // load system default cp's from configuration.
235     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
236 
237     // load system default cp's for user tables from configuration.
238     if (!region.getRegionInfo().getTable().isSystemTable()) {
239       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
240     }
241 
242     // load Coprocessor From HDFS
243     loadTableCoprocessors(conf);
244 
245     // now check whether any coprocessor implements postScannerFilterRow
246     boolean hasCustomPostScannerFilterRow = false;
247     out: for (RegionEnvironment env: coprocessors) {
248       if (env.getInstance() instanceof RegionObserver) {
249         Class<?> clazz = env.getInstance().getClass();
250         for(;;) {
251           if (clazz == null) {
252             // we must have directly implemented RegionObserver
253             hasCustomPostScannerFilterRow = true;
254             break out;
255           }
256           if (clazz == BaseRegionObserver.class) {
257             // we reached BaseRegionObserver, try next coprocessor
258             break;
259           }
260           try {
261             clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
262               InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
263             // this coprocessor has a custom version of postScannerFilterRow
264             hasCustomPostScannerFilterRow = true;
265             break out;
266           } catch (NoSuchMethodException ignore) {
267           }
268           clazz = clazz.getSuperclass();
269         }
270       }
271     }
272     this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
273   }
274 
275   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
276       HTableDescriptor htd) {
277     List<TableCoprocessorAttribute> result = Lists.newArrayList();
278     for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
279       String key = Bytes.toString(e.getKey().get()).trim();
280       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
281         String spec = Bytes.toString(e.getValue().get()).trim();
282         // found one
283         try {
284           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
285           if (matcher.matches()) {
286             // jar file path can be empty if the cp class can be loaded
287             // from class loader.
288             Path path = matcher.group(1).trim().isEmpty() ?
289                 null : new Path(matcher.group(1).trim());
290             String className = matcher.group(2).trim();
291             if (className.isEmpty()) {
292               LOG.error("Malformed table coprocessor specification: key=" +
293                 key + ", spec: " + spec);
294               continue;
295             }
296             int priority = matcher.group(3).trim().isEmpty() ?
297                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
298             String cfgSpec = null;
299             try {
300               cfgSpec = matcher.group(4);
301             } catch (IndexOutOfBoundsException ex) {
302               // ignore
303             }
304             Configuration ourConf;
305             if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
306               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
307               // do an explicit deep copy of the passed configuration
308               ourConf = new Configuration(false);
309               HBaseConfiguration.merge(ourConf, conf);
310               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
311               while (m.find()) {
312                 ourConf.set(m.group(1), m.group(2));
313               }
314             } else {
315               ourConf = conf;
316             }
317             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
318           } else {
319             LOG.error("Malformed table coprocessor specification: key=" + key +
320               ", spec: " + spec);
321           }
322         } catch (Exception ioe) {
323           LOG.error("Malformed table coprocessor specification: key=" + key +
324             ", spec: " + spec);
325         }
326       }
327     }
328     return result;
329   }
330 
331   /**
332    * Sanity check the table coprocessor attributes of the supplied schema. Will
333    * throw an exception if there is a problem.
334    * @param conf
335    * @param htd
336    * @throws IOException
337    */
338   public static void testTableCoprocessorAttrs(final Configuration conf,
339       final HTableDescriptor htd) throws IOException {
340     String pathPrefix = UUID.randomUUID().toString();
341     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
342       if (attr.getPriority() < 0) {
343         throw new IOException("Priority for coprocessor " + attr.getClassName() +
344           " cannot be less than 0");
345       }
346       ClassLoader old = Thread.currentThread().getContextClassLoader();
347       try {
348         ClassLoader cl;
349         if (attr.getPath() != null) {
350           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
351             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
352         } else {
353           cl = CoprocessorHost.class.getClassLoader();
354         }
355         Thread.currentThread().setContextClassLoader(cl);
356         cl.loadClass(attr.getClassName());
357       } catch (ClassNotFoundException e) {
358         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
359       } finally {
360         Thread.currentThread().setContextClassLoader(old);
361       }
362     }
363   }
364 
365   void loadTableCoprocessors(final Configuration conf) {
366     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
367       DEFAULT_COPROCESSORS_ENABLED);
368     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
369       DEFAULT_USER_COPROCESSORS_ENABLED);
370     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
371       return;
372     }
373 
374     // scan the table attributes for coprocessor load specifications
375     // initialize the coprocessors
376     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
377     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
378         region.getTableDesc())) {
379       // Load encompasses classloading and coprocessor initialization
380       try {
381         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
382           attr.getConf());
383         configured.add(env);
384         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
385             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
386       } catch (Throwable t) {
387         // Coprocessor failed to load, do we abort on error?
388         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
389           abortServer(attr.getClassName(), t);
390         } else {
391           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
392         }
393       }
394     }
395     // add together to coprocessor set for COW efficiency
396     coprocessors.addAll(configured);
397   }
398 
399   @Override
400   public RegionEnvironment createEnvironment(Class<?> implClass,
401       Coprocessor instance, int priority, int seq, Configuration conf) {
402     // Check if it's an Endpoint.
403     // Due to current dynamic protocol design, Endpoint
404     // uses a different way to be registered and executed.
405     // It uses a visitor pattern to invoke registered Endpoint
406     // method.
407     for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
408       Class<?> c = (Class<?>) itf;
409       if (CoprocessorService.class.isAssignableFrom(c)) {
410         region.registerService( ((CoprocessorService)instance).getService() );
411       }
412     }
413     ConcurrentMap<String, Object> classData;
414     // make sure only one thread can add maps
415     synchronized (sharedDataMap) {
416       // as long as at least one RegionEnvironment holds on to its classData it will
417       // remain in this map
418       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
419       if (classData == null) {
420         classData = new ConcurrentHashMap<String, Object>();
421         sharedDataMap.put(implClass.getName(), classData);
422       }
423     }
424     return new RegionEnvironment(instance, priority, seq, conf, region,
425         rsServices, classData);
426   }
427 
428   /**
429    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
430    *
431    * For example, {@link
432    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
433    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
434    *
435    * See also
436    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
437    *    CoprocessorEnvironment, Throwable)}
438    * @param env The coprocessor that threw the exception.
439    * @param e The exception that was thrown.
440    */
441   private void handleCoprocessorThrowableNoRethrow(
442       final CoprocessorEnvironment env, final Throwable e) {
443     try {
444       handleCoprocessorThrowable(env,e);
445     } catch (IOException ioe) {
446       // We cannot throw exceptions from the caller hook, so ignore.
447       LOG.warn(
448         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
449         e + ". Ignoring.",e);
450     }
451   }
452 
453   /**
454    * Invoked before a region open.
455    *
456    * @throws IOException Signals that an I/O exception has occurred.
457    */
458   public void preOpen() throws IOException {
459     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
460       @Override
461       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
462           throws IOException {
463         oserver.preOpen(ctx);
464       }
465     });
466   }
467 
468   /**
469    * Invoked after a region open
470    */
471   public void postOpen() {
472     try {
473       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
474         @Override
475         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
476             throws IOException {
477           oserver.postOpen(ctx);
478         }
479       });
480     } catch (IOException e) {
481       LOG.warn(e);
482     }
483   }
484 
485   /**
486    * Invoked after log replay on region
487    */
488   public void postLogReplay() {
489     try {
490       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
491         @Override
492         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
493             throws IOException {
494           oserver.postLogReplay(ctx);
495         }
496       });
497     } catch (IOException e) {
498       LOG.warn(e);
499     }
500   }
501 
502   /**
503    * Invoked before a region is closed
504    * @param abortRequested true if the server is aborting
505    */
506   public void preClose(final boolean abortRequested) throws IOException {
507     execOperation(false, new RegionOperation() {
508       @Override
509       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
510           throws IOException {
511         oserver.preClose(ctx, abortRequested);
512       }
513     });
514   }
515 
516   /**
517    * Invoked after a region is closed
518    * @param abortRequested true if the server is aborting
519    */
520   public void postClose(final boolean abortRequested) {
521     try {
522       execOperation(false, new RegionOperation() {
523         @Override
524         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
525             throws IOException {
526           oserver.postClose(ctx, abortRequested);
527         }
528         public void postEnvCall(RegionEnvironment env) {
529           shutdown(env);
530         }
531       });
532     } catch (IOException e) {
533       LOG.warn(e);
534     }
535   }
536 
537   /**
538    * See
539    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
540    */
541   public InternalScanner preCompactScannerOpen(final Store store,
542       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
543       final CompactionRequest request) throws IOException {
544     return execOperationWithResult(null,
545         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
546       @Override
547       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
548           throws IOException {
549         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
550           earliestPutTs, getResult(), request));
551       }
552     });
553   }
554 
555   /**
556    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
557    * available candidates.
558    * @param store The store where compaction is being requested
559    * @param candidates The currently available store files
560    * @param request custom compaction request
561    * @return If {@code true}, skip the normal selection process and use the current list
562    * @throws IOException
563    */
564   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
565       final CompactionRequest request) throws IOException {
566     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
567       @Override
568       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
569           throws IOException {
570         oserver.preCompactSelection(ctx, store, candidates, request);
571       }
572     });
573   }
574 
575   /**
576    * Called after the {@link StoreFile}s to be compacted have been selected from the available
577    * candidates.
578    * @param store The store where compaction is being requested
579    * @param selected The store files selected to compact
580    * @param request custom compaction
581    */
582   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
583       final CompactionRequest request) {
584     try {
585       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
586         @Override
587         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
588             throws IOException {
589           oserver.postCompactSelection(ctx, store, selected, request);
590         }
591       });
592     } catch (IOException e) {
593       LOG.warn(e);
594     }
595   }
596 
597   /**
598    * Called prior to rewriting the store files selected for compaction
599    * @param store the store being compacted
600    * @param scanner the scanner used to read store data during compaction
601    * @param scanType type of Scan
602    * @param request the compaction that will be executed
603    * @throws IOException
604    */
605   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
606       final ScanType scanType, final CompactionRequest request) throws IOException {
607     return execOperationWithResult(false, scanner,
608         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
609       @Override
610       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
611           throws IOException {
612         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
613       }
614     });
615   }
616 
617   /**
618    * Called after the store compaction has completed.
619    * @param store the store being compacted
620    * @param resultFile the new store file written during compaction
621    * @param request the compaction that is being executed
622    * @throws IOException
623    */
624   public void postCompact(final Store store, final StoreFile resultFile,
625       final CompactionRequest request) throws IOException {
626     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
627       @Override
628       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
629           throws IOException {
630         oserver.postCompact(ctx, store, resultFile, request);
631       }
632     });
633   }
634 
635   /**
636    * Invoked before a memstore flush
637    * @throws IOException
638    */
639   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
640       throws IOException {
641     return execOperationWithResult(false, scanner,
642         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
643       @Override
644       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
645           throws IOException {
646         setResult(oserver.preFlush(ctx, store, getResult()));
647       }
648     });
649   }
650 
651   /**
652    * Invoked before a memstore flush
653    * @throws IOException
654    */
655   public void preFlush() throws IOException {
656     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
657       @Override
658       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
659           throws IOException {
660         oserver.preFlush(ctx);
661       }
662     });
663   }
664 
665   /**
666    * See
667    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
668    *    Store, KeyValueScanner, InternalScanner)}
669    */
670   public InternalScanner preFlushScannerOpen(final Store store,
671       final KeyValueScanner memstoreScanner) throws IOException {
672     return execOperationWithResult(null,
673         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
674       @Override
675       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
676           throws IOException {
677         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
678       }
679     });
680   }
681 
682   /**
683    * Invoked after a memstore flush
684    * @throws IOException
685    */
686   public void postFlush() throws IOException {
687     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
688       @Override
689       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
690           throws IOException {
691         oserver.postFlush(ctx);
692       }
693     });
694   }
695 
696   /**
697    * Invoked after a memstore flush
698    * @throws IOException
699    */
700   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
701     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
702       @Override
703       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
704           throws IOException {
705         oserver.postFlush(ctx, store, storeFile);
706       }
707     });
708   }
709 
710   /**
711    * Invoked just before a split
712    * @throws IOException
713    */
714   // TODO: Deprecate this
715   public void preSplit() throws IOException {
716     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
717       @Override
718       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
719           throws IOException {
720         oserver.preSplit(ctx);
721       }
722     });
723   }
724 
725   /**
726    * Invoked just before a split
727    * @throws IOException
728    */
729   public void preSplit(final byte[] splitRow) throws IOException {
730     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
731       @Override
732       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
733           throws IOException {
734         oserver.preSplit(ctx, splitRow);
735       }
736     });
737   }
738 
739   /**
740    * Invoked just after a split
741    * @param l the new left-hand daughter region
742    * @param r the new right-hand daughter region
743    * @throws IOException
744    */
745   public void postSplit(final Region l, final Region r) throws IOException {
746     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
747       @Override
748       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
749           throws IOException {
750         oserver.postSplit(ctx, l, r);
751       }
752     });
753   }
754 
755   public boolean preSplitBeforePONR(final byte[] splitKey,
756       final List<Mutation> metaEntries) throws IOException {
757     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
758       @Override
759       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
760           throws IOException {
761         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
762       }
763     });
764   }
765 
766   public void preSplitAfterPONR() throws IOException {
767     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
768       @Override
769       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
770           throws IOException {
771         oserver.preSplitAfterPONR(ctx);
772       }
773     });
774   }
775 
776   /**
777    * Invoked just before the rollback of a failed split is started
778    * @throws IOException
779    */
780   public void preRollBackSplit() throws IOException {
781     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
782       @Override
783       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
784           throws IOException {
785         oserver.preRollBackSplit(ctx);
786       }
787     });
788   }
789 
790   /**
791    * Invoked just after the rollback of a failed split is done
792    * @throws IOException
793    */
794   public void postRollBackSplit() throws IOException {
795     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
796       @Override
797       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
798           throws IOException {
799         oserver.postRollBackSplit(ctx);
800       }
801     });
802   }
803 
804   /**
805    * Invoked after a split is completed irrespective of a failure or success.
806    * @throws IOException
807    */
808   public void postCompleteSplit() throws IOException {
809     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
810       @Override
811       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
812           throws IOException {
813         oserver.postCompleteSplit(ctx);
814       }
815     });
816   }
817 
818   // RegionObserver support
819 
820   /**
821    * @param row the row key
822    * @param family the family
823    * @param result the result set from the region
824    * @return true if default processing should be bypassed
825    * @exception IOException Exception
826    */
827   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
828       final Result result) throws IOException {
829     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
830       @Override
831       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
832           throws IOException {
833         oserver.preGetClosestRowBefore(ctx, row, family, result);
834       }
835     });
836   }
837 
838   /**
839    * @param row the row key
840    * @param family the family
841    * @param result the result set from the region
842    * @exception IOException Exception
843    */
844   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
845       final Result result) throws IOException {
846     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
847       @Override
848       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
849           throws IOException {
850         oserver.postGetClosestRowBefore(ctx, row, family, result);
851       }
852     });
853   }
854 
855   /**
856    * @param get the Get request
857    * @return true if default processing should be bypassed
858    * @exception IOException Exception
859    */
860   public boolean preGet(final Get get, final List<Cell> results)
861       throws IOException {
862     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
863       @Override
864       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
865           throws IOException {
866         oserver.preGetOp(ctx, get, results);
867       }
868     });
869   }
870 
871   /**
872    * @param get the Get request
873    * @param results the result sett
874    * @exception IOException Exception
875    */
876   public void postGet(final Get get, final List<Cell> results)
877       throws IOException {
878     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
879       @Override
880       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
881           throws IOException {
882         oserver.postGetOp(ctx, get, results);
883       }
884     });
885   }
886 
887   /**
888    * @param get the Get request
889    * @return true or false to return to client if bypassing normal operation,
890    * or null otherwise
891    * @exception IOException Exception
892    */
893   public Boolean preExists(final Get get) throws IOException {
894     return execOperationWithResult(true, false,
895         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
896       @Override
897       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
898           throws IOException {
899         setResult(oserver.preExists(ctx, get, getResult()));
900       }
901     });
902   }
903 
904   /**
905    * @param get the Get request
906    * @param exists the result returned by the region server
907    * @return the result to return to the client
908    * @exception IOException Exception
909    */
910   public boolean postExists(final Get get, boolean exists)
911       throws IOException {
912     return execOperationWithResult(exists,
913         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
914       @Override
915       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
916           throws IOException {
917         setResult(oserver.postExists(ctx, get, getResult()));
918       }
919     });
920   }
921 
922   /**
923    * @param put The Put object
924    * @param edit The WALEdit object.
925    * @param durability The durability used
926    * @return true if default processing should be bypassed
927    * @exception IOException Exception
928    */
929   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
930       throws IOException {
931     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
932       @Override
933       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
934           throws IOException {
935         oserver.prePut(ctx, put, edit, durability);
936       }
937     });
938   }
939 
940   /**
941    * @param mutation - the current mutation
942    * @param kv - the current cell
943    * @param byteNow - current timestamp in bytes
944    * @param get - the get that could be used
945    * Note that the get only does not specify the family and qualifier that should be used
946    * @return true if default processing should be bypassed
947    * @exception IOException
948    *              Exception
949    */
950   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
951       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
952     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
953       @Override
954       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955           throws IOException {
956         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
957       }
958     });
959   }
960 
961   /**
962    * @param put The Put object
963    * @param edit The WALEdit object.
964    * @param durability The durability used
965    * @exception IOException Exception
966    */
967   public void postPut(final Put put, final WALEdit edit, final Durability durability)
968       throws IOException {
969     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
970       @Override
971       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
972           throws IOException {
973         oserver.postPut(ctx, put, edit, durability);
974       }
975     });
976   }
977 
978   /**
979    * @param delete The Delete object
980    * @param edit The WALEdit object.
981    * @param durability The durability used
982    * @return true if default processing should be bypassed
983    * @exception IOException Exception
984    */
985   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
986       throws IOException {
987     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
988       @Override
989       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
990           throws IOException {
991         oserver.preDelete(ctx, delete, edit, durability);
992       }
993     });
994   }
995 
996   /**
997    * @param delete The Delete object
998    * @param edit The WALEdit object.
999    * @param durability The durability used
1000    * @exception IOException Exception
1001    */
1002   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
1003       throws IOException {
1004     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1005       @Override
1006       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1007           throws IOException {
1008         oserver.postDelete(ctx, delete, edit, durability);
1009       }
1010     });
1011   }
1012 
1013   /**
1014    * @param miniBatchOp
1015    * @return true if default processing should be bypassed
1016    * @throws IOException
1017    */
1018   public boolean preBatchMutate(
1019       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1020     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1021       @Override
1022       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1023           throws IOException {
1024         oserver.preBatchMutate(ctx, miniBatchOp);
1025       }
1026     });
1027   }
1028 
1029   /**
1030    * @param miniBatchOp
1031    * @throws IOException
1032    */
1033   public void postBatchMutate(
1034       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1035     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1036       @Override
1037       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1038           throws IOException {
1039         oserver.postBatchMutate(ctx, miniBatchOp);
1040       }
1041     });
1042   }
1043 
1044   public void postBatchMutateIndispensably(
1045       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1046       throws IOException {
1047     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1048       @Override
1049       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1050           throws IOException {
1051         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1052       }
1053     });
1054   }
1055 
1056   /**
1057    * @param row row to check
1058    * @param family column family
1059    * @param qualifier column qualifier
1060    * @param compareOp the comparison operation
1061    * @param comparator the comparator
1062    * @param put data to put if check succeeds
1063    * @return true or false to return to client if default processing should
1064    * be bypassed, or null otherwise
1065    * @throws IOException e
1066    */
1067   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1068       final byte [] qualifier, final CompareOp compareOp,
1069       final ByteArrayComparable comparator, final Put put)
1070       throws IOException {
1071     return execOperationWithResult(true, false,
1072         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1073       @Override
1074       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1075           throws IOException {
1076         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1077           compareOp, comparator, put, getResult()));
1078       }
1079     });
1080   }
1081 
1082   /**
1083    * @param row row to check
1084    * @param family column family
1085    * @param qualifier column qualifier
1086    * @param compareOp the comparison operation
1087    * @param comparator the comparator
1088    * @param put data to put if check succeeds
1089    * @return true or false to return to client if default processing should
1090    * be bypassed, or null otherwise
1091    * @throws IOException e
1092    */
1093   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1094       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1095       final Put put) throws IOException {
1096     return execOperationWithResult(true, false,
1097         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1098       @Override
1099       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1100           throws IOException {
1101         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1102           compareOp, comparator, put, getResult()));
1103       }
1104     });
1105   }
1106 
1107   /**
1108    * @param row row to check
1109    * @param family column family
1110    * @param qualifier column qualifier
1111    * @param compareOp the comparison operation
1112    * @param comparator the comparator
1113    * @param put data to put if check succeeds
1114    * @throws IOException e
1115    */
1116   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1117       final byte [] qualifier, final CompareOp compareOp,
1118       final ByteArrayComparable comparator, final Put put,
1119       boolean result) throws IOException {
1120     return execOperationWithResult(result,
1121         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1122       @Override
1123       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1124           throws IOException {
1125         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1126           compareOp, comparator, put, getResult()));
1127       }
1128     });
1129   }
1130 
1131   /**
1132    * @param row row to check
1133    * @param family column family
1134    * @param qualifier column qualifier
1135    * @param compareOp the comparison operation
1136    * @param comparator the comparator
1137    * @param delete delete to commit if check succeeds
1138    * @return true or false to return to client if default processing should
1139    * be bypassed, or null otherwise
1140    * @throws IOException e
1141    */
1142   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1143       final byte [] qualifier, final CompareOp compareOp,
1144       final ByteArrayComparable comparator, final Delete delete)
1145       throws IOException {
1146     return execOperationWithResult(true, false,
1147         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1148       @Override
1149       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1150           throws IOException {
1151         setResult(oserver.preCheckAndDelete(ctx, row, family,
1152             qualifier, compareOp, comparator, delete, getResult()));
1153       }
1154     });
1155   }
1156 
1157   /**
1158    * @param row row to check
1159    * @param family column family
1160    * @param qualifier column qualifier
1161    * @param compareOp the comparison operation
1162    * @param comparator the comparator
1163    * @param delete delete to commit if check succeeds
1164    * @return true or false to return to client if default processing should
1165    * be bypassed, or null otherwise
1166    * @throws IOException e
1167    */
1168   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1169       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1170       final Delete delete) throws IOException {
1171     return execOperationWithResult(true, false,
1172         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1173       @Override
1174       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1175           throws IOException {
1176         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1177               family, qualifier, compareOp, comparator, delete, getResult()));
1178       }
1179     });
1180   }
1181 
1182   /**
1183    * @param row row to check
1184    * @param family column family
1185    * @param qualifier column qualifier
1186    * @param compareOp the comparison operation
1187    * @param comparator the comparator
1188    * @param delete delete to commit if check succeeds
1189    * @throws IOException e
1190    */
1191   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1192       final byte [] qualifier, final CompareOp compareOp,
1193       final ByteArrayComparable comparator, final Delete delete,
1194       boolean result) throws IOException {
1195     return execOperationWithResult(result,
1196         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1197       @Override
1198       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1199           throws IOException {
1200         setResult(oserver.postCheckAndDelete(ctx, row, family,
1201             qualifier, compareOp, comparator, delete, getResult()));
1202       }
1203     });
1204   }
1205 
1206   /**
1207    * @param append append object
1208    * @return result to return to client if default operation should be
1209    * bypassed, null otherwise
1210    * @throws IOException if an error occurred on the coprocessor
1211    */
1212   public Result preAppend(final Append append) throws IOException {
1213     return execOperationWithResult(true, null,
1214         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1215       @Override
1216       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1217           throws IOException {
1218         setResult(oserver.preAppend(ctx, append));
1219       }
1220     });
1221   }
1222 
1223   /**
1224    * @param append append object
1225    * @return result to return to client if default operation should be
1226    * bypassed, null otherwise
1227    * @throws IOException if an error occurred on the coprocessor
1228    */
1229   public Result preAppendAfterRowLock(final Append append) throws IOException {
1230     return execOperationWithResult(true, null,
1231         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1232       @Override
1233       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1234           throws IOException {
1235         setResult(oserver.preAppendAfterRowLock(ctx, append));
1236       }
1237     });
1238   }
1239 
1240   /**
1241    * @param increment increment object
1242    * @return result to return to client if default operation should be
1243    * bypassed, null otherwise
1244    * @throws IOException if an error occurred on the coprocessor
1245    */
1246   public Result preIncrement(final Increment increment) throws IOException {
1247     return execOperationWithResult(true, null,
1248         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1249       @Override
1250       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1251           throws IOException {
1252         setResult(oserver.preIncrement(ctx, increment));
1253       }
1254     });
1255   }
1256 
1257   /**
1258    * @param increment increment object
1259    * @return result to return to client if default operation should be
1260    * bypassed, null otherwise
1261    * @throws IOException if an error occurred on the coprocessor
1262    */
1263   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1264     return execOperationWithResult(true, null,
1265         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1266       @Override
1267       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1268           throws IOException {
1269         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1270       }
1271     });
1272   }
1273 
1274   /**
1275    * @param append Append object
1276    * @param result the result returned by the append
1277    * @throws IOException if an error occurred on the coprocessor
1278    */
1279   public void postAppend(final Append append, final Result result) throws IOException {
1280     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1281       @Override
1282       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1283           throws IOException {
1284         oserver.postAppend(ctx, append, result);
1285       }
1286     });
1287   }
1288 
1289   /**
1290    * @param increment increment object
1291    * @param result the result returned by postIncrement
1292    * @throws IOException if an error occurred on the coprocessor
1293    */
1294   public Result postIncrement(final Increment increment, Result result) throws IOException {
1295     return execOperationWithResult(result,
1296         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1297       @Override
1298       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1299           throws IOException {
1300         setResult(oserver.postIncrement(ctx, increment, getResult()));
1301       }
1302     });
1303   }
1304 
1305   /**
1306    * @param scan the Scan specification
1307    * @return scanner id to return to client if default operation should be
1308    * bypassed, false otherwise
1309    * @exception IOException Exception
1310    */
1311   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1312     return execOperationWithResult(true, null,
1313         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1314       @Override
1315       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1316           throws IOException {
1317         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1318       }
1319     });
1320   }
1321 
1322   /**
1323    * See
1324    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1325    *    Store, Scan, NavigableSet, KeyValueScanner)}
1326    */
1327   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1328       final NavigableSet<byte[]> targetCols) throws IOException {
1329     return execOperationWithResult(null,
1330         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1331       @Override
1332       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1333           throws IOException {
1334         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1335       }
1336     });
1337   }
1338 
1339   /**
1340    * @param scan the Scan specification
1341    * @param s the scanner
1342    * @return the scanner instance to use
1343    * @exception IOException Exception
1344    */
1345   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1346     return execOperationWithResult(s,
1347         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1348       @Override
1349       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1350           throws IOException {
1351         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1352       }
1353     });
1354   }
1355 
1356   /**
1357    * @param s the scanner
1358    * @param results the result set returned by the region server
1359    * @param limit the maximum number of results to return
1360    * @return 'has next' indication to client if bypassing default behavior, or
1361    * null otherwise
1362    * @exception IOException Exception
1363    */
1364   public Boolean preScannerNext(final InternalScanner s,
1365       final List<Result> results, final int limit) throws IOException {
1366     return execOperationWithResult(true, false,
1367         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1368       @Override
1369       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1370           throws IOException {
1371         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1372       }
1373     });
1374   }
1375 
1376   /**
1377    * @param s the scanner
1378    * @param results the result set returned by the region server
1379    * @param limit the maximum number of results to return
1380    * @param hasMore
1381    * @return 'has more' indication to give to client
1382    * @exception IOException Exception
1383    */
1384   public boolean postScannerNext(final InternalScanner s,
1385       final List<Result> results, final int limit, boolean hasMore)
1386       throws IOException {
1387     return execOperationWithResult(hasMore,
1388         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1389       @Override
1390       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1391           throws IOException {
1392         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1393       }
1394     });
1395   }
1396 
1397   /**
1398    * This will be called by the scan flow when the current scanned row is being filtered out by the
1399    * filter.
1400    * @param s the scanner
1401    * @param currentRow The current rowkey which got filtered out
1402    * @param offset offset to rowkey
1403    * @param length length of rowkey
1404    * @return whether more rows are available for the scanner or not
1405    * @throws IOException
1406    */
1407   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1408       final int offset, final short length) throws IOException {
1409     // short circuit for performance
1410     if (!hasCustomPostScannerFilterRow) return true;
1411     return execOperationWithResult(true,
1412         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1413       @Override
1414       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1415           throws IOException {
1416         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1417       }
1418     });
1419   }
1420 
1421   /**
1422    * @param s the scanner
1423    * @return true if default behavior should be bypassed, false otherwise
1424    * @exception IOException Exception
1425    */
1426   public boolean preScannerClose(final InternalScanner s) throws IOException {
1427     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1428       @Override
1429       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1430           throws IOException {
1431         oserver.preScannerClose(ctx, s);
1432       }
1433     });
1434   }
1435 
1436   /**
1437    * @exception IOException Exception
1438    */
1439   public void postScannerClose(final InternalScanner s) throws IOException {
1440     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1441       @Override
1442       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1443           throws IOException {
1444         oserver.postScannerClose(ctx, s);
1445       }
1446     });
1447   }
1448 
1449   /**
1450    * @param info
1451    * @param logKey
1452    * @param logEdit
1453    * @return true if default behavior should be bypassed, false otherwise
1454    * @throws IOException
1455    */
1456   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1457       final WALEdit logEdit) throws IOException {
1458     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1459       @Override
1460       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1461           throws IOException {
1462         // Once we don't need to support the legacy call, replace RegionOperation with a version
1463         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1464         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1465         if (env.useLegacyPre) {
1466           if (logKey instanceof HLogKey) {
1467             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1468           } else {
1469             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1470           }
1471         } else {
1472           oserver.preWALRestore(ctx, info, logKey, logEdit);
1473         }
1474       }
1475     });
1476   }
1477 
1478   /**
1479    * @return true if default behavior should be bypassed, false otherwise
1480    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1481    */
1482   @Deprecated
1483   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1484       final WALEdit logEdit) throws IOException {
1485     return preWALRestore(info, (WALKey)logKey, logEdit);
1486   }
1487 
1488   /**
1489    * @param info
1490    * @param logKey
1491    * @param logEdit
1492    * @throws IOException
1493    */
1494   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1495       throws IOException {
1496     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1497       @Override
1498       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1499           throws IOException {
1500         // Once we don't need to support the legacy call, replace RegionOperation with a version
1501         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1502         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1503         if (env.useLegacyPost) {
1504           if (logKey instanceof HLogKey) {
1505             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1506           } else {
1507             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1508           }
1509         } else {
1510           oserver.postWALRestore(ctx, info, logKey, logEdit);
1511         }
1512       }
1513     });
1514   }
1515 
1516   /**
1517    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1518    */
1519   @Deprecated
1520   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1521       throws IOException {
1522     postWALRestore(info, (WALKey)logKey, logEdit);
1523   }
1524 
1525   /**
1526    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1527    * @return true if the default operation should be bypassed
1528    * @throws IOException
1529    */
1530   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1531     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1532       @Override
1533       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1534           throws IOException {
1535         oserver.preBulkLoadHFile(ctx, familyPaths);
1536       }
1537     });
1538   }
1539 
1540   /**
1541    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1542    * @param hasLoaded whether load was successful or not
1543    * @return the possibly modified value of hasLoaded
1544    * @throws IOException
1545    */
1546   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1547       boolean hasLoaded) throws IOException {
1548     return execOperationWithResult(hasLoaded,
1549         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1550       @Override
1551       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1552           throws IOException {
1553         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1554       }
1555     });
1556   }
1557 
1558   public void postStartRegionOperation(final Operation op) throws IOException {
1559     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1560       @Override
1561       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1562           throws IOException {
1563         oserver.postStartRegionOperation(ctx, op);
1564       }
1565     });
1566   }
1567 
1568   public void postCloseRegionOperation(final Operation op) throws IOException {
1569     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1570       @Override
1571       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1572           throws IOException {
1573         oserver.postCloseRegionOperation(ctx, op);
1574       }
1575     });
1576   }
1577 
1578   /**
1579    * @param fs fileystem to read from
1580    * @param p path to the file
1581    * @param in {@link FSDataInputStreamWrapper}
1582    * @param size Full size of the file
1583    * @param cacheConf
1584    * @param r original reference file. This will be not null only when reading a split file.
1585    * @return a Reader instance to use instead of the base reader if overriding
1586    * default behavior, null otherwise
1587    * @throws IOException
1588    */
1589   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1590       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1591       final Reference r) throws IOException {
1592     return execOperationWithResult(null,
1593         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1594       @Override
1595       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1596           throws IOException {
1597         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1598       }
1599     });
1600   }
1601 
1602   /**
1603    * @param fs fileystem to read from
1604    * @param p path to the file
1605    * @param in {@link FSDataInputStreamWrapper}
1606    * @param size Full size of the file
1607    * @param cacheConf
1608    * @param r original reference file. This will be not null only when reading a split file.
1609    * @param reader the base reader instance
1610    * @return The reader to use
1611    * @throws IOException
1612    */
1613   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1614       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1615       final Reference r, final StoreFile.Reader reader) throws IOException {
1616     return execOperationWithResult(reader,
1617         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1618       @Override
1619       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1620           throws IOException {
1621         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1622       }
1623     });
1624   }
1625 
1626   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1627       final Cell oldCell, Cell newCell) throws IOException {
1628     return execOperationWithResult(newCell,
1629         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1630       @Override
1631       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1632           throws IOException {
1633         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1634       }
1635     });
1636   }
1637 
1638   public Message preEndpointInvocation(final Service service, final String methodName,
1639       Message request) throws IOException {
1640     return execOperationWithResult(request,
1641         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1642       @Override
1643       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1644           throws IOException {
1645         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1646       }
1647     });
1648   }
1649 
1650   public void postEndpointInvocation(final Service service, final String methodName,
1651       final Message request, final Message.Builder responseBuilder) throws IOException {
1652     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1653       @Override
1654       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1655           throws IOException {
1656         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1657       }
1658     });
1659   }
1660 
1661   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1662     return execOperationWithResult(tracker,
1663         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1664       @Override
1665       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1666           throws IOException {
1667         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1668       }
1669     });
1670   }
1671 
1672   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1673     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1674     for (RegionEnvironment env : coprocessors) {
1675       DescriptiveStatistics ds = new DescriptiveStatistics();
1676       if (env.getInstance() instanceof RegionObserver) {
1677         for (Long time : env.getExecutionLatenciesNanos()) {
1678           ds.addValue(time);
1679         }
1680         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1681         if (ds.getN() == 0) {
1682           ds.addValue(0);
1683         }
1684         results.put(env.getInstance().getClass().getSimpleName(), ds);
1685       }
1686     }
1687     return results;
1688   }
1689 
1690   private static abstract class CoprocessorOperation
1691       extends ObserverContext<RegionCoprocessorEnvironment> {
1692     public abstract void call(Coprocessor observer,
1693         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1694     public abstract boolean hasCall(Coprocessor observer);
1695     public void postEnvCall(RegionEnvironment env) { }
1696   }
1697 
1698   private static abstract class RegionOperation extends CoprocessorOperation {
1699     public abstract void call(RegionObserver observer,
1700         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1701 
1702     public boolean hasCall(Coprocessor observer) {
1703       return observer instanceof RegionObserver;
1704     }
1705 
1706     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1707         throws IOException {
1708       call((RegionObserver)observer, ctx);
1709     }
1710   }
1711 
1712   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1713     private T result = null;
1714     public void setResult(final T result) { this.result = result; }
1715     public T getResult() { return this.result; }
1716   }
1717 
1718   private static abstract class EndpointOperation extends CoprocessorOperation {
1719     public abstract void call(EndpointObserver observer,
1720         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1721 
1722     public boolean hasCall(Coprocessor observer) {
1723       return observer instanceof EndpointObserver;
1724     }
1725 
1726     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1727         throws IOException {
1728       call((EndpointObserver)observer, ctx);
1729     }
1730   }
1731 
1732   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1733     private T result = null;
1734     public void setResult(final T result) { this.result = result; }
1735     public T getResult() { return this.result; }
1736   }
1737 
1738   private boolean execOperation(final CoprocessorOperation ctx)
1739       throws IOException {
1740     return execOperation(true, ctx);
1741   }
1742 
1743   private <T> T execOperationWithResult(final T defaultValue,
1744       final RegionOperationWithResult<T> ctx) throws IOException {
1745     if (ctx == null) return defaultValue;
1746     ctx.setResult(defaultValue);
1747     execOperation(true, ctx);
1748     return ctx.getResult();
1749   }
1750 
1751   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1752       final RegionOperationWithResult<T> ctx) throws IOException {
1753     boolean bypass = false;
1754     T result = defaultValue;
1755     if (ctx != null) {
1756       ctx.setResult(defaultValue);
1757       bypass = execOperation(true, ctx);
1758       result = ctx.getResult();
1759     }
1760     return bypass == ifBypass ? result : null;
1761   }
1762 
1763   private <T> T execOperationWithResult(final T defaultValue,
1764       final EndpointOperationWithResult<T> ctx) throws IOException {
1765     if (ctx == null) return defaultValue;
1766     ctx.setResult(defaultValue);
1767     execOperation(true, ctx);
1768     return ctx.getResult();
1769   }
1770 
1771   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1772       throws IOException {
1773     boolean bypass = false;
1774     for (RegionEnvironment env: coprocessors) {
1775       Coprocessor observer = env.getInstance();
1776       if (ctx.hasCall(observer)) {
1777         long startTime = System.nanoTime();
1778         ctx.prepare(env);
1779         Thread currentThread = Thread.currentThread();
1780         ClassLoader cl = currentThread.getContextClassLoader();
1781         try {
1782           currentThread.setContextClassLoader(env.getClassLoader());
1783           ctx.call(observer, ctx);
1784         } catch (Throwable e) {
1785           handleCoprocessorThrowable(env, e);
1786         } finally {
1787           currentThread.setContextClassLoader(cl);
1788         }
1789         env.offerExecutionLatency(System.nanoTime() - startTime);
1790         bypass |= ctx.shouldBypass();
1791         if (earlyExit && ctx.shouldComplete()) {
1792           break;
1793         }
1794       }
1795 
1796       ctx.postEnvCall(env);
1797     }
1798     return bypass;
1799   }
1800 }