View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.UUID;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.regex.Matcher;
35  
36  import com.google.common.collect.ImmutableList;
37  import com.google.common.collect.Lists;
38  import com.google.protobuf.Message;
39  import com.google.protobuf.Service;
40  
41  import org.apache.commons.collections.map.AbstractReferenceMap;
42  import org.apache.commons.collections.map.ReferenceMap;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.Coprocessor;
53  import org.apache.hadoop.hbase.CoprocessorEnvironment;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
69  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
70  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
71  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
72  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
75  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
76  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
77  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
78  import org.apache.hadoop.hbase.io.Reference;
79  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
80  import org.apache.hadoop.hbase.regionserver.Region.Operation;
81  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
82  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
87  import org.apache.hadoop.hbase.util.Pair;
88  
89  /**
90   * Implements the coprocessor environment and runtime support for coprocessors
91   * loaded within a {@link Region}.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
94  @InterfaceStability.Evolving
95  public class RegionCoprocessorHost
96      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
97  
98    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
99    // The shared data map
100   private static ReferenceMap sharedDataMap =
101       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
102 
103   /**
104    * Encapsulation of the environment of each coprocessor
105    */
106   static class RegionEnvironment extends CoprocessorHost.Environment
107       implements RegionCoprocessorEnvironment {
108 
109     private Region region;
110     private RegionServerServices rsServices;
111     ConcurrentMap<String, Object> sharedData;
112     private static final int LATENCY_BUFFER_SIZE = 100;
113     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
114         LATENCY_BUFFER_SIZE);
115     private final boolean useLegacyPre;
116     private final boolean useLegacyPost;
117 
118     /**
119      * Constructor
120      * @param impl the coprocessor instance
121      * @param priority chaining priority
122      */
123     public RegionEnvironment(final Coprocessor impl, final int priority,
124         final int seq, final Configuration conf, final Region region,
125         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
126       super(impl, priority, seq, conf);
127       this.region = region;
128       this.rsServices = services;
129       this.sharedData = sharedData;
130       // Pick which version of the WAL related events we'll call.
131       // This way we avoid calling the new version on older RegionObservers so
132       // we can maintain binary compatibility.
133       // See notes in javadoc for RegionObserver
134       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
135           HRegionInfo.class, WALKey.class, WALEdit.class);
136       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
137           HRegionInfo.class, WALKey.class, WALEdit.class);
138     }
139 
140     /** @return the region */
141     @Override
142     public Region getRegion() {
143       return region;
144     }
145 
146     /** @return reference to the region server services */
147     @Override
148     public RegionServerServices getRegionServerServices() {
149       return rsServices;
150     }
151 
152     public void shutdown() {
153       super.shutdown();
154     }
155 
156     @Override
157     public ConcurrentMap<String, Object> getSharedData() {
158       return sharedData;
159     }
160 
161     public void offerExecutionLatency(long latencyNanos) {
162       coprocessorTimeNanos.offer(latencyNanos);
163     }
164 
165     public Collection<Long> getExecutionLatenciesNanos() {
166       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
167       coprocessorTimeNanos.drainTo(latencies);
168       return latencies;
169     }
170 
171     @Override
172     public HRegionInfo getRegionInfo() {
173       return region.getRegionInfo();
174     }
175 
176   }
177 
178   static class TableCoprocessorAttribute {
179     private Path path;
180     private String className;
181     private int priority;
182     private Configuration conf;
183 
184     public TableCoprocessorAttribute(Path path, String className, int priority,
185         Configuration conf) {
186       this.path = path;
187       this.className = className;
188       this.priority = priority;
189       this.conf = conf;
190     }
191 
192     public Path getPath() {
193       return path;
194     }
195 
196     public String getClassName() {
197       return className;
198     }
199 
200     public int getPriority() {
201       return priority;
202     }
203 
204     public Configuration getConf() {
205       return conf;
206     }
207   }
208 
  /** The region server services, as handed to the constructor. */
  RegionServerServices rsServices;
  /** The region this host was created for, as handed to the constructor. */
  Region region;
