View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.UUID;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.regex.Matcher;
31  
32  import com.google.common.collect.ImmutableList;
33  import com.google.common.collect.Lists;
34  import com.google.protobuf.Message;
35  import com.google.protobuf.Service;
36  
37  import org.apache.commons.collections.map.AbstractReferenceMap;
38  import org.apache.commons.collections.map.ReferenceMap;
39  import org.apache.commons.lang.ClassUtils;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.Coprocessor;
49  import org.apache.hadoop.hbase.CoprocessorEnvironment;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.client.Append;
56  import org.apache.hadoop.hbase.client.Delete;
57  import org.apache.hadoop.hbase.client.Durability;
58  import org.apache.hadoop.hbase.client.Get;
59  import org.apache.hadoop.hbase.client.Increment;
60  import org.apache.hadoop.hbase.client.Mutation;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
71  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
72  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
73  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
74  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
75  import org.apache.hadoop.hbase.io.Reference;
76  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
77  import org.apache.hadoop.hbase.regionserver.Region.Operation;
78  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
79  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
80  import org.apache.hadoop.hbase.wal.WALKey;
81  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
82  import org.apache.hadoop.hbase.util.Bytes;
83  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
84  import org.apache.hadoop.hbase.util.Pair;
85  
86  /**
87   * Implements the coprocessor environment and runtime support for coprocessors
88   * loaded within a {@link Region}.
89   */
90  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
91  @InterfaceStability.Evolving
92  public class RegionCoprocessorHost
93      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
94  
95    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
96    // The shared data map
97    private static ReferenceMap sharedDataMap =
98        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
99  
100   // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
101   private final boolean hasCustomPostScannerFilterRow;
102 
103   /**
104    * 
105    * Encapsulation of the environment of each coprocessor
106    */
107   static class RegionEnvironment extends CoprocessorHost.Environment
108       implements RegionCoprocessorEnvironment {
109 
110     private Region region;
111     private RegionServerServices rsServices;
112     ConcurrentMap<String, Object> sharedData;
113     private final boolean useLegacyPre;
114     private final boolean useLegacyPost;
115 
116     /**
117      * Constructor
118      * @param impl the coprocessor instance
119      * @param priority chaining priority
120      */
121     public RegionEnvironment(final Coprocessor impl, final int priority,
122         final int seq, final Configuration conf, final Region region,
123         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
124       super(impl, priority, seq, conf);
125       this.region = region;
126       this.rsServices = services;
127       this.sharedData = sharedData;
128       // Pick which version of the WAL related events we'll call.
129       // This way we avoid calling the new version on older RegionObservers so
130       // we can maintain binary compatibility.
131       // See notes in javadoc for RegionObserver
132       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
133           HRegionInfo.class, WALKey.class, WALEdit.class);
134       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
135           HRegionInfo.class, WALKey.class, WALEdit.class);
136     }
137 
138     /** @return the region */
139     @Override
140     public Region getRegion() {
141       return region;
142     }
143 
144     /** @return reference to the region server services */
145     @Override
146     public RegionServerServices getRegionServerServices() {
147       return rsServices;
148     }
149 
150     public void shutdown() {
151       super.shutdown();
152     }
153 
154     @Override
155     public ConcurrentMap<String, Object> getSharedData() {
156       return sharedData;
157     }
158 
159     @Override
160     public HRegionInfo getRegionInfo() {
161       return region.getRegionInfo();
162     }
163 
164   }
165 
166   static class TableCoprocessorAttribute {
167     private Path path;
168     private String className;
169     private int priority;
170     private Configuration conf;
171 
172     public TableCoprocessorAttribute(Path path, String className, int priority,
173         Configuration conf) {
174       this.path = path;
175       this.className = className;
176       this.priority = priority;
177       this.conf = conf;
178     }
179 
180     public Path getPath() {
181       return path;
182     }
183 
184     public String getClassName() {
185       return className;
186     }
187 
188     public int getPriority() {
189       return priority;
190     }
191 
192     public Configuration getConf() {
193       return conf;
194     }
195   }
196 
  /**
   * The region server services, as handed to the constructor; also passed on to each
   * {@link RegionEnvironment} created by {@code createEnvironment}.
   */
  RegionServerServices rsServices;
  /** The region this host was constructed for. */
  Region region;
