View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.UUID;
30  import java.util.concurrent.ArrayBlockingQueue;
31  import java.util.concurrent.BlockingQueue;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.regex.Matcher;
35  
36  import com.google.common.collect.ImmutableList;
37  import com.google.common.collect.Lists;
38  import com.google.protobuf.Message;
39  import com.google.protobuf.Service;
40  
41  import org.apache.commons.collections.map.AbstractReferenceMap;
42  import org.apache.commons.collections.map.ReferenceMap;
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.Coprocessor;
53  import org.apache.hadoop.hbase.CoprocessorEnvironment;
54  import org.apache.hadoop.hbase.HBaseConfiguration;
55  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionInfo;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.Result;
67  import org.apache.hadoop.hbase.client.Scan;
68  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
69  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
70  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
71  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
72  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
73  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
74  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
75  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
76  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
77  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
78  import org.apache.hadoop.hbase.io.Reference;
79  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
80  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
81  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
82  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
87  import org.apache.hadoop.hbase.util.Pair;
88  
89  /**
90   * Implements the coprocessor environment and runtime support for coprocessors
91   * loaded within a {@link HRegion}.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
94  @InterfaceStability.Evolving
95  public class RegionCoprocessorHost
96      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
97  
  private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
  // The shared data map. createEnvironment() stores one ConcurrentMap per coprocessor
  // class name in it and synchronizes on the map while doing so. It is constructed
  // with HARD/WEAK reference types, so an entry is expected to remain only while some
  // RegionEnvironment still holds on to the stored map.
  private static ReferenceMap sharedDataMap =
      new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
102 
103   /**
104    * Encapsulation of the environment of each coprocessor
105    */
106   static class RegionEnvironment extends CoprocessorHost.Environment
107       implements RegionCoprocessorEnvironment {
108 
109     private HRegion region;
110     private RegionServerServices rsServices;
111     ConcurrentMap<String, Object> sharedData;
112     private static final int LATENCY_BUFFER_SIZE = 100;
113     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
114         LATENCY_BUFFER_SIZE);
115     private final boolean useLegacyPre;
116     private final boolean useLegacyPost;
117 
118     /**
119      * Constructor
120      * @param impl the coprocessor instance
121      * @param priority chaining priority
122      */
123     public RegionEnvironment(final Coprocessor impl, final int priority,
124         final int seq, final Configuration conf, final HRegion region,
125         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
126       super(impl, priority, seq, conf);
127       this.region = region;
128       this.rsServices = services;
129       this.sharedData = sharedData;
130       // Pick which version of the WAL related events we'll call.
131       // This way we avoid calling the new version on older RegionObservers so
132       // we can maintain binary compatibility.
133       // See notes in javadoc for RegionObserver
134       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
135           HRegionInfo.class, WALKey.class, WALEdit.class);
136       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
137           HRegionInfo.class, WALKey.class, WALEdit.class);
138     }
139 
140     /** @return the region */
141     @Override
142     public HRegion getRegion() {
143       return region;
144     }
145 
146     /** @return reference to the region server services */
147     @Override
148     public RegionServerServices getRegionServerServices() {
149       return rsServices;
150     }
151 
152     public void shutdown() {
153       super.shutdown();
154     }
155 
156     @Override
157     public ConcurrentMap<String, Object> getSharedData() {
158       return sharedData;
159     }
160 
161     public void offerExecutionLatency(long latencyNanos) {
162       coprocessorTimeNanos.offer(latencyNanos);
163     }
164 
165     public Collection<Long> getExecutionLatenciesNanos() {
166       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
167       coprocessorTimeNanos.drainTo(latencies);
168       return latencies;
169     }
170 
171     @Override
172     public HRegionInfo getRegionInfo() {
173       return region.getRegionInfo();
174     }
175 
176   }
177 
178   static class TableCoprocessorAttribute {
179     private Path path;
180     private String className;
181     private int priority;
182     private Configuration conf;
183 
184     public TableCoprocessorAttribute(Path path, String className, int priority,
185         Configuration conf) {
186       this.path = path;
187       this.className = className;
188       this.priority = priority;
189       this.conf = conf;
190     }
191 
192     public Path getPath() {
193       return path;
194     }
195 
196     public String getClassName() {
197       return className;
198     }
199 
200     public int getPriority() {
201       return priority;
202     }
203 
204     public Configuration getConf() {
205       return conf;
206     }
207   }
208 
  /** The region server services; assigned from the constructor argument. */
  RegionServerServices rsServices;
  /**
   * The region; assigned from the constructor argument and handed to every
   * environment built by createEnvironment().
   */
  HRegion region;