213 
214   /**
215    * Constructor
216    * @param region the region
217    * @param rsServices interface to available region server functionality
218    * @param conf the configuration
219    */
220   public RegionCoprocessorHost(final Region region,
221       final RegionServerServices rsServices, final Configuration conf) {
222     super(rsServices);
223     this.conf = conf;
224     this.rsServices = rsServices;
225     this.region = region;
226     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
227 
228     // load system default cp's from configuration.
229     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
230 
231     // load system default cp's for user tables from configuration.
232     if (!region.getRegionInfo().getTable().isSystemTable()) {
233       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
234     }
235 
236     // load Coprocessor From HDFS
237     loadTableCoprocessors(conf);
238   }
239 
240   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
241       HTableDescriptor htd) {
242     List<TableCoprocessorAttribute> result = Lists.newArrayList();
243     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
244       String key = Bytes.toString(e.getKey().get()).trim();
245       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
246         String spec = Bytes.toString(e.getValue().get()).trim();
247         // found one
248         try {
249           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
250           if (matcher.matches()) {
251             // jar file path can be empty if the cp class can be loaded
252             // from class loader.
253             Path path = matcher.group(1).trim().isEmpty() ?
254                 null : new Path(matcher.group(1).trim());
255             String className = matcher.group(2).trim();
256             if (className.isEmpty()) {
257               LOG.error("Malformed table coprocessor specification: key=" +
258                 key + ", spec: " + spec);
259               continue;
260             }
261             String priorityStr = matcher.group(3).trim();
262             int priority = priorityStr.isEmpty() ?
263                 Coprocessor.PRIORITY_USER : Integer.parseInt(priorityStr);
264             String cfgSpec = null;
265             try {
266               cfgSpec = matcher.group(4);
267             } catch (IndexOutOfBoundsException ex) {
268               // ignore
269             }
270             Configuration ourConf;
271             if (cfgSpec != null) {
272               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
273               // do an explicit deep copy of the passed configuration
274               ourConf = new Configuration(false);
275               HBaseConfiguration.merge(ourConf, conf);
276               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
277               while (m.find()) {
278                 ourConf.set(m.group(1), m.group(2));
279               }
280             } else {
281               ourConf = conf;
282             }
283             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
284           } else {
285             LOG.error("Malformed table coprocessor specification: key=" + key +
286               ", spec: " + spec);
287           }
288         } catch (Exception ioe) {
289           LOG.error("Malformed table coprocessor specification: key=" + key +
290             ", spec: " + spec);
291         }
292       }
293     }
294     return result;
295   }
296 
297   /**
298    * Sanity check the table coprocessor attributes of the supplied schema. Will
299    * throw an exception if there is a problem.
300    * @param conf
301    * @param htd
302    * @throws IOException
303    */
304   public static void testTableCoprocessorAttrs(final Configuration conf,
305       final HTableDescriptor htd) throws IOException {
306     String pathPrefix = UUID.randomUUID().toString();
307     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
308       if (attr.getPriority() < 0) {
309         throw new IOException("Priority for coprocessor " + attr.getClassName() +
310           " cannot be less than 0");
311       }
312       ClassLoader old = Thread.currentThread().getContextClassLoader();
313       try {
314         ClassLoader cl;
315         if (attr.getPath() != null) {
316           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
317             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
318         } else {
319           cl = CoprocessorHost.class.getClassLoader();
320         }
321         Thread.currentThread().setContextClassLoader(cl);
322         cl.loadClass(attr.getClassName());
323       } catch (ClassNotFoundException e) {
324         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
325       } finally {
326         Thread.currentThread().setContextClassLoader(old);
327       }
328     }
329   }
330 
331   void loadTableCoprocessors(final Configuration conf) {
332     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
333       DEFAULT_COPROCESSORS_ENABLED);
334     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
335       DEFAULT_USER_COPROCESSORS_ENABLED);
336     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
337       return;
338     }
339 
340     // scan the table attributes for coprocessor load specifications
341     // initialize the coprocessors
342     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
343     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
344         region.getTableDesc())) {
345       // Load encompasses classloading and coprocessor initialization
346       try {
347         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
348           attr.getConf());
349         configured.add(env);
350         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
351             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
352       } catch (Throwable t) {
353         // Coprocessor failed to load, do we abort on error?
354         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
355           abortServer(attr.getClassName(), t);
356         } else {
357           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
358         }
359       }
360     }
361     // add together to coprocessor set for COW efficiency
362     coprocessors.addAll(configured);
363   }
364 
365   @Override
366   public RegionEnvironment createEnvironment(Class<?> implClass,
367       Coprocessor instance, int priority, int seq, Configuration conf) {
368     // Check if it's an Endpoint.
369     // Due to current dynamic protocol design, Endpoint
370     // uses a different way to be registered and executed.
371     // It uses a visitor pattern to invoke registered Endpoint
372     // method.
373     for (Class<?> c : implClass.getInterfaces()) {
374       if (CoprocessorService.class.isAssignableFrom(c)) {
375         region.registerService( ((CoprocessorService)instance).getService() );
376       }
377     }
378     ConcurrentMap<String, Object> classData;
379     // make sure only one thread can add maps
380     synchronized (sharedDataMap) {
381       // as long as at least one RegionEnvironment holds on to its classData it will
382       // remain in this map
383       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
384       if (classData == null) {
385         classData = new ConcurrentHashMap<String, Object>();
386         sharedDataMap.put(implClass.getName(), classData);
387       }
388     }
389     return new RegionEnvironment(instance, priority, seq, conf, region,
390         rsServices, classData);
391   }
392 
393   /**
394    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
395    *
396    * For example, {@link
397    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
398    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
399    *
400    * See also
401    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
402    *    CoprocessorEnvironment, Throwable)}
403    * @param env The coprocessor that threw the exception.
404    * @param e The exception that was thrown.
405    */
406   private void handleCoprocessorThrowableNoRethrow(
407       final CoprocessorEnvironment env, final Throwable e) {
408     try {
409       handleCoprocessorThrowable(env,e);
410     } catch (IOException ioe) {
411       // We cannot throw exceptions from the caller hook, so ignore.
412       LOG.warn(
413         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
414         e + ". Ignoring.",e);
415     }
416   }
417 
418   /**
419    * Invoked before a region open.
420    *
421    * @throws IOException Signals that an I/O exception has occurred.
422    */
423   public void preOpen() throws IOException {
424     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
425       @Override
426       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
427           throws IOException {
428         oserver.preOpen(ctx);
429       }
430     });
431   }
432 
433   /**
434    * Invoked after a region open
435    */
436   public void postOpen() {
437     try {
438       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
439         @Override
440         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
441             throws IOException {
442           oserver.postOpen(ctx);
443         }
444       });
445     } catch (IOException e) {
446       LOG.warn(e);
447     }
448   }
449 
450   /**
451    * Invoked after log replay on region
452    */
453   public void postLogReplay() {
454     try {
455       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
456         @Override
457         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
458             throws IOException {
459           oserver.postLogReplay(ctx);
460         }
461       });
462     } catch (IOException e) {
463       LOG.warn(e);
464     }
465   }
466 
467   /**
468    * Invoked before a region is closed
469    * @param abortRequested true if the server is aborting
470    */
471   public void preClose(final boolean abortRequested) throws IOException {
472     execOperation(false, new RegionOperation() {
473       @Override
474       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
475           throws IOException {
476         oserver.preClose(ctx, abortRequested);
477       }
478     });
479   }
480 
481   /**
482    * Invoked after a region is closed
483    * @param abortRequested true if the server is aborting
484    */
485   public void postClose(final boolean abortRequested) {
486     try {
487       execOperation(false, new RegionOperation() {
488         @Override
489         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
490             throws IOException {
491           oserver.postClose(ctx, abortRequested);
492         }
493         public void postEnvCall(RegionEnvironment env) {
494           shutdown(env);
495         }
496       });
497     } catch (IOException e) {
498       LOG.warn(e);
499     }
500   }
501 
502   /**
503    * See
504    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
505    */
506   public InternalScanner preCompactScannerOpen(final Store store,
507       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
508       final CompactionRequest request) throws IOException {
509     return execOperationWithResult(null,
510         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
511       @Override
512       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
513           throws IOException {
514         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
515           earliestPutTs, getResult(), request));
516       }
517     });
518   }
519 
520   /**
521    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
522    * available candidates.
523    * @param store The store where compaction is being requested
524    * @param candidates The currently available store files
525    * @param request custom compaction request
526    * @return If {@code true}, skip the normal selection process and use the current list
527    * @throws IOException
528    */
529   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
530       final CompactionRequest request) throws IOException {
531     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
532       @Override
533       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
534           throws IOException {
535         oserver.preCompactSelection(ctx, store, candidates, request);
536       }
537     });
538   }
539 
540   /**
541    * Called after the {@link StoreFile}s to be compacted have been selected from the available
542    * candidates.
543    * @param store The store where compaction is being requested
544    * @param selected The store files selected to compact
545    * @param request custom compaction
546    */
547   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
548       final CompactionRequest request) {
549     try {
550       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
551         @Override
552         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
553             throws IOException {
554           oserver.postCompactSelection(ctx, store, selected, request);
555         }
556       });
557     } catch (IOException e) {
558       LOG.warn(e);
559     }
560   }
561 
562   /**
563    * Called prior to rewriting the store files selected for compaction
564    * @param store the store being compacted
565    * @param scanner the scanner used to read store data during compaction
566    * @param scanType type of Scan
567    * @param request the compaction that will be executed
568    * @throws IOException
569    */
570   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
571       final ScanType scanType, final CompactionRequest request) throws IOException {
572     return execOperationWithResult(false, scanner,
573         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
574       @Override
575       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
576           throws IOException {
577         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
578       }
579     });
580   }
581 
582   /**
583    * Called after the store compaction has completed.
584    * @param store the store being compacted
585    * @param resultFile the new store file written during compaction
586    * @param request the compaction that is being executed
587    * @throws IOException
588    */
589   public void postCompact(final Store store, final StoreFile resultFile,
590       final CompactionRequest request) throws IOException {
591     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
592       @Override
593       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
594           throws IOException {
595         oserver.postCompact(ctx, store, resultFile, request);
596       }
597     });
598   }
599 
600   /**
601    * Invoked before a memstore flush
602    * @throws IOException
603    */
604   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
605       throws IOException {
606     return execOperationWithResult(false, scanner,
607         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
608       @Override
609       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
610           throws IOException {
611         setResult(oserver.preFlush(ctx, store, getResult()));
612       }
613     });
614   }
615 
616   /**
617    * Invoked before a memstore flush
618    * @throws IOException
619    */
620   public void preFlush() throws IOException {
621     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
622       @Override
623       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
624           throws IOException {
625         oserver.preFlush(ctx);
626       }
627     });
628   }
629 
630   /**
631    * See
632    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
633    *    Store, KeyValueScanner, InternalScanner)}
634    */
635   public InternalScanner preFlushScannerOpen(final Store store,
636       final KeyValueScanner memstoreScanner) throws IOException {
637     return execOperationWithResult(null,
638         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
639       @Override
640       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
641           throws IOException {
642         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
643       }
644     });
645   }
646 
647   /**
648    * Invoked after a memstore flush
649    * @throws IOException
650    */
651   public void postFlush() throws IOException {
652     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
653       @Override
654       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
655           throws IOException {
656         oserver.postFlush(ctx);
657       }
658     });
659   }
660 
661   /**
662    * Invoked after a memstore flush
663    * @throws IOException
664    */
665   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
666     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
667       @Override
668       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
669           throws IOException {
670         oserver.postFlush(ctx, store, storeFile);
671       }
672     });
673   }
674 
675   /**
676    * Invoked just before a split
677    * @throws IOException
678    */
679   // TODO: Deprecate this
680   public void preSplit() throws IOException {
681     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
682       @Override
683       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
684           throws IOException {
685         oserver.preSplit(ctx);
686       }
687     });
688   }
689 
690   /**
691    * Invoked just before a split
692    * @throws IOException
693    */
694   public void preSplit(final byte[] splitRow) throws IOException {
695     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
696       @Override
697       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
698           throws IOException {
699         oserver.preSplit(ctx, splitRow);
700       }
701     });
702   }
703 
704   /**
705    * Invoked just after a split
706    * @param l the new left-hand daughter region
707    * @param r the new right-hand daughter region
708    * @throws IOException
709    */
710   public void postSplit(final Region l, final Region r) throws IOException {
711     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
712       @Override
713       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
714           throws IOException {
715         oserver.postSplit(ctx, l, r);
716       }
717     });
718   }
719 
720   public boolean preSplitBeforePONR(final byte[] splitKey,
721       final List<Mutation> metaEntries) throws IOException {
722     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
723       @Override
724       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
725           throws IOException {
726         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
727       }
728     });
729   }
730 
731   public void preSplitAfterPONR() throws IOException {
732     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
733       @Override
734       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
735           throws IOException {
736         oserver.preSplitAfterPONR(ctx);
737       }
738     });
739   }
740 
741   /**
742    * Invoked just before the rollback of a failed split is started
743    * @throws IOException
744    */
745   public void preRollBackSplit() throws IOException {
746     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
747       @Override
748       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
749           throws IOException {
750         oserver.preRollBackSplit(ctx);
751       }
752     });
753   }
754 
755   /**
756    * Invoked just after the rollback of a failed split is done
757    * @throws IOException
758    */
759   public void postRollBackSplit() throws IOException {
760     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
761       @Override
762       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
763           throws IOException {
764         oserver.postRollBackSplit(ctx);
765       }
766     });
767   }
768 
769   /**
770    * Invoked after a split is completed irrespective of a failure or success.
771    * @throws IOException
772    */
773   public void postCompleteSplit() throws IOException {
774     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
775       @Override
776       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
777           throws IOException {
778         oserver.postCompleteSplit(ctx);
779       }
780     });
781   }
782 
783   // RegionObserver support
784 
785   /**
786    * @param row the row key
787    * @param family the family
788    * @param result the result set from the region
789    * @return true if default processing should be bypassed
790    * @exception IOException Exception
791    */
792   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
793       final Result result) throws IOException {
794     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
795       @Override
796       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
797           throws IOException {
798         oserver.preGetClosestRowBefore(ctx, row, family, result);
799       }
800     });
801   }
802 
803   /**
804    * @param row the row key
805    * @param family the family
806    * @param result the result set from the region
807    * @exception IOException Exception
808    */
809   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
810       final Result result) throws IOException {
811     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
812       @Override
813       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
814           throws IOException {
815         oserver.postGetClosestRowBefore(ctx, row, family, result);
816       }
817     });
818   }
819 
820   /**
821    * @param get the Get request
822    * @return true if default processing should be bypassed
823    * @exception IOException Exception
824    */
825   public boolean preGet(final Get get, final List<Cell> results)
826       throws IOException {
827     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
828       @Override
829       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
830           throws IOException {
831         oserver.preGetOp(ctx, get, results);
832       }
833     });
834   }
835 
836   /**
837    * @param get the Get request
838    * @param results the result sett
839    * @exception IOException Exception
840    */
841   public void postGet(final Get get, final List<Cell> results)
842       throws IOException {
843     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
844       @Override
845       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
846           throws IOException {
847         oserver.postGetOp(ctx, get, results);
848       }
849     });
850   }
851 
852   /**
853    * @param get the Get request
854    * @return true or false to return to client if bypassing normal operation,
855    * or null otherwise
856    * @exception IOException Exception
857    */
858   public Boolean preExists(final Get get) throws IOException {
859     return execOperationWithResult(true, false,
860         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
861       @Override
862       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
863           throws IOException {
864         setResult(oserver.preExists(ctx, get, getResult()));
865       }
866     });
867   }
868 
869   /**
870    * @param get the Get request
871    * @param exists the result returned by the region server
872    * @return the result to return to the client
873    * @exception IOException Exception
874    */
875   public boolean postExists(final Get get, boolean exists)
876       throws IOException {
877     return execOperationWithResult(exists,
878         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
879       @Override
880       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
881           throws IOException {
882         setResult(oserver.postExists(ctx, get, getResult()));
883       }
884     });
885   }
886 
887   /**
888    * @param put The Put object
889    * @param edit The WALEdit object.
890    * @param durability The durability used
891    * @return true if default processing should be bypassed
892    * @exception IOException Exception
893    */
894   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
895       throws IOException {
896     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
897       @Override
898       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
899           throws IOException {
900         oserver.prePut(ctx, put, edit, durability);
901       }
902     });
903   }
904 
905   /**
906    * @param mutation - the current mutation
907    * @param kv - the current cell
908    * @param byteNow - current timestamp in bytes
909    * @param get - the get that could be used
910    * Note that the get only does not specify the family and qualifier that should be used
911    * @return true if default processing should be bypassed
912    * @exception IOException
913    *              Exception
914    */
915   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
916       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
917     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
918       @Override
919       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
920           throws IOException {
921         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
922       }
923     });
924   }
925 
926   /**
927    * @param put The Put object
928    * @param edit The WALEdit object.
929    * @param durability The durability used
930    * @exception IOException Exception
931    */
932   public void postPut(final Put put, final WALEdit edit, final Durability durability)
933       throws IOException {
934     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
935       @Override
936       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
937           throws IOException {
938         oserver.postPut(ctx, put, edit, durability);
939       }
940     });
941   }
942 
943   /**
944    * @param delete The Delete object
945    * @param edit The WALEdit object.
946    * @param durability The durability used
947    * @return true if default processing should be bypassed
948    * @exception IOException Exception
949    */
950   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
951       throws IOException {
952     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
953       @Override
954       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955           throws IOException {
956         oserver.preDelete(ctx, delete, edit, durability);
957       }
958     });
959   }
960 
961   /**
962    * @param delete The Delete object
963    * @param edit The WALEdit object.
964    * @param durability The durability used
965    * @exception IOException Exception
966    */
967   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
968       throws IOException {
969     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
970       @Override
971       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
972           throws IOException {
973         oserver.postDelete(ctx, delete, edit, durability);
974       }
975     });
976   }
977 
978   /**
979    * @param miniBatchOp
980    * @return true if default processing should be bypassed
981    * @throws IOException
982    */
983   public boolean preBatchMutate(
984       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
985     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
986       @Override
987       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
988           throws IOException {
989         oserver.preBatchMutate(ctx, miniBatchOp);
990       }
991     });
992   }
993 
994   /**
995    * @param miniBatchOp
996    * @throws IOException
997    */
998   public void postBatchMutate(
999       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1000     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1001       @Override
1002       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1003           throws IOException {
1004         oserver.postBatchMutate(ctx, miniBatchOp);
1005       }
1006     });
1007   }
1008 
1009   public void postBatchMutateIndispensably(
1010       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1011       throws IOException {
1012     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1013       @Override
1014       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1015           throws IOException {
1016         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1017       }
1018     });
1019   }
1020 
1021   /**
1022    * @param row row to check
1023    * @param family column family
1024    * @param qualifier column qualifier
1025    * @param compareOp the comparison operation
1026    * @param comparator the comparator
1027    * @param put data to put if check succeeds
1028    * @return true or false to return to client if default processing should
1029    * be bypassed, or null otherwise
1030    * @throws IOException e
1031    */
1032   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1033       final byte [] qualifier, final CompareOp compareOp,
1034       final ByteArrayComparable comparator, final Put put)
1035       throws IOException {
1036     return execOperationWithResult(true, false,
1037         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1038       @Override
1039       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1040           throws IOException {
1041         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1042           compareOp, comparator, put, getResult()));
1043       }
1044     });
1045   }
1046 
1047   /**
1048    * @param row row to check
1049    * @param family column family
1050    * @param qualifier column qualifier
1051    * @param compareOp the comparison operation
1052    * @param comparator the comparator
1053    * @param put data to put if check succeeds
1054    * @return true or false to return to client if default processing should
1055    * be bypassed, or null otherwise
1056    * @throws IOException e
1057    */
1058   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1059       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1060       final Put put) throws IOException {
1061     return execOperationWithResult(true, false,
1062         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1063       @Override
1064       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1065           throws IOException {
1066         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1067           compareOp, comparator, put, getResult()));
1068       }
1069     });
1070   }
1071 
1072   /**
1073    * @param row row to check
1074    * @param family column family
1075    * @param qualifier column qualifier
1076    * @param compareOp the comparison operation
1077    * @param comparator the comparator
1078    * @param put data to put if check succeeds
1079    * @throws IOException e
1080    */
1081   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1082       final byte [] qualifier, final CompareOp compareOp,
1083       final ByteArrayComparable comparator, final Put put,
1084       boolean result) throws IOException {
1085     return execOperationWithResult(result,
1086         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1087       @Override
1088       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1089           throws IOException {
1090         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1091           compareOp, comparator, put, getResult()));
1092       }
1093     });
1094   }
1095 
1096   /**
1097    * @param row row to check
1098    * @param family column family
1099    * @param qualifier column qualifier
1100    * @param compareOp the comparison operation
1101    * @param comparator the comparator
1102    * @param delete delete to commit if check succeeds
1103    * @return true or false to return to client if default processing should
1104    * be bypassed, or null otherwise
1105    * @throws IOException e
1106    */
1107   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1108       final byte [] qualifier, final CompareOp compareOp,
1109       final ByteArrayComparable comparator, final Delete delete)
1110       throws IOException {
1111     return execOperationWithResult(true, false,
1112         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1113       @Override
1114       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1115           throws IOException {
1116         setResult(oserver.preCheckAndDelete(ctx, row, family,
1117             qualifier, compareOp, comparator, delete, getResult()));
1118       }
1119     });
1120   }
1121 
1122   /**
1123    * @param row row to check
1124    * @param family column family
1125    * @param qualifier column qualifier
1126    * @param compareOp the comparison operation
1127    * @param comparator the comparator
1128    * @param delete delete to commit if check succeeds
1129    * @return true or false to return to client if default processing should
1130    * be bypassed, or null otherwise
1131    * @throws IOException e
1132    */
1133   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1134       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1135       final Delete delete) throws IOException {
1136     return execOperationWithResult(true, false,
1137         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1138       @Override
1139       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1140           throws IOException {
1141         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1142               family, qualifier, compareOp, comparator, delete, getResult()));
1143       }
1144     });
1145   }
1146 
1147   /**
1148    * @param row row to check
1149    * @param family column family
1150    * @param qualifier column qualifier
1151    * @param compareOp the comparison operation
1152    * @param comparator the comparator
1153    * @param delete delete to commit if check succeeds
1154    * @throws IOException e
1155    */
1156   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1157       final byte [] qualifier, final CompareOp compareOp,
1158       final ByteArrayComparable comparator, final Delete delete,
1159       boolean result) throws IOException {
1160     return execOperationWithResult(result,
1161         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1162       @Override
1163       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1164           throws IOException {
1165         setResult(oserver.postCheckAndDelete(ctx, row, family,
1166             qualifier, compareOp, comparator, delete, getResult()));
1167       }
1168     });
1169   }
1170 
1171   /**
1172    * @param append append object
1173    * @return result to return to client if default operation should be
1174    * bypassed, null otherwise
1175    * @throws IOException if an error occurred on the coprocessor
1176    */
1177   public Result preAppend(final Append append) throws IOException {
1178     return execOperationWithResult(true, null,
1179         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1180       @Override
1181       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1182           throws IOException {
1183         setResult(oserver.preAppend(ctx, append));
1184       }
1185     });
1186   }
1187 
1188   /**
1189    * @param append append object
1190    * @return result to return to client if default operation should be
1191    * bypassed, null otherwise
1192    * @throws IOException if an error occurred on the coprocessor
1193    */
1194   public Result preAppendAfterRowLock(final Append append) throws IOException {
1195     return execOperationWithResult(true, null,
1196         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1197       @Override
1198       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1199           throws IOException {
1200         setResult(oserver.preAppendAfterRowLock(ctx, append));
1201       }
1202     });
1203   }
1204 
1205   /**
1206    * @param increment increment object
1207    * @return result to return to client if default operation should be
1208    * bypassed, null otherwise
1209    * @throws IOException if an error occurred on the coprocessor
1210    */
1211   public Result preIncrement(final Increment increment) throws IOException {
1212     return execOperationWithResult(true, null,
1213         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1214       @Override
1215       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1216           throws IOException {
1217         setResult(oserver.preIncrement(ctx, increment));
1218       }
1219     });
1220   }
1221 
1222   /**
1223    * @param increment increment object
1224    * @return result to return to client if default operation should be
1225    * bypassed, null otherwise
1226    * @throws IOException if an error occurred on the coprocessor
1227    */
1228   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1229     return execOperationWithResult(true, null,
1230         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1231       @Override
1232       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1233           throws IOException {
1234         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1235       }
1236     });
1237   }
1238 
1239   /**
1240    * @param append Append object
1241    * @param result the result returned by the append
1242    * @throws IOException if an error occurred on the coprocessor
1243    */
1244   public void postAppend(final Append append, final Result result) throws IOException {
1245     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1246       @Override
1247       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1248           throws IOException {
1249         oserver.postAppend(ctx, append, result);
1250       }
1251     });
1252   }
1253 
1254   /**
1255    * @param increment increment object
1256    * @param result the result returned by postIncrement
1257    * @throws IOException if an error occurred on the coprocessor
1258    */
1259   public Result postIncrement(final Increment increment, Result result) throws IOException {
1260     return execOperationWithResult(result,
1261         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1262       @Override
1263       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1264           throws IOException {
1265         setResult(oserver.postIncrement(ctx, increment, getResult()));
1266       }
1267     });
1268   }
1269 
1270   /**
1271    * @param scan the Scan specification
1272    * @return scanner id to return to client if default operation should be
1273    * bypassed, false otherwise
1274    * @exception IOException Exception
1275    */
1276   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1277     return execOperationWithResult(true, null,
1278         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1279       @Override
1280       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1281           throws IOException {
1282         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1283       }
1284     });
1285   }
1286 
1287   /**
1288    * See
1289    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1290    *    Store, Scan, NavigableSet, KeyValueScanner)}
1291    */
1292   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1293       final NavigableSet<byte[]> targetCols) throws IOException {
1294     return execOperationWithResult(null,
1295         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1296       @Override
1297       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1298           throws IOException {
1299         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1300       }
1301     });
1302   }
1303 
1304   /**
1305    * @param scan the Scan specification
1306    * @param s the scanner
1307    * @return the scanner instance to use
1308    * @exception IOException Exception
1309    */
1310   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1311     return execOperationWithResult(s,
1312         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1313       @Override
1314       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1315           throws IOException {
1316         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1317       }
1318     });
1319   }
1320 
1321   /**
1322    * @param s the scanner
1323    * @param results the result set returned by the region server
1324    * @param limit the maximum number of results to return
1325    * @return 'has next' indication to client if bypassing default behavior, or
1326    * null otherwise
1327    * @exception IOException Exception
1328    */
1329   public Boolean preScannerNext(final InternalScanner s,
1330       final List<Result> results, final int limit) throws IOException {
1331     return execOperationWithResult(true, false,
1332         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1333       @Override
1334       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1335           throws IOException {
1336         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1337       }
1338     });
1339   }
1340 
1341   /**
1342    * @param s the scanner
1343    * @param results the result set returned by the region server
1344    * @param limit the maximum number of results to return
1345    * @param hasMore
1346    * @return 'has more' indication to give to client
1347    * @exception IOException Exception
1348    */
1349   public boolean postScannerNext(final InternalScanner s,
1350       final List<Result> results, final int limit, boolean hasMore)
1351       throws IOException {
1352     return execOperationWithResult(hasMore,
1353         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1354       @Override
1355       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1356           throws IOException {
1357         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1358       }
1359     });
1360   }
1361 
1362   /**
1363    * This will be called by the scan flow when the current scanned row is being filtered out by the
1364    * filter.
1365    * @param s the scanner
1366    * @param currentRow The current rowkey which got filtered out
1367    * @param offset offset to rowkey
1368    * @param length length of rowkey
1369    * @return whether more rows are available for the scanner or not
1370    * @throws IOException
1371    */
1372   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1373       final int offset, final short length) throws IOException {
1374     return execOperationWithResult(true,
1375         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1376       @Override
1377       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1378           throws IOException {
1379         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1380       }
1381     });
1382   }
1383 
1384   /**
1385    * @param s the scanner
1386    * @return true if default behavior should be bypassed, false otherwise
1387    * @exception IOException Exception
1388    */
1389   public boolean preScannerClose(final InternalScanner s) throws IOException {
1390     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1391       @Override
1392       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1393           throws IOException {
1394         oserver.preScannerClose(ctx, s);
1395       }
1396     });
1397   }
1398 
1399   /**
1400    * @exception IOException Exception
1401    */
1402   public void postScannerClose(final InternalScanner s) throws IOException {
1403     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1404       @Override
1405       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1406           throws IOException {
1407         oserver.postScannerClose(ctx, s);
1408       }
1409     });
1410   }
1411 
1412   /**
1413    * @param info
1414    * @param logKey
1415    * @param logEdit
1416    * @return true if default behavior should be bypassed, false otherwise
1417    * @throws IOException
1418    */
1419   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1420       final WALEdit logEdit) throws IOException {
1421     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1422       @Override
1423       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1424           throws IOException {
1425         // Once we don't need to support the legacy call, replace RegionOperation with a version
1426         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1427         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1428         if (env.useLegacyPre) {
1429           if (logKey instanceof HLogKey) {
1430             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1431           } else {
1432             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1433           }
1434         } else {
1435           oserver.preWALRestore(ctx, info, logKey, logEdit);
1436         }
1437       }
1438     });
1439   }
1440 
1441   /**
1442    * @return true if default behavior should be bypassed, false otherwise
1443    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1444    */
1445   @Deprecated
1446   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1447       final WALEdit logEdit) throws IOException {
1448     return preWALRestore(info, (WALKey)logKey, logEdit);
1449   }
1450 
1451   /**
1452    * @param info
1453    * @param logKey
1454    * @param logEdit
1455    * @throws IOException
1456    */
1457   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1458       throws IOException {
1459     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1460       @Override
1461       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1462           throws IOException {
1463         // Once we don't need to support the legacy call, replace RegionOperation with a version
1464         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1465         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1466         if (env.useLegacyPost) {
1467           if (logKey instanceof HLogKey) {
1468             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1469           } else {
1470             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1471           }
1472         } else {
1473           oserver.postWALRestore(ctx, info, logKey, logEdit);
1474         }
1475       }
1476     });
1477   }
1478 
1479   /**
1480    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1481    */
1482   @Deprecated
1483   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1484       throws IOException {
1485     postWALRestore(info, (WALKey)logKey, logEdit);
1486   }
1487 
1488   /**
1489    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1490    * @return true if the default operation should be bypassed
1491    * @throws IOException
1492    */
1493   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1494     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1495       @Override
1496       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1497           throws IOException {
1498         oserver.preBulkLoadHFile(ctx, familyPaths);
1499       }
1500     });
1501   }
1502 
1503   /**
1504    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1505    * @param hasLoaded whether load was successful or not
1506    * @return the possibly modified value of hasLoaded
1507    * @throws IOException
1508    */
1509   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1510       boolean hasLoaded) throws IOException {
1511     return execOperationWithResult(hasLoaded,
1512         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1513       @Override
1514       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1515           throws IOException {
1516         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1517       }
1518     });
1519   }
1520 
1521   public void postStartRegionOperation(final Operation op) throws IOException {
1522     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1523       @Override
1524       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1525           throws IOException {
1526         oserver.postStartRegionOperation(ctx, op);
1527       }
1528     });
1529   }
1530 
1531   public void postCloseRegionOperation(final Operation op) throws IOException {
1532     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1533       @Override
1534       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1535           throws IOException {
1536         oserver.postCloseRegionOperation(ctx, op);
1537       }
1538     });
1539   }
1540 
1541   /**
1542    * @param fs fileystem to read from
1543    * @param p path to the file
1544    * @param in {@link FSDataInputStreamWrapper}
1545    * @param size Full size of the file
1546    * @param cacheConf
1547    * @param r original reference file. This will be not null only when reading a split file.
1548    * @return a Reader instance to use instead of the base reader if overriding
1549    * default behavior, null otherwise
1550    * @throws IOException
1551    */
1552   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1553       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1554       final Reference r) throws IOException {
1555     return execOperationWithResult(null,
1556         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1557       @Override
1558       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1559           throws IOException {
1560         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1561       }
1562     });
1563   }
1564 
1565   /**
1566    * @param fs fileystem to read from
1567    * @param p path to the file
1568    * @param in {@link FSDataInputStreamWrapper}
1569    * @param size Full size of the file
1570    * @param cacheConf
1571    * @param r original reference file. This will be not null only when reading a split file.
1572    * @param reader the base reader instance
1573    * @return The reader to use
1574    * @throws IOException
1575    */
1576   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1577       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1578       final Reference r, final StoreFile.Reader reader) throws IOException {
1579     return execOperationWithResult(reader,
1580         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1581       @Override
1582       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1583           throws IOException {
1584         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1585       }
1586     });
1587   }
1588 
1589   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1590       final Cell oldCell, Cell newCell) throws IOException {
1591     return execOperationWithResult(newCell,
1592         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1593       @Override
1594       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1595           throws IOException {
1596         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1597       }
1598     });
1599   }
1600 
1601   public Message preEndpointInvocation(final Service service, final String methodName,
1602       Message request) throws IOException {
1603     return execOperationWithResult(request,
1604         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1605       @Override
1606       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1607           throws IOException {
1608         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1609       }
1610     });
1611   }
1612 
1613   public void postEndpointInvocation(final Service service, final String methodName,
1614       final Message request, final Message.Builder responseBuilder) throws IOException {
1615     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1616       @Override
1617       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1618           throws IOException {
1619         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1620       }
1621     });
1622   }
1623 
1624   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1625     return execOperationWithResult(tracker,
1626         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1627       @Override
1628       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1629           throws IOException {
1630         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1631       }
1632     });
1633   }
1634 
1635   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1636     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1637     for (RegionEnvironment env : coprocessors) {
1638       DescriptiveStatistics ds = new DescriptiveStatistics();
1639       if (env.getInstance() instanceof RegionObserver) {
1640         for (Long time : env.getExecutionLatenciesNanos()) {
1641           ds.addValue(time);
1642         }
1643         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1644         if (ds.getN() == 0) {
1645           ds.addValue(0);
1646         }
1647         results.put(env.getInstance().getClass().getSimpleName(), ds);
1648       }
1649     }
1650     return results;
1651   }
1652 
1653   private static abstract class CoprocessorOperation
1654       extends ObserverContext<RegionCoprocessorEnvironment> {
1655     public abstract void call(Coprocessor observer,
1656         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1657     public abstract boolean hasCall(Coprocessor observer);
1658     public void postEnvCall(RegionEnvironment env) { }
1659   }
1660 
1661   private static abstract class RegionOperation extends CoprocessorOperation {
1662     public abstract void call(RegionObserver observer,
1663         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1664 
1665     public boolean hasCall(Coprocessor observer) {
1666       return observer instanceof RegionObserver;
1667     }
1668 
1669     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1670         throws IOException {
1671       call((RegionObserver)observer, ctx);
1672     }
1673   }
1674 
1675   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1676     private T result = null;
1677     public void setResult(final T result) { this.result = result; }
1678     public T getResult() { return this.result; }
1679   }
1680 
1681   private static abstract class EndpointOperation extends CoprocessorOperation {
1682     public abstract void call(EndpointObserver observer,
1683         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1684 
1685     public boolean hasCall(Coprocessor observer) {
1686       return observer instanceof EndpointObserver;
1687     }
1688 
1689     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1690         throws IOException {
1691       call((EndpointObserver)observer, ctx);
1692     }
1693   }
1694 
1695   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1696     private T result = null;
1697     public void setResult(final T result) { this.result = result; }
1698     public T getResult() { return this.result; }
1699   }
1700 
1701   private boolean execOperation(final CoprocessorOperation ctx)
1702       throws IOException {
1703     return execOperation(true, ctx);
1704   }
1705 
1706   private <T> T execOperationWithResult(final T defaultValue,
1707       final RegionOperationWithResult<T> ctx) throws IOException {
1708     if (ctx == null) return defaultValue;
1709     ctx.setResult(defaultValue);
1710     execOperation(true, ctx);
1711     return ctx.getResult();
1712   }
1713 
1714   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1715       final RegionOperationWithResult<T> ctx) throws IOException {
1716     boolean bypass = false;
1717     T result = defaultValue;
1718     if (ctx != null) {
1719       ctx.setResult(defaultValue);
1720       bypass = execOperation(true, ctx);
1721       result = ctx.getResult();
1722     }
1723     return bypass == ifBypass ? result : null;
1724   }
1725 
1726   private <T> T execOperationWithResult(final T defaultValue,
1727       final EndpointOperationWithResult<T> ctx) throws IOException {
1728     if (ctx == null) return defaultValue;
1729     ctx.setResult(defaultValue);
1730     execOperation(true, ctx);
1731     return ctx.getResult();
1732   }
1733 
1734   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1735       throws IOException {
1736     boolean bypass = false;
1737     for (RegionEnvironment env: coprocessors) {
1738       Coprocessor observer = env.getInstance();
1739       if (ctx.hasCall(observer)) {
1740         long startTime = System.nanoTime();
1741         ctx.prepare(env);
1742         Thread currentThread = Thread.currentThread();
1743         ClassLoader cl = currentThread.getContextClassLoader();
1744         try {
1745           currentThread.setContextClassLoader(env.getClassLoader());
1746           ctx.call(observer, ctx);
1747         } catch (Throwable e) {
1748           handleCoprocessorThrowable(env, e);
1749         } finally {
1750           currentThread.setContextClassLoader(cl);
1751         }
1752         env.offerExecutionLatency(System.nanoTime() - startTime);
1753         bypass |= ctx.shouldBypass();
1754         if (earlyExit && ctx.shouldComplete()) {
1755           break;
1756         }
1757       }
1758 
1759       ctx.postEnvCall(env);
1760     }
1761     return bypass;
1762   }
1763 }