201 
202   /**
203    * Constructor
204    * @param region the region
205    * @param rsServices interface to available region server functionality
206    * @param conf the configuration
207    */
208   public RegionCoprocessorHost(final Region region,
209       final RegionServerServices rsServices, final Configuration conf) {
210     super(rsServices);
211     this.conf = conf;
212     this.rsServices = rsServices;
213     this.region = region;
214     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
215 
216     // load system default cp's from configuration.
217     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
218 
219     // load system default cp's for user tables from configuration.
220     if (!region.getRegionInfo().getTable().isSystemTable()) {
221       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
222     }
223 
224     // load Coprocessor From HDFS
225     loadTableCoprocessors(conf);
226 
227     // now check whether any coprocessor implements postScannerFilterRow
228     boolean hasCustomPostScannerFilterRow = false;
229     out: for (RegionEnvironment env: coprocessors) {
230       if (env.getInstance() instanceof RegionObserver) {
231         Class<?> clazz = env.getInstance().getClass();
232         for(;;) {
233           if (clazz == null) {
234             // we must have directly implemented RegionObserver
235             hasCustomPostScannerFilterRow = true;
236             break out;
237           }
238           if (clazz == BaseRegionObserver.class) {
239             // we reached BaseRegionObserver, try next coprocessor
240             break;
241           }
242           try {
243             clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
244               InternalScanner.class, Cell.class, boolean.class);
245             // this coprocessor has a custom version of postScannerFilterRow
246             hasCustomPostScannerFilterRow = true;
247             break out;
248           } catch (NoSuchMethodException ignore) {
249           }
250           // the deprecated signature still exists
251           try {
252             clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
253               InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
254             // this coprocessor has a custom version of postScannerFilterRow
255             hasCustomPostScannerFilterRow = true;
256             break out;
257           } catch (NoSuchMethodException ignore) {
258           }
259           clazz = clazz.getSuperclass();
260         }
261       }
262     }
263     this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
264   }
265 
266   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
267       HTableDescriptor htd) {
268     List<TableCoprocessorAttribute> result = Lists.newArrayList();
269     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
270       String key = Bytes.toString(e.getKey().get()).trim();
271       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
272         String spec = Bytes.toString(e.getValue().get()).trim();
273         // found one
274         try {
275           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
276           if (matcher.matches()) {
277             // jar file path can be empty if the cp class can be loaded
278             // from class loader.
279             Path path = matcher.group(1).trim().isEmpty() ?
280                 null : new Path(matcher.group(1).trim());
281             String className = matcher.group(2).trim();
282             if (className.isEmpty()) {
283               LOG.error("Malformed table coprocessor specification: key=" +
284                 key + ", spec: " + spec);
285               continue;
286             }
287             String priorityStr = matcher.group(3).trim();
288             int priority = priorityStr.isEmpty() ?
289                 Coprocessor.PRIORITY_USER : Integer.parseInt(priorityStr);
290             String cfgSpec = null;
291             try {
292               cfgSpec = matcher.group(4);
293             } catch (IndexOutOfBoundsException ex) {
294               // ignore
295             }
296             Configuration ourConf;
297             if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
298               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
299               // do an explicit deep copy of the passed configuration
300               ourConf = new Configuration(false);
301               HBaseConfiguration.merge(ourConf, conf);
302               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
303               while (m.find()) {
304                 ourConf.set(m.group(1), m.group(2));
305               }
306             } else {
307               ourConf = conf;
308             }
309             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
310           } else {
311             LOG.error("Malformed table coprocessor specification: key=" + key +
312               ", spec: " + spec);
313           }
314         } catch (Exception ioe) {
315           LOG.error("Malformed table coprocessor specification: key=" + key +
316             ", spec: " + spec);
317         }
318       }
319     }
320     return result;
321   }
322 
323   /**
324    * Sanity check the table coprocessor attributes of the supplied schema. Will
325    * throw an exception if there is a problem.
326    * @param conf
327    * @param htd
328    * @throws IOException
329    */
330   public static void testTableCoprocessorAttrs(final Configuration conf,
331       final HTableDescriptor htd) throws IOException {
332     String pathPrefix = UUID.randomUUID().toString();
333     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
334       if (attr.getPriority() < 0) {
335         throw new IOException("Priority for coprocessor " + attr.getClassName() +
336           " cannot be less than 0");
337       }
338       ClassLoader old = Thread.currentThread().getContextClassLoader();
339       try {
340         ClassLoader cl;
341         if (attr.getPath() != null) {
342           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
343             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
344         } else {
345           cl = CoprocessorHost.class.getClassLoader();
346         }
347         Thread.currentThread().setContextClassLoader(cl);
348         cl.loadClass(attr.getClassName());
349       } catch (ClassNotFoundException e) {
350         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
351       } finally {
352         Thread.currentThread().setContextClassLoader(old);
353       }
354     }
355   }
356 
357   void loadTableCoprocessors(final Configuration conf) {
358     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
359       DEFAULT_COPROCESSORS_ENABLED);
360     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
361       DEFAULT_USER_COPROCESSORS_ENABLED);
362     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
363       return;
364     }
365 
366     // scan the table attributes for coprocessor load specifications
367     // initialize the coprocessors
368     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
369     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
370         region.getTableDesc())) {
371       // Load encompasses classloading and coprocessor initialization
372       try {
373         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
374           attr.getConf());
375         configured.add(env);
376         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
377             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
378       } catch (Throwable t) {
379         // Coprocessor failed to load, do we abort on error?
380         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
381           abortServer(attr.getClassName(), t);
382         } else {
383           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
384         }
385       }
386     }
387     // add together to coprocessor set for COW efficiency
388     coprocessors.addAll(configured);
389   }
390 
391   @Override
392   public RegionEnvironment createEnvironment(Class<?> implClass,
393       Coprocessor instance, int priority, int seq, Configuration conf) {
394     // Check if it's an Endpoint.
395     // Due to current dynamic protocol design, Endpoint
396     // uses a different way to be registered and executed.
397     // It uses a visitor pattern to invoke registered Endpoint
398     // method.
399     for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
400       Class<?> c = (Class<?>) itf;
401       if (CoprocessorService.class.isAssignableFrom(c)) {
402         region.registerService( ((CoprocessorService)instance).getService() );
403       }
404     }
405     ConcurrentMap<String, Object> classData;
406     // make sure only one thread can add maps
407     synchronized (sharedDataMap) {
408       // as long as at least one RegionEnvironment holds on to its classData it will
409       // remain in this map
410       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
411       if (classData == null) {
412         classData = new ConcurrentHashMap<String, Object>();
413         sharedDataMap.put(implClass.getName(), classData);
414       }
415     }
416     return new RegionEnvironment(instance, priority, seq, conf, region,
417         rsServices, classData);
418   }
419 
420   /**
421    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
422    *
423    * For example, {@link
424    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
425    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
426    *
427    * See also
428    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
429    *    CoprocessorEnvironment, Throwable)}
430    * @param env The coprocessor that threw the exception.
431    * @param e The exception that was thrown.
432    */
433   private void handleCoprocessorThrowableNoRethrow(
434       final CoprocessorEnvironment env, final Throwable e) {
435     try {
436       handleCoprocessorThrowable(env,e);
437     } catch (IOException ioe) {
438       // We cannot throw exceptions from the caller hook, so ignore.
439       LOG.warn(
440         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
441         e + ". Ignoring.",e);
442     }
443   }
444 
445   /**
446    * Invoked before a region open.
447    *
448    * @throws IOException Signals that an I/O exception has occurred.
449    */
450   public void preOpen() throws IOException {
451     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
452       @Override
453       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
454           throws IOException {
455         oserver.preOpen(ctx);
456       }
457     });
458   }
459 
460   /**
461    * Invoked after a region open
462    */
463   public void postOpen() {
464     try {
465       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
466         @Override
467         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
468             throws IOException {
469           oserver.postOpen(ctx);
470         }
471       });
472     } catch (IOException e) {
473       LOG.warn(e);
474     }
475   }
476 
477   /**
478    * Invoked after log replay on region
479    */
480   public void postLogReplay() {
481     try {
482       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
483         @Override
484         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
485             throws IOException {
486           oserver.postLogReplay(ctx);
487         }
488       });
489     } catch (IOException e) {
490       LOG.warn(e);
491     }
492   }
493 
494   /**
495    * Invoked before a region is closed
496    * @param abortRequested true if the server is aborting
497    */
498   public void preClose(final boolean abortRequested) throws IOException {
499     execOperation(false, new RegionOperation() {
500       @Override
501       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
502           throws IOException {
503         oserver.preClose(ctx, abortRequested);
504       }
505     });
506   }
507 
508   /**
509    * Invoked after a region is closed
510    * @param abortRequested true if the server is aborting
511    */
512   public void postClose(final boolean abortRequested) {
513     try {
514       execOperation(false, new RegionOperation() {
515         @Override
516         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
517             throws IOException {
518           oserver.postClose(ctx, abortRequested);
519         }
520         public void postEnvCall(RegionEnvironment env) {
521           shutdown(env);
522         }
523       });
524     } catch (IOException e) {
525       LOG.warn(e);
526     }
527   }
528 
529   /**
530    * See
531    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
532    */
533   public InternalScanner preCompactScannerOpen(final Store store,
534       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
535       final CompactionRequest request) throws IOException {
536     return execOperationWithResult(null,
537         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
538       @Override
539       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
540           throws IOException {
541         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
542           earliestPutTs, getResult(), request));
543       }
544     });
545   }
546 
547   /**
548    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
549    * available candidates.
550    * @param store The store where compaction is being requested
551    * @param candidates The currently available store files
552    * @param request custom compaction request
553    * @return If {@code true}, skip the normal selection process and use the current list
554    * @throws IOException
555    */
556   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
557       final CompactionRequest request) throws IOException {
558     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
559       @Override
560       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
561           throws IOException {
562         oserver.preCompactSelection(ctx, store, candidates, request);
563       }
564     });
565   }
566 
567   /**
568    * Called after the {@link StoreFile}s to be compacted have been selected from the available
569    * candidates.
570    * @param store The store where compaction is being requested
571    * @param selected The store files selected to compact
572    * @param request custom compaction
573    */
574   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
575       final CompactionRequest request) {
576     try {
577       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
578         @Override
579         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
580             throws IOException {
581           oserver.postCompactSelection(ctx, store, selected, request);
582         }
583       });
584     } catch (IOException e) {
585       LOG.warn(e);
586     }
587   }
588 
589   /**
590    * Called prior to rewriting the store files selected for compaction
591    * @param store the store being compacted
592    * @param scanner the scanner used to read store data during compaction
593    * @param scanType type of Scan
594    * @param request the compaction that will be executed
595    * @throws IOException
596    */
597   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
598       final ScanType scanType, final CompactionRequest request) throws IOException {
599     return execOperationWithResult(false, scanner,
600         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
601       @Override
602       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
603           throws IOException {
604         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
605       }
606     });
607   }
608 
609   /**
610    * Called after the store compaction has completed.
611    * @param store the store being compacted
612    * @param resultFile the new store file written during compaction
613    * @param request the compaction that is being executed
614    * @throws IOException
615    */
616   public void postCompact(final Store store, final StoreFile resultFile,
617       final CompactionRequest request) throws IOException {
618     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
619       @Override
620       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
621           throws IOException {
622         oserver.postCompact(ctx, store, resultFile, request);
623       }
624     });
625   }
626 
627   /**
628    * Invoked before a memstore flush
629    * @throws IOException
630    */
631   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
632       throws IOException {
633     return execOperationWithResult(false, scanner,
634         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
635       @Override
636       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
637           throws IOException {
638         setResult(oserver.preFlush(ctx, store, getResult()));
639       }
640     });
641   }
642 
643   /**
644    * Invoked before a memstore flush
645    * @throws IOException
646    */
647   public void preFlush() throws IOException {
648     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
649       @Override
650       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
651           throws IOException {
652         oserver.preFlush(ctx);
653       }
654     });
655   }
656 
657   /**
658    * See
659    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
660    *    Store, KeyValueScanner, InternalScanner)}
661    */
662   public InternalScanner preFlushScannerOpen(final Store store,
663       final KeyValueScanner memstoreScanner) throws IOException {
664     return execOperationWithResult(null,
665         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
666       @Override
667       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
668           throws IOException {
669         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
670       }
671     });
672   }
673 
674   /**
675    * Invoked after a memstore flush
676    * @throws IOException
677    */
678   public void postFlush() throws IOException {
679     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
680       @Override
681       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
682           throws IOException {
683         oserver.postFlush(ctx);
684       }
685     });
686   }
687 
688   /**
689    * Invoked after a memstore flush
690    * @throws IOException
691    */
692   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
693     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
694       @Override
695       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
696           throws IOException {
697         oserver.postFlush(ctx, store, storeFile);
698       }
699     });
700   }
701 
702   /**
703    * Invoked just before a split
704    * @throws IOException
705    */
706   // TODO: Deprecate this
707   public void preSplit() throws IOException {
708     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
709       @Override
710       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
711           throws IOException {
712         oserver.preSplit(ctx);
713       }
714     });
715   }
716 
717   /**
718    * Invoked just before a split
719    * @throws IOException
720    */
721   public void preSplit(final byte[] splitRow) throws IOException {
722     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
723       @Override
724       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
725           throws IOException {
726         oserver.preSplit(ctx, splitRow);
727       }
728     });
729   }
730 
731   /**
732    * Invoked just after a split
733    * @param l the new left-hand daughter region
734    * @param r the new right-hand daughter region
735    * @throws IOException
736    */
737   public void postSplit(final Region l, final Region r) throws IOException {
738     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
739       @Override
740       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
741           throws IOException {
742         oserver.postSplit(ctx, l, r);
743       }
744     });
745   }
746 
747   public boolean preSplitBeforePONR(final byte[] splitKey,
748       final List<Mutation> metaEntries) throws IOException {
749     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
750       @Override
751       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
752           throws IOException {
753         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
754       }
755     });
756   }
757 
758   public void preSplitAfterPONR() throws IOException {
759     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
760       @Override
761       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
762           throws IOException {
763         oserver.preSplitAfterPONR(ctx);
764       }
765     });
766   }
767 
768   /**
769    * Invoked just before the rollback of a failed split is started
770    * @throws IOException
771    */
772   public void preRollBackSplit() throws IOException {
773     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
774       @Override
775       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
776           throws IOException {
777         oserver.preRollBackSplit(ctx);
778       }
779     });
780   }
781 
782   /**
783    * Invoked just after the rollback of a failed split is done
784    * @throws IOException
785    */
786   public void postRollBackSplit() throws IOException {
787     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
788       @Override
789       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
790           throws IOException {
791         oserver.postRollBackSplit(ctx);
792       }
793     });
794   }
795 
796   /**
797    * Invoked after a split is completed irrespective of a failure or success.
798    * @throws IOException
799    */
800   public void postCompleteSplit() throws IOException {
801     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
802       @Override
803       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
804           throws IOException {
805         oserver.postCompleteSplit(ctx);
806       }
807     });
808   }
809 
810   // RegionObserver support
811 
812   /**
813    * @param get the Get request
814    * @return true if default processing should be bypassed
815    * @exception IOException Exception
816    */
817   public boolean preGet(final Get get, final List<Cell> results)
818       throws IOException {
819     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
820       @Override
821       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
822           throws IOException {
823         oserver.preGetOp(ctx, get, results);
824       }
825     });
826   }
827 
828   /**
829    * @param get the Get request
830    * @param results the result sett
831    * @exception IOException Exception
832    */
833   public void postGet(final Get get, final List<Cell> results)
834       throws IOException {
835     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
836       @Override
837       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
838           throws IOException {
839         oserver.postGetOp(ctx, get, results);
840       }
841     });
842   }
843 
844   /**
845    * @param get the Get request
846    * @return true or false to return to client if bypassing normal operation,
847    * or null otherwise
848    * @exception IOException Exception
849    */
850   public Boolean preExists(final Get get) throws IOException {
851     return execOperationWithResult(true, false,
852         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
853       @Override
854       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
855           throws IOException {
856         setResult(oserver.preExists(ctx, get, getResult()));
857       }
858     });
859   }
860 
861   /**
862    * @param get the Get request
863    * @param exists the result returned by the region server
864    * @return the result to return to the client
865    * @exception IOException Exception
866    */
867   public boolean postExists(final Get get, boolean exists)
868       throws IOException {
869     return execOperationWithResult(exists,
870         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
871       @Override
872       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
873           throws IOException {
874         setResult(oserver.postExists(ctx, get, getResult()));
875       }
876     });
877   }
878 
879   /**
880    * @param put The Put object
881    * @param edit The WALEdit object.
882    * @param durability The durability used
883    * @return true if default processing should be bypassed
884    * @exception IOException Exception
885    */
886   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
887       throws IOException {
888     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
889       @Override
890       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
891           throws IOException {
892         oserver.prePut(ctx, put, edit, durability);
893       }
894     });
895   }
896 
897   /**
898    * @param mutation - the current mutation
899    * @param kv - the current cell
900    * @param byteNow - current timestamp in bytes
901    * @param get - the get that could be used
902    * Note that the get only does not specify the family and qualifier that should be used
903    * @return true if default processing should be bypassed
904    * @exception IOException
905    *              Exception
906    */
907   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
908       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
909     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
910       @Override
911       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
912           throws IOException {
913         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
914       }
915     });
916   }
917 
918   /**
919    * @param put The Put object
920    * @param edit The WALEdit object.
921    * @param durability The durability used
922    * @exception IOException Exception
923    */
924   public void postPut(final Put put, final WALEdit edit, final Durability durability)
925       throws IOException {
926     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
927       @Override
928       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
929           throws IOException {
930         oserver.postPut(ctx, put, edit, durability);
931       }
932     });
933   }
934 
935   /**
936    * @param delete The Delete object
937    * @param edit The WALEdit object.
938    * @param durability The durability used
939    * @return true if default processing should be bypassed
940    * @exception IOException Exception
941    */
942   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
943       throws IOException {
944     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
945       @Override
946       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
947           throws IOException {
948         oserver.preDelete(ctx, delete, edit, durability);
949       }
950     });
951   }
952 
953   /**
954    * @param delete The Delete object
955    * @param edit The WALEdit object.
956    * @param durability The durability used
957    * @exception IOException Exception
958    */
959   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
960       throws IOException {
961     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
962       @Override
963       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
964           throws IOException {
965         oserver.postDelete(ctx, delete, edit, durability);
966       }
967     });
968   }
969 
970   /**
971    * @param miniBatchOp
972    * @return true if default processing should be bypassed
973    * @throws IOException
974    */
975   public boolean preBatchMutate(
976       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
977     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
978       @Override
979       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
980           throws IOException {
981         oserver.preBatchMutate(ctx, miniBatchOp);
982       }
983     });
984   }
985 
986   /**
987    * @param miniBatchOp
988    * @throws IOException
989    */
990   public void postBatchMutate(
991       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
992     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
993       @Override
994       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
995           throws IOException {
996         oserver.postBatchMutate(ctx, miniBatchOp);
997       }
998     });
999   }
1000 
1001   public void postBatchMutateIndispensably(
1002       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1003       throws IOException {
1004     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1005       @Override
1006       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1007           throws IOException {
1008         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1009       }
1010     });
1011   }
1012 
1013   /**
1014    * @param row row to check
1015    * @param family column family
1016    * @param qualifier column qualifier
1017    * @param compareOp the comparison operation
1018    * @param comparator the comparator
1019    * @param put data to put if check succeeds
1020    * @return true or false to return to client if default processing should
1021    * be bypassed, or null otherwise
1022    * @throws IOException e
1023    */
1024   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1025       final byte [] qualifier, final CompareOp compareOp,
1026       final ByteArrayComparable comparator, final Put put)
1027       throws IOException {
1028     return execOperationWithResult(true, false,
1029         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1030       @Override
1031       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1032           throws IOException {
1033         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1034           compareOp, comparator, put, getResult()));
1035       }
1036     });
1037   }
1038 
1039   /**
1040    * @param row row to check
1041    * @param family column family
1042    * @param qualifier column qualifier
1043    * @param compareOp the comparison operation
1044    * @param comparator the comparator
1045    * @param put data to put if check succeeds
1046    * @return true or false to return to client if default processing should
1047    * be bypassed, or null otherwise
1048    * @throws IOException e
1049    */
1050   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1051       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1052       final Put put) throws IOException {
1053     return execOperationWithResult(true, false,
1054         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1055       @Override
1056       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1057           throws IOException {
1058         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1059           compareOp, comparator, put, getResult()));
1060       }
1061     });
1062   }
1063 
1064   /**
1065    * @param row row to check
1066    * @param family column family
1067    * @param qualifier column qualifier
1068    * @param compareOp the comparison operation
1069    * @param comparator the comparator
1070    * @param put data to put if check succeeds
1071    * @throws IOException e
1072    */
1073   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1074       final byte [] qualifier, final CompareOp compareOp,
1075       final ByteArrayComparable comparator, final Put put,
1076       boolean result) throws IOException {
1077     return execOperationWithResult(result,
1078         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1079       @Override
1080       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1081           throws IOException {
1082         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1083           compareOp, comparator, put, getResult()));
1084       }
1085     });
1086   }
1087 
1088   /**
1089    * @param row row to check
1090    * @param family column family
1091    * @param qualifier column qualifier
1092    * @param compareOp the comparison operation
1093    * @param comparator the comparator
1094    * @param delete delete to commit if check succeeds
1095    * @return true or false to return to client if default processing should
1096    * be bypassed, or null otherwise
1097    * @throws IOException e
1098    */
1099   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1100       final byte [] qualifier, final CompareOp compareOp,
1101       final ByteArrayComparable comparator, final Delete delete)
1102       throws IOException {
1103     return execOperationWithResult(true, false,
1104         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1105       @Override
1106       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1107           throws IOException {
1108         setResult(oserver.preCheckAndDelete(ctx, row, family,
1109             qualifier, compareOp, comparator, delete, getResult()));
1110       }
1111     });
1112   }
1113 
1114   /**
1115    * @param row row to check
1116    * @param family column family
1117    * @param qualifier column qualifier
1118    * @param compareOp the comparison operation
1119    * @param comparator the comparator
1120    * @param delete delete to commit if check succeeds
1121    * @return true or false to return to client if default processing should
1122    * be bypassed, or null otherwise
1123    * @throws IOException e
1124    */
1125   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1126       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1127       final Delete delete) throws IOException {
1128     return execOperationWithResult(true, false,
1129         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1130       @Override
1131       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1132           throws IOException {
1133         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1134               family, qualifier, compareOp, comparator, delete, getResult()));
1135       }
1136     });
1137   }
1138 
1139   /**
1140    * @param row row to check
1141    * @param family column family
1142    * @param qualifier column qualifier
1143    * @param compareOp the comparison operation
1144    * @param comparator the comparator
1145    * @param delete delete to commit if check succeeds
1146    * @throws IOException e
1147    */
1148   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1149       final byte [] qualifier, final CompareOp compareOp,
1150       final ByteArrayComparable comparator, final Delete delete,
1151       boolean result) throws IOException {
1152     return execOperationWithResult(result,
1153         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1154       @Override
1155       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1156           throws IOException {
1157         setResult(oserver.postCheckAndDelete(ctx, row, family,
1158             qualifier, compareOp, comparator, delete, getResult()));
1159       }
1160     });
1161   }
1162 
1163   /**
1164    * @param append append object
1165    * @return result to return to client if default operation should be
1166    * bypassed, null otherwise
1167    * @throws IOException if an error occurred on the coprocessor
1168    */
1169   public Result preAppend(final Append append) throws IOException {
1170     return execOperationWithResult(true, null,
1171         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1172       @Override
1173       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1174           throws IOException {
1175         setResult(oserver.preAppend(ctx, append));
1176       }
1177     });
1178   }
1179 
1180   /**
1181    * @param append append object
1182    * @return result to return to client if default operation should be
1183    * bypassed, null otherwise
1184    * @throws IOException if an error occurred on the coprocessor
1185    */
1186   public Result preAppendAfterRowLock(final Append append) throws IOException {
1187     return execOperationWithResult(true, null,
1188         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1189       @Override
1190       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1191           throws IOException {
1192         setResult(oserver.preAppendAfterRowLock(ctx, append));
1193       }
1194     });
1195   }
1196 
1197   /**
1198    * @param increment increment object
1199    * @return result to return to client if default operation should be
1200    * bypassed, null otherwise
1201    * @throws IOException if an error occurred on the coprocessor
1202    */
1203   public Result preIncrement(final Increment increment) throws IOException {
1204     return execOperationWithResult(true, null,
1205         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1206       @Override
1207       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1208           throws IOException {
1209         setResult(oserver.preIncrement(ctx, increment));
1210       }
1211     });
1212   }
1213 
1214   /**
1215    * @param increment increment object
1216    * @return result to return to client if default operation should be
1217    * bypassed, null otherwise
1218    * @throws IOException if an error occurred on the coprocessor
1219    */
1220   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1221     return execOperationWithResult(true, null,
1222         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1223       @Override
1224       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1225           throws IOException {
1226         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1227       }
1228     });
1229   }
1230 
1231   /**
1232    * @param append Append object
1233    * @param result the result returned by the append
1234    * @throws IOException if an error occurred on the coprocessor
1235    */
1236   public void postAppend(final Append append, final Result result) throws IOException {
1237     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1238       @Override
1239       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1240           throws IOException {
1241         oserver.postAppend(ctx, append, result);
1242       }
1243     });
1244   }
1245 
1246   /**
1247    * @param increment increment object
1248    * @param result the result returned by postIncrement
1249    * @throws IOException if an error occurred on the coprocessor
1250    */
1251   public Result postIncrement(final Increment increment, Result result) throws IOException {
1252     return execOperationWithResult(result,
1253         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1254       @Override
1255       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1256           throws IOException {
1257         setResult(oserver.postIncrement(ctx, increment, getResult()));
1258       }
1259     });
1260   }
1261 
1262   /**
1263    * @param scan the Scan specification
1264    * @return scanner id to return to client if default operation should be
1265    * bypassed, false otherwise
1266    * @exception IOException Exception
1267    */
1268   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1269     return execOperationWithResult(true, null,
1270         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1271       @Override
1272       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1273           throws IOException {
1274         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1275       }
1276     });
1277   }
1278 
1279   /**
1280    * See
1281    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1282    *    Store, Scan, NavigableSet, KeyValueScanner)}
1283    */
1284   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1285       final NavigableSet<byte[]> targetCols) throws IOException {
1286     return execOperationWithResult(null,
1287         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1288       @Override
1289       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1290           throws IOException {
1291         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1292       }
1293     });
1294   }
1295 
1296   /**
1297    * @param scan the Scan specification
1298    * @param s the scanner
1299    * @return the scanner instance to use
1300    * @exception IOException Exception
1301    */
1302   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1303     return execOperationWithResult(s,
1304         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1305       @Override
1306       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1307           throws IOException {
1308         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1309       }
1310     });
1311   }
1312 
1313   /**
1314    * @param s the scanner
1315    * @param results the result set returned by the region server
1316    * @param limit the maximum number of results to return
1317    * @return 'has next' indication to client if bypassing default behavior, or
1318    * null otherwise
1319    * @exception IOException Exception
1320    */
1321   public Boolean preScannerNext(final InternalScanner s,
1322       final List<Result> results, final int limit) throws IOException {
1323     return execOperationWithResult(true, false,
1324         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1325       @Override
1326       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1327           throws IOException {
1328         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1329       }
1330     });
1331   }
1332 
1333   /**
1334    * @param s the scanner
1335    * @param results the result set returned by the region server
1336    * @param limit the maximum number of results to return
1337    * @param hasMore
1338    * @return 'has more' indication to give to client
1339    * @exception IOException Exception
1340    */
1341   public boolean postScannerNext(final InternalScanner s,
1342       final List<Result> results, final int limit, boolean hasMore)
1343       throws IOException {
1344     return execOperationWithResult(hasMore,
1345         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1346       @Override
1347       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1348           throws IOException {
1349         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1350       }
1351     });
1352   }
1353 
1354   /**
1355    * This will be called by the scan flow when the current scanned row is being filtered out by the
1356    * filter.
1357    * @param s the scanner
1358    * @param curRowCell The cell in the current row which got filtered out
1359    * @return whether more rows are available for the scanner or not
1360    * @throws IOException
1361    */
1362   public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1363       throws IOException {
1364     // short circuit for performance
1365     if (!hasCustomPostScannerFilterRow) return true;
1366     return execOperationWithResult(true,
1367         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1368       @Override
1369       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1370           throws IOException {
1371         setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult()));
1372       }
1373     });
1374   }
1375 
1376   /**
1377    * @param s the scanner
1378    * @return true if default behavior should be bypassed, false otherwise
1379    * @exception IOException Exception
1380    */
1381   public boolean preScannerClose(final InternalScanner s) throws IOException {
1382     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1383       @Override
1384       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1385           throws IOException {
1386         oserver.preScannerClose(ctx, s);
1387       }
1388     });
1389   }
1390 
1391   /**
1392    * @exception IOException Exception
1393    */
1394   public void postScannerClose(final InternalScanner s) throws IOException {
1395     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1396       @Override
1397       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1398           throws IOException {
1399         oserver.postScannerClose(ctx, s);
1400       }
1401     });
1402   }
1403 
1404   /**
1405    * @param info
1406    * @param logKey
1407    * @param logEdit
1408    * @return true if default behavior should be bypassed, false otherwise
1409    * @throws IOException
1410    */
1411   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1412       final WALEdit logEdit) throws IOException {
1413     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1414       @Override
1415       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1416           throws IOException {
1417         // Once we don't need to support the legacy call, replace RegionOperation with a version
1418         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1419         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1420         if (env.useLegacyPre) {
1421           if (logKey instanceof HLogKey) {
1422             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1423           } else {
1424             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1425           }
1426         } else {
1427           oserver.preWALRestore(ctx, info, logKey, logEdit);
1428         }
1429       }
1430     });
1431   }
1432 
1433   /**
1434    * @return true if default behavior should be bypassed, false otherwise
1435    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0
1436    */
1437   @Deprecated
1438   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1439       final WALEdit logEdit) throws IOException {
1440     return preWALRestore(info, (WALKey)logKey, logEdit);
1441   }
1442 
1443   /**
1444    * @param info
1445    * @param logKey
1446    * @param logEdit
1447    * @throws IOException
1448    */
1449   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1450       throws IOException {
1451     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1452       @Override
1453       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1454           throws IOException {
1455         // Once we don't need to support the legacy call, replace RegionOperation with a version
1456         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1457         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1458         if (env.useLegacyPost) {
1459           if (logKey instanceof HLogKey) {
1460             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1461           } else {
1462             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1463           }
1464         } else {
1465           oserver.postWALRestore(ctx, info, logKey, logEdit);
1466         }
1467       }
1468     });
1469   }
1470 
1471   /**
1472    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0
1473    */
1474   @Deprecated
1475   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1476       throws IOException {
1477     postWALRestore(info, (WALKey)logKey, logEdit);
1478   }
1479 
1480   /**
1481    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1482    * @return true if the default operation should be bypassed
1483    * @throws IOException
1484    */
1485   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1486     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1487       @Override
1488       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1489           throws IOException {
1490         oserver.preBulkLoadHFile(ctx, familyPaths);
1491       }
1492     });
1493   }
1494 
1495   /**
1496    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1497    * @param hasLoaded whether load was successful or not
1498    * @return the possibly modified value of hasLoaded
1499    * @throws IOException
1500    */
1501   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1502       boolean hasLoaded) throws IOException {
1503     return execOperationWithResult(hasLoaded,
1504         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1505       @Override
1506       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1507           throws IOException {
1508         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1509       }
1510     });
1511   }
1512 
1513   public void postStartRegionOperation(final Operation op) throws IOException {
1514     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1515       @Override
1516       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1517           throws IOException {
1518         oserver.postStartRegionOperation(ctx, op);
1519       }
1520     });
1521   }
1522 
1523   public void postCloseRegionOperation(final Operation op) throws IOException {
1524     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1525       @Override
1526       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1527           throws IOException {
1528         oserver.postCloseRegionOperation(ctx, op);
1529       }
1530     });
1531   }
1532 
1533   /**
1534    * @param fs fileystem to read from
1535    * @param p path to the file
1536    * @param in {@link FSDataInputStreamWrapper}
1537    * @param size Full size of the file
1538    * @param cacheConf
1539    * @param r original reference file. This will be not null only when reading a split file.
1540    * @return a Reader instance to use instead of the base reader if overriding
1541    * default behavior, null otherwise
1542    * @throws IOException
1543    */
1544   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1545       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1546       final Reference r) throws IOException {
1547     return execOperationWithResult(null,
1548         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1549       @Override
1550       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1551           throws IOException {
1552         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1553       }
1554     });
1555   }
1556 
1557   /**
1558    * @param fs fileystem to read from
1559    * @param p path to the file
1560    * @param in {@link FSDataInputStreamWrapper}
1561    * @param size Full size of the file
1562    * @param cacheConf
1563    * @param r original reference file. This will be not null only when reading a split file.
1564    * @param reader the base reader instance
1565    * @return The reader to use
1566    * @throws IOException
1567    */
1568   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1569       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1570       final Reference r, final StoreFile.Reader reader) throws IOException {
1571     return execOperationWithResult(reader,
1572         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1573       @Override
1574       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1575           throws IOException {
1576         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1577       }
1578     });
1579   }
1580 
1581   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1582       final Cell oldCell, Cell newCell) throws IOException {
1583     return execOperationWithResult(newCell,
1584         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1585       @Override
1586       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1587           throws IOException {
1588         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1589       }
1590     });
1591   }
1592 
1593   public Message preEndpointInvocation(final Service service, final String methodName,
1594       Message request) throws IOException {
1595     return execOperationWithResult(request,
1596         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1597       @Override
1598       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1599           throws IOException {
1600         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1601       }
1602     });
1603   }
1604 
1605   public void postEndpointInvocation(final Service service, final String methodName,
1606       final Message request, final Message.Builder responseBuilder) throws IOException {
1607     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1608       @Override
1609       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1610           throws IOException {
1611         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1612       }
1613     });
1614   }
1615 
1616   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1617     return execOperationWithResult(tracker,
1618         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1619       @Override
1620       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1621           throws IOException {
1622         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1623       }
1624     });
1625   }
1626 
1627   private static abstract class CoprocessorOperation
1628       extends ObserverContext<RegionCoprocessorEnvironment> {
1629     public abstract void call(Coprocessor observer,
1630         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1631     public abstract boolean hasCall(Coprocessor observer);
1632     public void postEnvCall(RegionEnvironment env) { }
1633   }
1634 
1635   private static abstract class RegionOperation extends CoprocessorOperation {
1636     public abstract void call(RegionObserver observer,
1637         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1638 
1639     public boolean hasCall(Coprocessor observer) {
1640       return observer instanceof RegionObserver;
1641     }
1642 
1643     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1644         throws IOException {
1645       call((RegionObserver)observer, ctx);
1646     }
1647   }
1648 
1649   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1650     private T result = null;
1651     public void setResult(final T result) { this.result = result; }
1652     public T getResult() { return this.result; }
1653   }
1654 
1655   private static abstract class EndpointOperation extends CoprocessorOperation {
1656     public abstract void call(EndpointObserver observer,
1657         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1658 
1659     public boolean hasCall(Coprocessor observer) {
1660       return observer instanceof EndpointObserver;
1661     }
1662 
1663     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1664         throws IOException {
1665       call((EndpointObserver)observer, ctx);
1666     }
1667   }
1668 
1669   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1670     private T result = null;
1671     public void setResult(final T result) { this.result = result; }
1672     public T getResult() { return this.result; }
1673   }
1674 
1675   private boolean execOperation(final CoprocessorOperation ctx)
1676       throws IOException {
1677     return execOperation(true, ctx);
1678   }
1679 
1680   private <T> T execOperationWithResult(final T defaultValue,
1681       final RegionOperationWithResult<T> ctx) throws IOException {
1682     if (ctx == null) return defaultValue;
1683     ctx.setResult(defaultValue);
1684     execOperation(true, ctx);
1685     return ctx.getResult();
1686   }
1687 
1688   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1689       final RegionOperationWithResult<T> ctx) throws IOException {
1690     boolean bypass = false;
1691     T result = defaultValue;
1692     if (ctx != null) {
1693       ctx.setResult(defaultValue);
1694       bypass = execOperation(true, ctx);
1695       result = ctx.getResult();
1696     }
1697     return bypass == ifBypass ? result : null;
1698   }
1699 
1700   private <T> T execOperationWithResult(final T defaultValue,
1701       final EndpointOperationWithResult<T> ctx) throws IOException {
1702     if (ctx == null) return defaultValue;
1703     ctx.setResult(defaultValue);
1704     execOperation(true, ctx);
1705     return ctx.getResult();
1706   }
1707 
1708   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1709       throws IOException {
1710     boolean bypass = false;
1711     List<RegionEnvironment> envs = coprocessors.get();
1712     for (int i = 0; i < envs.size(); i++) {
1713       RegionEnvironment env = envs.get(i);
1714       Coprocessor observer = env.getInstance();
1715       if (ctx.hasCall(observer)) {
1716         ctx.prepare(env);
1717         Thread currentThread = Thread.currentThread();
1718         ClassLoader cl = currentThread.getContextClassLoader();
1719         try {
1720           currentThread.setContextClassLoader(env.getClassLoader());
1721           ctx.call(observer, ctx);
1722         } catch (Throwable e) {
1723           handleCoprocessorThrowable(env, e);
1724         } finally {
1725           currentThread.setContextClassLoader(cl);
1726         }
1727         bypass |= ctx.shouldBypass();
1728         if (earlyExit && ctx.shouldComplete()) {
1729           break;
1730         }
1731       }
1732 
1733       ctx.postEnvCall(env);
1734     }
1735     return bypass;
1736   }
1737 }