213 
214   /**
215    * Constructor
216    * @param region the region
217    * @param rsServices interface to available region server functionality
218    * @param conf the configuration
219    */
220   public RegionCoprocessorHost(final HRegion region,
221       final RegionServerServices rsServices, final Configuration conf) {
222     super(rsServices);
223     this.conf = conf;
224     this.rsServices = rsServices;
225     this.region = region;
226     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
227 
228     // load system default cp's from configuration.
229     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
230 
231     // load system default cp's for user tables from configuration.
232     if (!region.getRegionInfo().getTable().isSystemTable()) {
233       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
234     }
235 
236     // load Coprocessor From HDFS
237     loadTableCoprocessors(conf);
238   }
239 
240   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
241       HTableDescriptor htd) {
242     List<TableCoprocessorAttribute> result = Lists.newArrayList();
243     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
244       String key = Bytes.toString(e.getKey().get()).trim();
245       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
246         String spec = Bytes.toString(e.getValue().get()).trim();
247         // found one
248         try {
249           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
250           if (matcher.matches()) {
251             // jar file path can be empty if the cp class can be loaded
252             // from class loader.
253             Path path = matcher.group(1).trim().isEmpty() ?
254                 null : new Path(matcher.group(1).trim());
255             String className = matcher.group(2).trim();
256             if (className.isEmpty()) {
257               LOG.error("Malformed table coprocessor specification: key=" +
258                 key + ", spec: " + spec);
259               continue;
260             }
261             int priority = matcher.group(3).trim().isEmpty() ?
262                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
263             String cfgSpec = null;
264             try {
265               cfgSpec = matcher.group(4);
266             } catch (IndexOutOfBoundsException ex) {
267               // ignore
268             }
269             Configuration ourConf;
270             if (cfgSpec != null) {
271               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
272               // do an explicit deep copy of the passed configuration
273               ourConf = new Configuration(false);
274               HBaseConfiguration.merge(ourConf, conf);
275               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
276               while (m.find()) {
277                 ourConf.set(m.group(1), m.group(2));
278               }
279             } else {
280               ourConf = conf;
281             }
282             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
283           } else {
284             LOG.error("Malformed table coprocessor specification: key=" + key +
285               ", spec: " + spec);
286           }
287         } catch (Exception ioe) {
288           LOG.error("Malformed table coprocessor specification: key=" + key +
289             ", spec: " + spec);
290         }
291       }
292     }
293     return result;
294   }
295 
296   /**
297    * Sanity check the table coprocessor attributes of the supplied schema. Will
298    * throw an exception if there is a problem.
299    * @param conf
300    * @param htd
301    * @throws IOException
302    */
303   public static void testTableCoprocessorAttrs(final Configuration conf,
304       final HTableDescriptor htd) throws IOException {
305     String pathPrefix = UUID.randomUUID().toString();
306     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
307       if (attr.getPriority() < 0) {
308         throw new IOException("Priority for coprocessor " + attr.getClassName() +
309           " cannot be less than 0");
310       }
311       ClassLoader old = Thread.currentThread().getContextClassLoader();
312       try {
313         ClassLoader cl;
314         if (attr.getPath() != null) {
315           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
316             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
317         } else {
318           cl = CoprocessorHost.class.getClassLoader();
319         }
320         Thread.currentThread().setContextClassLoader(cl);
321         cl.loadClass(attr.getClassName());
322       } catch (ClassNotFoundException e) {
323         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
324       } finally {
325         Thread.currentThread().setContextClassLoader(old);
326       }
327     }
328   }
329 
330   void loadTableCoprocessors(final Configuration conf) {
331     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
332       DEFAULT_COPROCESSORS_ENABLED);
333     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
334       DEFAULT_USER_COPROCESSORS_ENABLED);
335     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
336       return;
337     }
338 
339     // scan the table attributes for coprocessor load specifications
340     // initialize the coprocessors
341     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
342     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
343         region.getTableDesc())) {
344       // Load encompasses classloading and coprocessor initialization
345       try {
346         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
347           attr.getConf());
348         configured.add(env);
349         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
350             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
351       } catch (Throwable t) {
352         // Coprocessor failed to load, do we abort on error?
353         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
354           abortServer(attr.getClassName(), t);
355         } else {
356           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
357         }
358       }
359     }
360     // add together to coprocessor set for COW efficiency
361     coprocessors.addAll(configured);
362   }
363 
364   @Override
365   public RegionEnvironment createEnvironment(Class<?> implClass,
366       Coprocessor instance, int priority, int seq, Configuration conf) {
367     // Check if it's an Endpoint.
368     // Due to current dynamic protocol design, Endpoint
369     // uses a different way to be registered and executed.
370     // It uses a visitor pattern to invoke registered Endpoint
371     // method.
372     for (Class<?> c : implClass.getInterfaces()) {
373       if (CoprocessorService.class.isAssignableFrom(c)) {
374         region.registerService( ((CoprocessorService)instance).getService() );
375       }
376     }
377     ConcurrentMap<String, Object> classData;
378     // make sure only one thread can add maps
379     synchronized (sharedDataMap) {
380       // as long as at least one RegionEnvironment holds on to its classData it will
381       // remain in this map
382       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
383       if (classData == null) {
384         classData = new ConcurrentHashMap<String, Object>();
385         sharedDataMap.put(implClass.getName(), classData);
386       }
387     }
388     return new RegionEnvironment(instance, priority, seq, conf, region,
389         rsServices, classData);
390   }
391 
392   /**
393    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
394    *
395    * For example, {@link
396    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
397    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
398    *
399    * See also
400    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
401    *    CoprocessorEnvironment, Throwable)}
402    * @param env The coprocessor that threw the exception.
403    * @param e The exception that was thrown.
404    */
405   private void handleCoprocessorThrowableNoRethrow(
406       final CoprocessorEnvironment env, final Throwable e) {
407     try {
408       handleCoprocessorThrowable(env,e);
409     } catch (IOException ioe) {
410       // We cannot throw exceptions from the caller hook, so ignore.
411       LOG.warn(
412         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
413         e + ". Ignoring.",e);
414     }
415   }
416 
417   /**
418    * Invoked before a region open.
419    *
420    * @throws IOException Signals that an I/O exception has occurred.
421    */
422   public void preOpen() throws IOException {
423     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
424       @Override
425       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
426           throws IOException {
427         oserver.preOpen(ctx);
428       }
429     });
430   }
431 
432   /**
433    * Invoked after a region open
434    */
435   public void postOpen() {
436     try {
437       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
438         @Override
439         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
440             throws IOException {
441           oserver.postOpen(ctx);
442         }
443       });
444     } catch (IOException e) {
445       LOG.warn(e);
446     }
447   }
448 
449   /**
450    * Invoked after log replay on region
451    */
452   public void postLogReplay() {
453     try {
454       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
455         @Override
456         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
457             throws IOException {
458           oserver.postLogReplay(ctx);
459         }
460       });
461     } catch (IOException e) {
462       LOG.warn(e);
463     }
464   }
465 
466   /**
467    * Invoked before a region is closed
468    * @param abortRequested true if the server is aborting
469    */
470   public void preClose(final boolean abortRequested) throws IOException {
471     execOperation(false, new RegionOperation() {
472       @Override
473       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
474           throws IOException {
475         oserver.preClose(ctx, abortRequested);
476       }
477     });
478   }
479 
480   /**
481    * Invoked after a region is closed
482    * @param abortRequested true if the server is aborting
483    */
484   public void postClose(final boolean abortRequested) {
485     try {
486       execOperation(false, new RegionOperation() {
487         @Override
488         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
489             throws IOException {
490           oserver.postClose(ctx, abortRequested);
491         }
492         public void postEnvCall(RegionEnvironment env) {
493           shutdown(env);
494         }
495       });
496     } catch (IOException e) {
497       LOG.warn(e);
498     }
499   }
500 
501   /**
502    * See
503    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
504    */
505   public InternalScanner preCompactScannerOpen(final Store store,
506       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
507       final CompactionRequest request) throws IOException {
508     return execOperationWithResult(null,
509         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
510       @Override
511       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
512           throws IOException {
513         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
514           earliestPutTs, getResult(), request));
515       }
516     });
517   }
518 
519   /**
520    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
521    * available candidates.
522    * @param store The store where compaction is being requested
523    * @param candidates The currently available store files
524    * @param request custom compaction request
525    * @return If {@code true}, skip the normal selection process and use the current list
526    * @throws IOException
527    */
528   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
529       final CompactionRequest request) throws IOException {
530     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
531       @Override
532       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
533           throws IOException {
534         oserver.preCompactSelection(ctx, store, candidates, request);
535       }
536     });
537   }
538 
539   /**
540    * Called after the {@link StoreFile}s to be compacted have been selected from the available
541    * candidates.
542    * @param store The store where compaction is being requested
543    * @param selected The store files selected to compact
544    * @param request custom compaction
545    */
546   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
547       final CompactionRequest request) {
548     try {
549       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
550         @Override
551         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
552             throws IOException {
553           oserver.postCompactSelection(ctx, store, selected, request);
554         }
555       });
556     } catch (IOException e) {
557       LOG.warn(e);
558     }
559   }
560 
561   /**
562    * Called prior to rewriting the store files selected for compaction
563    * @param store the store being compacted
564    * @param scanner the scanner used to read store data during compaction
565    * @param scanType type of Scan
566    * @param request the compaction that will be executed
567    * @throws IOException
568    */
569   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
570       final ScanType scanType, final CompactionRequest request) throws IOException {
571     return execOperationWithResult(false, scanner,
572         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
573       @Override
574       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
575           throws IOException {
576         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
577       }
578     });
579   }
580 
581   /**
582    * Called after the store compaction has completed.
583    * @param store the store being compacted
584    * @param resultFile the new store file written during compaction
585    * @param request the compaction that is being executed
586    * @throws IOException
587    */
588   public void postCompact(final Store store, final StoreFile resultFile,
589       final CompactionRequest request) throws IOException {
590     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
591       @Override
592       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
593           throws IOException {
594         oserver.postCompact(ctx, store, resultFile, request);
595       }
596     });
597   }
598 
599   /**
600    * Invoked before a memstore flush
601    * @throws IOException
602    */
603   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
604       throws IOException {
605     return execOperationWithResult(false, scanner,
606         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
607       @Override
608       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
609           throws IOException {
610         setResult(oserver.preFlush(ctx, store, getResult()));
611       }
612     });
613   }
614 
615   /**
616    * Invoked before a memstore flush
617    * @throws IOException
618    */
619   public void preFlush() throws IOException {
620     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
621       @Override
622       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
623           throws IOException {
624         oserver.preFlush(ctx);
625       }
626     });
627   }
628 
629   /**
630    * See
631    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
632    *    Store, KeyValueScanner, InternalScanner)}
633    */
634   public InternalScanner preFlushScannerOpen(final Store store,
635       final KeyValueScanner memstoreScanner) throws IOException {
636     return execOperationWithResult(null,
637         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
638       @Override
639       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
640           throws IOException {
641         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
642       }
643     });
644   }
645 
646   /**
647    * Invoked after a memstore flush
648    * @throws IOException
649    */
650   public void postFlush() throws IOException {
651     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
652       @Override
653       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
654           throws IOException {
655         oserver.postFlush(ctx);
656       }
657     });
658   }
659 
660   /**
661    * Invoked after a memstore flush
662    * @throws IOException
663    */
664   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
665     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
666       @Override
667       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
668           throws IOException {
669         oserver.postFlush(ctx, store, storeFile);
670       }
671     });
672   }
673 
674   /**
675    * Invoked just before a split
676    * @throws IOException
677    */
678   // TODO: Deprecate this
679   public void preSplit() throws IOException {
680     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
681       @Override
682       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
683           throws IOException {
684         oserver.preSplit(ctx);
685       }
686     });
687   }
688 
689   /**
690    * Invoked just before a split
691    * @throws IOException
692    */
693   public void preSplit(final byte[] splitRow) throws IOException {
694     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
695       @Override
696       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
697           throws IOException {
698         oserver.preSplit(ctx, splitRow);
699       }
700     });
701   }
702 
703   /**
704    * Invoked just after a split
705    * @param l the new left-hand daughter region
706    * @param r the new right-hand daughter region
707    * @throws IOException
708    */
709   public void postSplit(final HRegion l, final HRegion r) throws IOException {
710     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
711       @Override
712       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
713           throws IOException {
714         oserver.postSplit(ctx, l, r);
715       }
716     });
717   }
718 
719   public boolean preSplitBeforePONR(final byte[] splitKey,
720       final List<Mutation> metaEntries) throws IOException {
721     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
722       @Override
723       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
724           throws IOException {
725         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
726       }
727     });
728   }
729 
730   public void preSplitAfterPONR() throws IOException {
731     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
732       @Override
733       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
734           throws IOException {
735         oserver.preSplitAfterPONR(ctx);
736       }
737     });
738   }
739 
740   /**
741    * Invoked just before the rollback of a failed split is started
742    * @throws IOException
743    */
744   public void preRollBackSplit() throws IOException {
745     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
746       @Override
747       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
748           throws IOException {
749         oserver.preRollBackSplit(ctx);
750       }
751     });
752   }
753 
754   /**
755    * Invoked just after the rollback of a failed split is done
756    * @throws IOException
757    */
758   public void postRollBackSplit() throws IOException {
759     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
760       @Override
761       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
762           throws IOException {
763         oserver.postRollBackSplit(ctx);
764       }
765     });
766   }
767 
768   /**
769    * Invoked after a split is completed irrespective of a failure or success.
770    * @throws IOException
771    */
772   public void postCompleteSplit() throws IOException {
773     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
774       @Override
775       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
776           throws IOException {
777         oserver.postCompleteSplit(ctx);
778       }
779     });
780   }
781 
782   // RegionObserver support
783 
784   /**
785    * @param row the row key
786    * @param family the family
787    * @param result the result set from the region
788    * @return true if default processing should be bypassed
789    * @exception IOException Exception
790    */
791   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
792       final Result result) throws IOException {
793     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
794       @Override
795       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
796           throws IOException {
797         oserver.preGetClosestRowBefore(ctx, row, family, result);
798       }
799     });
800   }
801 
802   /**
803    * @param row the row key
804    * @param family the family
805    * @param result the result set from the region
806    * @exception IOException Exception
807    */
808   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
809       final Result result) throws IOException {
810     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
811       @Override
812       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
813           throws IOException {
814         oserver.postGetClosestRowBefore(ctx, row, family, result);
815       }
816     });
817   }
818 
819   /**
820    * @param get the Get request
821    * @return true if default processing should be bypassed
822    * @exception IOException Exception
823    */
824   public boolean preGet(final Get get, final List<Cell> results)
825       throws IOException {
826     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
827       @Override
828       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
829           throws IOException {
830         oserver.preGetOp(ctx, get, results);
831       }
832     });
833   }
834 
835   /**
836    * @param get the Get request
837    * @param results the result sett
838    * @exception IOException Exception
839    */
840   public void postGet(final Get get, final List<Cell> results)
841       throws IOException {
842     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
843       @Override
844       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
845           throws IOException {
846         oserver.postGetOp(ctx, get, results);
847       }
848     });
849   }
850 
851   /**
852    * @param get the Get request
853    * @return true or false to return to client if bypassing normal operation,
854    * or null otherwise
855    * @exception IOException Exception
856    */
857   public Boolean preExists(final Get get) throws IOException {
858     return execOperationWithResult(true, false,
859         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
860       @Override
861       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
862           throws IOException {
863         setResult(oserver.preExists(ctx, get, getResult()));
864       }
865     });
866   }
867 
868   /**
869    * @param get the Get request
870    * @param exists the result returned by the region server
871    * @return the result to return to the client
872    * @exception IOException Exception
873    */
874   public boolean postExists(final Get get, boolean exists)
875       throws IOException {
876     return execOperationWithResult(exists,
877         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
878       @Override
879       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
880           throws IOException {
881         setResult(oserver.postExists(ctx, get, getResult()));
882       }
883     });
884   }
885 
886   /**
887    * @param put The Put object
888    * @param edit The WALEdit object.
889    * @param durability The durability used
890    * @return true if default processing should be bypassed
891    * @exception IOException Exception
892    */
893   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
894       throws IOException {
895     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
896       @Override
897       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
898           throws IOException {
899         oserver.prePut(ctx, put, edit, durability);
900       }
901     });
902   }
903 
904   /**
905    * @param mutation - the current mutation
906    * @param kv - the current cell
907    * @param byteNow - current timestamp in bytes
908    * @param get - the get that could be used
909    * Note that the get only does not specify the family and qualifier that should be used
910    * @return true if default processing should be bypassed
911    * @exception IOException
912    *              Exception
913    */
914   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
915       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
916     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
917       @Override
918       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
919           throws IOException {
920         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
921       }
922     });
923   }
924 
925   /**
926    * @param put The Put object
927    * @param edit The WALEdit object.
928    * @param durability The durability used
929    * @exception IOException Exception
930    */
931   public void postPut(final Put put, final WALEdit edit, final Durability durability)
932       throws IOException {
933     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
934       @Override
935       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
936           throws IOException {
937         oserver.postPut(ctx, put, edit, durability);
938       }
939     });
940   }
941 
942   /**
943    * @param delete The Delete object
944    * @param edit The WALEdit object.
945    * @param durability The durability used
946    * @return true if default processing should be bypassed
947    * @exception IOException Exception
948    */
949   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
950       throws IOException {
951     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
952       @Override
953       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
954           throws IOException {
955         oserver.preDelete(ctx, delete, edit, durability);
956       }
957     });
958   }
959 
960   /**
961    * @param delete The Delete object
962    * @param edit The WALEdit object.
963    * @param durability The durability used
964    * @exception IOException Exception
965    */
966   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
967       throws IOException {
968     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
969       @Override
970       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
971           throws IOException {
972         oserver.postDelete(ctx, delete, edit, durability);
973       }
974     });
975   }
976 
977   /**
978    * @param miniBatchOp
979    * @return true if default processing should be bypassed
980    * @throws IOException
981    */
982   public boolean preBatchMutate(
983       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
984     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
985       @Override
986       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
987           throws IOException {
988         oserver.preBatchMutate(ctx, miniBatchOp);
989       }
990     });
991   }
992 
993   /**
994    * @param miniBatchOp
995    * @throws IOException
996    */
997   public void postBatchMutate(
998       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
999     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1000       @Override
1001       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1002           throws IOException {
1003         oserver.postBatchMutate(ctx, miniBatchOp);
1004       }
1005     });
1006   }
1007 
1008   public void postBatchMutateIndispensably(
1009       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1010       throws IOException {
1011     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1012       @Override
1013       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1014           throws IOException {
1015         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1016       }
1017     });
1018   }
1019 
1020   /**
1021    * @param row row to check
1022    * @param family column family
1023    * @param qualifier column qualifier
1024    * @param compareOp the comparison operation
1025    * @param comparator the comparator
1026    * @param put data to put if check succeeds
1027    * @return true or false to return to client if default processing should
1028    * be bypassed, or null otherwise
1029    * @throws IOException e
1030    */
1031   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1032       final byte [] qualifier, final CompareOp compareOp,
1033       final ByteArrayComparable comparator, final Put put)
1034       throws IOException {
1035     return execOperationWithResult(true, false,
1036         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1037       @Override
1038       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1039           throws IOException {
1040         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1041           compareOp, comparator, put, getResult()));
1042       }
1043     });
1044   }
1045 
1046   /**
1047    * @param row row to check
1048    * @param family column family
1049    * @param qualifier column qualifier
1050    * @param compareOp the comparison operation
1051    * @param comparator the comparator
1052    * @param put data to put if check succeeds
1053    * @return true or false to return to client if default processing should
1054    * be bypassed, or null otherwise
1055    * @throws IOException e
1056    */
1057   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1058       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1059       final Put put) throws IOException {
1060     return execOperationWithResult(true, false,
1061         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1062       @Override
1063       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1064           throws IOException {
1065         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1066           compareOp, comparator, put, getResult()));
1067       }
1068     });
1069   }
1070 
1071   /**
1072    * @param row row to check
1073    * @param family column family
1074    * @param qualifier column qualifier
1075    * @param compareOp the comparison operation
1076    * @param comparator the comparator
1077    * @param put data to put if check succeeds
1078    * @throws IOException e
1079    */
1080   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1081       final byte [] qualifier, final CompareOp compareOp,
1082       final ByteArrayComparable comparator, final Put put,
1083       boolean result) throws IOException {
1084     return execOperationWithResult(result,
1085         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1086       @Override
1087       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1088           throws IOException {
1089         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1090           compareOp, comparator, put, getResult()));
1091       }
1092     });
1093   }
1094 
1095   /**
1096    * @param row row to check
1097    * @param family column family
1098    * @param qualifier column qualifier
1099    * @param compareOp the comparison operation
1100    * @param comparator the comparator
1101    * @param delete delete to commit if check succeeds
1102    * @return true or false to return to client if default processing should
1103    * be bypassed, or null otherwise
1104    * @throws IOException e
1105    */
1106   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1107       final byte [] qualifier, final CompareOp compareOp,
1108       final ByteArrayComparable comparator, final Delete delete)
1109       throws IOException {
1110     return execOperationWithResult(true, false,
1111         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1112       @Override
1113       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1114           throws IOException {
1115         setResult(oserver.preCheckAndDelete(ctx, row, family,
1116             qualifier, compareOp, comparator, delete, getResult()));
1117       }
1118     });
1119   }
1120 
1121   /**
1122    * @param row row to check
1123    * @param family column family
1124    * @param qualifier column qualifier
1125    * @param compareOp the comparison operation
1126    * @param comparator the comparator
1127    * @param delete delete to commit if check succeeds
1128    * @return true or false to return to client if default processing should
1129    * be bypassed, or null otherwise
1130    * @throws IOException e
1131    */
1132   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1133       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1134       final Delete delete) throws IOException {
1135     return execOperationWithResult(true, false,
1136         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1137       @Override
1138       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1139           throws IOException {
1140         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1141               family, qualifier, compareOp, comparator, delete, getResult()));
1142       }
1143     });
1144   }
1145 
1146   /**
1147    * @param row row to check
1148    * @param family column family
1149    * @param qualifier column qualifier
1150    * @param compareOp the comparison operation
1151    * @param comparator the comparator
1152    * @param delete delete to commit if check succeeds
1153    * @throws IOException e
1154    */
1155   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1156       final byte [] qualifier, final CompareOp compareOp,
1157       final ByteArrayComparable comparator, final Delete delete,
1158       boolean result) throws IOException {
1159     return execOperationWithResult(result,
1160         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1161       @Override
1162       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1163           throws IOException {
1164         setResult(oserver.postCheckAndDelete(ctx, row, family,
1165             qualifier, compareOp, comparator, delete, getResult()));
1166       }
1167     });
1168   }
1169 
1170   /**
1171    * @param append append object
1172    * @return result to return to client if default operation should be
1173    * bypassed, null otherwise
1174    * @throws IOException if an error occurred on the coprocessor
1175    */
1176   public Result preAppend(final Append append) throws IOException {
1177     return execOperationWithResult(true, null,
1178         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1179       @Override
1180       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1181           throws IOException {
1182         setResult(oserver.preAppend(ctx, append));
1183       }
1184     });
1185   }
1186 
1187   /**
1188    * @param append append object
1189    * @return result to return to client if default operation should be
1190    * bypassed, null otherwise
1191    * @throws IOException if an error occurred on the coprocessor
1192    */
1193   public Result preAppendAfterRowLock(final Append append) throws IOException {
1194     return execOperationWithResult(true, null,
1195         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1196       @Override
1197       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1198           throws IOException {
1199         setResult(oserver.preAppendAfterRowLock(ctx, append));
1200       }
1201     });
1202   }
1203 
1204   /**
1205    * @param increment increment object
1206    * @return result to return to client if default operation should be
1207    * bypassed, null otherwise
1208    * @throws IOException if an error occurred on the coprocessor
1209    */
1210   public Result preIncrement(final Increment increment) throws IOException {
1211     return execOperationWithResult(true, null,
1212         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1213       @Override
1214       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1215           throws IOException {
1216         setResult(oserver.preIncrement(ctx, increment));
1217       }
1218     });
1219   }
1220 
1221   /**
1222    * @param increment increment object
1223    * @return result to return to client if default operation should be
1224    * bypassed, null otherwise
1225    * @throws IOException if an error occurred on the coprocessor
1226    */
1227   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1228     return execOperationWithResult(true, null,
1229         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1230       @Override
1231       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1232           throws IOException {
1233         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1234       }
1235     });
1236   }
1237 
1238   /**
1239    * @param append Append object
1240    * @param result the result returned by the append
1241    * @throws IOException if an error occurred on the coprocessor
1242    */
1243   public void postAppend(final Append append, final Result result) throws IOException {
1244     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1245       @Override
1246       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1247           throws IOException {
1248         oserver.postAppend(ctx, append, result);
1249       }
1250     });
1251   }
1252 
1253   /**
1254    * @param increment increment object
1255    * @param result the result returned by postIncrement
1256    * @throws IOException if an error occurred on the coprocessor
1257    */
1258   public Result postIncrement(final Increment increment, Result result) throws IOException {
1259     return execOperationWithResult(result,
1260         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1261       @Override
1262       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1263           throws IOException {
1264         setResult(oserver.postIncrement(ctx, increment, getResult()));
1265       }
1266     });
1267   }
1268 
1269   /**
1270    * @param scan the Scan specification
1271    * @return scanner id to return to client if default operation should be
1272    * bypassed, false otherwise
1273    * @exception IOException Exception
1274    */
1275   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1276     return execOperationWithResult(true, null,
1277         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1278       @Override
1279       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1280           throws IOException {
1281         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1282       }
1283     });
1284   }
1285 
1286   /**
1287    * See
1288    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1289    *    Store, Scan, NavigableSet, KeyValueScanner)}
1290    */
1291   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1292       final NavigableSet<byte[]> targetCols) throws IOException {
1293     return execOperationWithResult(null,
1294         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1295       @Override
1296       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1297           throws IOException {
1298         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1299       }
1300     });
1301   }
1302 
1303   /**
1304    * @param scan the Scan specification
1305    * @param s the scanner
1306    * @return the scanner instance to use
1307    * @exception IOException Exception
1308    */
1309   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1310     return execOperationWithResult(s,
1311         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1312       @Override
1313       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1314           throws IOException {
1315         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1316       }
1317     });
1318   }
1319 
1320   /**
1321    * @param s the scanner
1322    * @param results the result set returned by the region server
1323    * @param limit the maximum number of results to return
1324    * @return 'has next' indication to client if bypassing default behavior, or
1325    * null otherwise
1326    * @exception IOException Exception
1327    */
1328   public Boolean preScannerNext(final InternalScanner s,
1329       final List<Result> results, final int limit) throws IOException {
1330     return execOperationWithResult(true, false,
1331         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1332       @Override
1333       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1334           throws IOException {
1335         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1336       }
1337     });
1338   }
1339 
1340   /**
1341    * @param s the scanner
1342    * @param results the result set returned by the region server
1343    * @param limit the maximum number of results to return
1344    * @param hasMore
1345    * @return 'has more' indication to give to client
1346    * @exception IOException Exception
1347    */
1348   public boolean postScannerNext(final InternalScanner s,
1349       final List<Result> results, final int limit, boolean hasMore)
1350       throws IOException {
1351     return execOperationWithResult(hasMore,
1352         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1353       @Override
1354       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1355           throws IOException {
1356         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1357       }
1358     });
1359   }
1360 
1361   /**
1362    * This will be called by the scan flow when the current scanned row is being filtered out by the
1363    * filter.
1364    * @param s the scanner
1365    * @param currentRow The current rowkey which got filtered out
1366    * @param offset offset to rowkey
1367    * @param length length of rowkey
1368    * @return whether more rows are available for the scanner or not
1369    * @throws IOException
1370    */
1371   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1372       final int offset, final short length) throws IOException {
1373     return execOperationWithResult(true,
1374         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1375       @Override
1376       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1377           throws IOException {
1378         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1379       }
1380     });
1381   }
1382 
1383   /**
1384    * @param s the scanner
1385    * @return true if default behavior should be bypassed, false otherwise
1386    * @exception IOException Exception
1387    */
1388   public boolean preScannerClose(final InternalScanner s) throws IOException {
1389     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1390       @Override
1391       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1392           throws IOException {
1393         oserver.preScannerClose(ctx, s);
1394       }
1395     });
1396   }
1397 
1398   /**
1399    * @exception IOException Exception
1400    */
1401   public void postScannerClose(final InternalScanner s) throws IOException {
1402     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1403       @Override
1404       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1405           throws IOException {
1406         oserver.postScannerClose(ctx, s);
1407       }
1408     });
1409   }
1410 
1411   /**
1412    * @param info
1413    * @param logKey
1414    * @param logEdit
1415    * @return true if default behavior should be bypassed, false otherwise
1416    * @throws IOException
1417    */
1418   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1419       final WALEdit logEdit) throws IOException {
1420     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1421       @Override
1422       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1423           throws IOException {
1424         // Once we don't need to support the legacy call, replace RegionOperation with a version
1425         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1426         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1427         if (env.useLegacyPre) {
1428           if (logKey instanceof HLogKey) {
1429             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1430           } else {
1431             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1432           }
1433         } else {
1434           oserver.preWALRestore(ctx, info, logKey, logEdit);
1435         }
1436       }
1437     });
1438   }
1439 
1440   /**
1441    * @return true if default behavior should be bypassed, false otherwise
1442    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1443    */
1444   @Deprecated
1445   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1446       final WALEdit logEdit) throws IOException {
1447     return preWALRestore(info, (WALKey)logKey, logEdit);
1448   }
1449 
1450   /**
1451    * @param info
1452    * @param logKey
1453    * @param logEdit
1454    * @throws IOException
1455    */
1456   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1457       throws IOException {
1458     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1459       @Override
1460       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1461           throws IOException {
1462         // Once we don't need to support the legacy call, replace RegionOperation with a version
1463         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1464         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1465         if (env.useLegacyPost) {
1466           if (logKey instanceof HLogKey) {
1467             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1468           } else {
1469             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1470           }
1471         } else {
1472           oserver.postWALRestore(ctx, info, logKey, logEdit);
1473         }
1474       }
1475     });
1476   }
1477 
1478   /**
1479    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1480    */
1481   @Deprecated
1482   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1483       throws IOException {
1484     postWALRestore(info, (WALKey)logKey, logEdit);
1485   }
1486 
1487   /**
1488    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1489    * @return true if the default operation should be bypassed
1490    * @throws IOException
1491    */
1492   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1493     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1494       @Override
1495       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1496           throws IOException {
1497         oserver.preBulkLoadHFile(ctx, familyPaths);
1498       }
1499     });
1500   }
1501 
1502   /**
1503    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1504    * @param hasLoaded whether load was successful or not
1505    * @return the possibly modified value of hasLoaded
1506    * @throws IOException
1507    */
1508   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1509       boolean hasLoaded) throws IOException {
1510     return execOperationWithResult(hasLoaded,
1511         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1512       @Override
1513       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1514           throws IOException {
1515         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1516       }
1517     });
1518   }
1519 
1520   public void postStartRegionOperation(final Operation op) throws IOException {
1521     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1522       @Override
1523       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1524           throws IOException {
1525         oserver.postStartRegionOperation(ctx, op);
1526       }
1527     });
1528   }
1529 
1530   public void postCloseRegionOperation(final Operation op) throws IOException {
1531     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1532       @Override
1533       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1534           throws IOException {
1535         oserver.postCloseRegionOperation(ctx, op);
1536       }
1537     });
1538   }
1539 
1540   /**
1541    * @param fs fileystem to read from
1542    * @param p path to the file
1543    * @param in {@link FSDataInputStreamWrapper}
1544    * @param size Full size of the file
1545    * @param cacheConf
1546    * @param r original reference file. This will be not null only when reading a split file.
1547    * @return a Reader instance to use instead of the base reader if overriding
1548    * default behavior, null otherwise
1549    * @throws IOException
1550    */
1551   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1552       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1553       final Reference r) throws IOException {
1554     return execOperationWithResult(null,
1555         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1556       @Override
1557       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1558           throws IOException {
1559         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1560       }
1561     });
1562   }
1563 
1564   /**
1565    * @param fs fileystem to read from
1566    * @param p path to the file
1567    * @param in {@link FSDataInputStreamWrapper}
1568    * @param size Full size of the file
1569    * @param cacheConf
1570    * @param r original reference file. This will be not null only when reading a split file.
1571    * @param reader the base reader instance
1572    * @return The reader to use
1573    * @throws IOException
1574    */
1575   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1576       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1577       final Reference r, final StoreFile.Reader reader) throws IOException {
1578     return execOperationWithResult(reader,
1579         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1580       @Override
1581       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1582           throws IOException {
1583         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1584       }
1585     });
1586   }
1587 
1588   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1589       final Cell oldCell, Cell newCell) throws IOException {
1590     return execOperationWithResult(newCell,
1591         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1592       @Override
1593       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1594           throws IOException {
1595         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1596       }
1597     });
1598   }
1599 
1600   public Message preEndpointInvocation(final Service service, final String methodName,
1601       Message request) throws IOException {
1602     return execOperationWithResult(request,
1603         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1604       @Override
1605       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1606           throws IOException {
1607         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1608       }
1609     });
1610   }
1611 
1612   public void postEndpointInvocation(final Service service, final String methodName,
1613       final Message request, final Message.Builder responseBuilder) throws IOException {
1614     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1615       @Override
1616       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1617           throws IOException {
1618         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1619       }
1620     });
1621   }
1622 
1623   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1624     return execOperationWithResult(tracker,
1625         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1626       @Override
1627       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1628           throws IOException {
1629         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1630       }
1631     });
1632   }
1633 
1634   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1635     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1636     for (RegionEnvironment env : coprocessors) {
1637       DescriptiveStatistics ds = new DescriptiveStatistics();
1638       if (env.getInstance() instanceof RegionObserver) {
1639         for (Long time : env.getExecutionLatenciesNanos()) {
1640           ds.addValue(time);
1641         }
1642         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1643         if (ds.getN() == 0) {
1644           ds.addValue(0);
1645         }
1646         results.put(env.getInstance().getClass().getSimpleName(), ds);
1647       }
1648     }
1649     return results;
1650   }
1651 
1652   private static abstract class CoprocessorOperation
1653       extends ObserverContext<RegionCoprocessorEnvironment> {
1654     public abstract void call(Coprocessor observer,
1655         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1656     public abstract boolean hasCall(Coprocessor observer);
1657     public void postEnvCall(RegionEnvironment env) { }
1658   }
1659 
1660   private static abstract class RegionOperation extends CoprocessorOperation {
1661     public abstract void call(RegionObserver observer,
1662         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1663 
1664     public boolean hasCall(Coprocessor observer) {
1665       return observer instanceof RegionObserver;
1666     }
1667 
1668     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1669         throws IOException {
1670       call((RegionObserver)observer, ctx);
1671     }
1672   }
1673 
1674   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1675     private T result = null;
1676     public void setResult(final T result) { this.result = result; }
1677     public T getResult() { return this.result; }
1678   }
1679 
1680   private static abstract class EndpointOperation extends CoprocessorOperation {
1681     public abstract void call(EndpointObserver observer,
1682         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1683 
1684     public boolean hasCall(Coprocessor observer) {
1685       return observer instanceof EndpointObserver;
1686     }
1687 
1688     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1689         throws IOException {
1690       call((EndpointObserver)observer, ctx);
1691     }
1692   }
1693 
1694   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1695     private T result = null;
1696     public void setResult(final T result) { this.result = result; }
1697     public T getResult() { return this.result; }
1698   }
1699 
1700   private boolean execOperation(final CoprocessorOperation ctx)
1701       throws IOException {
1702     return execOperation(true, ctx);
1703   }
1704 
1705   private <T> T execOperationWithResult(final T defaultValue,
1706       final RegionOperationWithResult<T> ctx) throws IOException {
1707     if (ctx == null) return defaultValue;
1708     ctx.setResult(defaultValue);
1709     execOperation(true, ctx);
1710     return ctx.getResult();
1711   }
1712 
1713   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1714       final RegionOperationWithResult<T> ctx) throws IOException {
1715     boolean bypass = false;
1716     T result = defaultValue;
1717     if (ctx != null) {
1718       ctx.setResult(defaultValue);
1719       bypass = execOperation(true, ctx);
1720       result = ctx.getResult();
1721     }
1722     return bypass == ifBypass ? result : null;
1723   }
1724 
1725   private <T> T execOperationWithResult(final T defaultValue,
1726       final EndpointOperationWithResult<T> ctx) throws IOException {
1727     if (ctx == null) return defaultValue;
1728     ctx.setResult(defaultValue);
1729     execOperation(true, ctx);
1730     return ctx.getResult();
1731   }
1732 
1733   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1734       throws IOException {
1735     boolean bypass = false;
1736     for (RegionEnvironment env: coprocessors) {
1737       Coprocessor observer = env.getInstance();
1738       if (ctx.hasCall(observer)) {
1739         long startTime = System.nanoTime();
1740         ctx.prepare(env);
1741         Thread currentThread = Thread.currentThread();
1742         ClassLoader cl = currentThread.getContextClassLoader();
1743         try {
1744           currentThread.setContextClassLoader(env.getClassLoader());
1745           ctx.call(observer, ctx);
1746         } catch (Throwable e) {
1747           handleCoprocessorThrowable(env, e);
1748         } finally {
1749           currentThread.setContextClassLoader(cl);
1750         }
1751         env.offerExecutionLatency(System.nanoTime() - startTime);
1752         bypass |= ctx.shouldBypass();
1753         if (earlyExit && ctx.shouldComplete()) {
1754           break;
1755         }
1756       }
1757 
1758       ctx.postEnvCall(env);
1759     }
1760     return bypass;
1761   }
1762 }