View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.UUID;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.regex.Matcher;
31  
32  import org.apache.commons.collections.map.AbstractReferenceMap;
33  import org.apache.commons.collections.map.ReferenceMap;
34  import org.apache.commons.lang.ClassUtils;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.client.Append;
51  import org.apache.hadoop.hbase.client.Delete;
52  import org.apache.hadoop.hbase.client.Durability;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.Increment;
55  import org.apache.hadoop.hbase.client.Mutation;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.client.Result;
58  import org.apache.hadoop.hbase.client.Scan;
59  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
60  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
61  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
62  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
63  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
64  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
65  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
66  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
67  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
68  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
69  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
70  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
71  import org.apache.hadoop.hbase.io.Reference;
72  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
73  import org.apache.hadoop.hbase.regionserver.Region.Operation;
74  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
75  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
76  import org.apache.hadoop.hbase.wal.WALKey;
77  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
78  import org.apache.hadoop.hbase.util.Bytes;
79  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
80  import org.apache.hadoop.hbase.util.Pair;
81  
82  import com.google.common.collect.ImmutableList;
83  import com.google.common.collect.Lists;
84  import com.google.protobuf.Message;
85  import com.google.protobuf.Service;
86  
87  /**
88   * Implements the coprocessor environment and runtime support for coprocessors
89   * loaded within a {@link Region}.
90   */
91  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
92  @InterfaceStability.Evolving
93  public class RegionCoprocessorHost
94      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
95  
96    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
97    // The shared data map
98    private static ReferenceMap sharedDataMap =
99        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
100 
101   // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
102   private final boolean hasCustomPostScannerFilterRow;
103 
104   /**
105    * 
106    * Encapsulation of the environment of each coprocessor
107    */
108   static class RegionEnvironment extends CoprocessorHost.Environment
109       implements RegionCoprocessorEnvironment {
110 
111     private Region region;
112     private RegionServerServices rsServices;
113     ConcurrentMap<String, Object> sharedData;
114     private final boolean useLegacyPre;
115     private final boolean useLegacyPost;
116 
117     /**
118      * Constructor
119      * @param impl the coprocessor instance
120      * @param priority chaining priority
121      */
122     public RegionEnvironment(final Coprocessor impl, final int priority,
123         final int seq, final Configuration conf, final Region region,
124         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
125       super(impl, priority, seq, conf);
126       this.region = region;
127       this.rsServices = services;
128       this.sharedData = sharedData;
129       // Pick which version of the WAL related events we'll call.
130       // This way we avoid calling the new version on older RegionObservers so
131       // we can maintain binary compatibility.
132       // See notes in javadoc for RegionObserver
133       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
134           HRegionInfo.class, WALKey.class, WALEdit.class);
135       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
136           HRegionInfo.class, WALKey.class, WALEdit.class);
137     }
138 
139     /** @return the region */
140     @Override
141     public Region getRegion() {
142       return region;
143     }
144 
145     /** @return reference to the region server services */
146     @Override
147     public RegionServerServices getRegionServerServices() {
148       return rsServices;
149     }
150 
151     public void shutdown() {
152       super.shutdown();
153     }
154 
155     @Override
156     public ConcurrentMap<String, Object> getSharedData() {
157       return sharedData;
158     }
159 
160     @Override
161     public HRegionInfo getRegionInfo() {
162       return region.getRegionInfo();
163     }
164 
165   }
166 
167   static class TableCoprocessorAttribute {
168     private Path path;
169     private String className;
170     private int priority;
171     private Configuration conf;
172 
173     public TableCoprocessorAttribute(Path path, String className, int priority,
174         Configuration conf) {
175       this.path = path;
176       this.className = className;
177       this.priority = priority;
178       this.conf = conf;
179     }
180 
181     public Path getPath() {
182       return path;
183     }
184 
185     public String getClassName() {
186       return className;
187     }
188 
189     public int getPriority() {
190       return priority;
191     }
192 
193     public Configuration getConf() {
194       return conf;
195     }
196   }
197 
198   /** The region server services */
199   RegionServerServices rsServices;
200   /** The region */
201   Region region;
202 
203   /**
204    * Constructor
205    * @param region the region
206    * @param rsServices interface to available region server functionality
207    * @param conf the configuration
208    */
209   public RegionCoprocessorHost(final Region region,
210       final RegionServerServices rsServices, final Configuration conf) {
211     super(rsServices);
212     this.conf = conf;
213     this.rsServices = rsServices;
214     this.region = region;
215     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
216 
217     // load system default cp's from configuration.
218     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
219 
220     // load system default cp's for user tables from configuration.
221     if (!region.getRegionInfo().getTable().isSystemTable()) {
222       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
223     }
224 
225     // load Coprocessor From HDFS
226     loadTableCoprocessors(conf);
227 
228     // now check whether any coprocessor implements postScannerFilterRow
229     boolean hasCustomPostScannerFilterRow = false;
230     out: for (RegionEnvironment env: coprocessors) {
231       if (env.getInstance() instanceof RegionObserver) {
232         Class<?> clazz = env.getInstance().getClass();
233         for(;;) {
234           if (clazz == null) {
235             // we must have directly implemented RegionObserver
236             hasCustomPostScannerFilterRow = true;
237             break out;
238           }
239           if (clazz == BaseRegionObserver.class) {
240             // we reached BaseRegionObserver, try next coprocessor
241             break;
242           }
243           try {
244             clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
245               InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
246             // this coprocessor has a custom version of postScannerFilterRow
247             hasCustomPostScannerFilterRow = true;
248             break out;
249           } catch (NoSuchMethodException ignore) {
250           }
251           clazz = clazz.getSuperclass();
252         }
253       }
254     }
255     this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
256   }
257 
258   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
259       HTableDescriptor htd) {
260     List<TableCoprocessorAttribute> result = Lists.newArrayList();
261     for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
262       String key = Bytes.toString(e.getKey().get()).trim();
263       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
264         String spec = Bytes.toString(e.getValue().get()).trim();
265         // found one
266         try {
267           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
268           if (matcher.matches()) {
269             // jar file path can be empty if the cp class can be loaded
270             // from class loader.
271             Path path = matcher.group(1).trim().isEmpty() ?
272                 null : new Path(matcher.group(1).trim());
273             String className = matcher.group(2).trim();
274             if (className.isEmpty()) {
275               LOG.error("Malformed table coprocessor specification: key=" +
276                 key + ", spec: " + spec);
277               continue;
278             }
279             int priority = matcher.group(3).trim().isEmpty() ?
280                 Coprocessor.PRIORITY_USER : Integer.parseInt(matcher.group(3));
281             String cfgSpec = null;
282             try {
283               cfgSpec = matcher.group(4);
284             } catch (IndexOutOfBoundsException ex) {
285               // ignore
286             }
287             Configuration ourConf;
288             if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
289               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
290               // do an explicit deep copy of the passed configuration
291               ourConf = new Configuration(false);
292               HBaseConfiguration.merge(ourConf, conf);
293               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
294               while (m.find()) {
295                 ourConf.set(m.group(1), m.group(2));
296               }
297             } else {
298               ourConf = conf;
299             }
300             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
301           } else {
302             LOG.error("Malformed table coprocessor specification: key=" + key +
303               ", spec: " + spec);
304           }
305         } catch (Exception ioe) {
306           LOG.error("Malformed table coprocessor specification: key=" + key +
307             ", spec: " + spec);
308         }
309       }
310     }
311     return result;
312   }
313 
314   /**
315    * Sanity check the table coprocessor attributes of the supplied schema. Will
316    * throw an exception if there is a problem.
317    * @param conf
318    * @param htd
319    * @throws IOException
320    */
321   public static void testTableCoprocessorAttrs(final Configuration conf,
322       final HTableDescriptor htd) throws IOException {
323     String pathPrefix = UUID.randomUUID().toString();
324     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
325       if (attr.getPriority() < 0) {
326         throw new IOException("Priority for coprocessor " + attr.getClassName() +
327           " cannot be less than 0");
328       }
329       ClassLoader old = Thread.currentThread().getContextClassLoader();
330       try {
331         ClassLoader cl;
332         if (attr.getPath() != null) {
333           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
334             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
335         } else {
336           cl = CoprocessorHost.class.getClassLoader();
337         }
338         Thread.currentThread().setContextClassLoader(cl);
339         cl.loadClass(attr.getClassName());
340       } catch (ClassNotFoundException e) {
341         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
342       } finally {
343         Thread.currentThread().setContextClassLoader(old);
344       }
345     }
346   }
347 
348   void loadTableCoprocessors(final Configuration conf) {
349     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
350       DEFAULT_COPROCESSORS_ENABLED);
351     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
352       DEFAULT_USER_COPROCESSORS_ENABLED);
353     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
354       return;
355     }
356 
357     // scan the table attributes for coprocessor load specifications
358     // initialize the coprocessors
359     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
360     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
361         region.getTableDesc())) {
362       // Load encompasses classloading and coprocessor initialization
363       try {
364         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
365           attr.getConf());
366         configured.add(env);
367         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
368             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
369       } catch (Throwable t) {
370         // Coprocessor failed to load, do we abort on error?
371         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
372           abortServer(attr.getClassName(), t);
373         } else {
374           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
375         }
376       }
377     }
378     // add together to coprocessor set for COW efficiency
379     coprocessors.addAll(configured);
380   }
381 
382   @Override
383   public RegionEnvironment createEnvironment(Class<?> implClass,
384       Coprocessor instance, int priority, int seq, Configuration conf) {
385     // Check if it's an Endpoint.
386     // Due to current dynamic protocol design, Endpoint
387     // uses a different way to be registered and executed.
388     // It uses a visitor pattern to invoke registered Endpoint
389     // method.
390     for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
391       Class<?> c = (Class<?>) itf;
392       if (CoprocessorService.class.isAssignableFrom(c)) {
393         region.registerService( ((CoprocessorService)instance).getService() );
394       }
395     }
396     ConcurrentMap<String, Object> classData;
397     // make sure only one thread can add maps
398     synchronized (sharedDataMap) {
399       // as long as at least one RegionEnvironment holds on to its classData it will
400       // remain in this map
401       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
402       if (classData == null) {
403         classData = new ConcurrentHashMap<String, Object>();
404         sharedDataMap.put(implClass.getName(), classData);
405       }
406     }
407     return new RegionEnvironment(instance, priority, seq, conf, region,
408         rsServices, classData);
409   }
410 
411   /**
412    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
413    *
414    * For example, {@link
415    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
416    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
417    *
418    * See also
419    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
420    *    CoprocessorEnvironment, Throwable)}
421    * @param env The coprocessor that threw the exception.
422    * @param e The exception that was thrown.
423    */
424   private void handleCoprocessorThrowableNoRethrow(
425       final CoprocessorEnvironment env, final Throwable e) {
426     try {
427       handleCoprocessorThrowable(env,e);
428     } catch (IOException ioe) {
429       // We cannot throw exceptions from the caller hook, so ignore.
430       LOG.warn(
431         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
432         e + ". Ignoring.",e);
433     }
434   }
435 
436   /**
437    * Invoked before a region open.
438    *
439    * @throws IOException Signals that an I/O exception has occurred.
440    */
441   public void preOpen() throws IOException {
442     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
443       @Override
444       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
445           throws IOException {
446         oserver.preOpen(ctx);
447       }
448     });
449   }
450 
451   /**
452    * Invoked after a region open
453    */
454   public void postOpen() {
455     try {
456       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
457         @Override
458         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
459             throws IOException {
460           oserver.postOpen(ctx);
461         }
462       });
463     } catch (IOException e) {
464       LOG.warn(e);
465     }
466   }
467 
468   /**
469    * Invoked after log replay on region
470    */
471   public void postLogReplay() {
472     try {
473       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
474         @Override
475         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
476             throws IOException {
477           oserver.postLogReplay(ctx);
478         }
479       });
480     } catch (IOException e) {
481       LOG.warn(e);
482     }
483   }
484 
485   /**
486    * Invoked before a region is closed
487    * @param abortRequested true if the server is aborting
488    */
489   public void preClose(final boolean abortRequested) throws IOException {
490     execOperation(false, new RegionOperation() {
491       @Override
492       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
493           throws IOException {
494         oserver.preClose(ctx, abortRequested);
495       }
496     });
497   }
498 
499   /**
500    * Invoked after a region is closed
501    * @param abortRequested true if the server is aborting
502    */
503   public void postClose(final boolean abortRequested) {
504     try {
505       execOperation(false, new RegionOperation() {
506         @Override
507         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
508             throws IOException {
509           oserver.postClose(ctx, abortRequested);
510         }
511         public void postEnvCall(RegionEnvironment env) {
512           shutdown(env);
513         }
514       });
515     } catch (IOException e) {
516       LOG.warn(e);
517     }
518   }
519 
520   /**
521    * See
522    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
523    */
524   public InternalScanner preCompactScannerOpen(final Store store,
525       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
526       final CompactionRequest request) throws IOException {
527     return execOperationWithResult(null,
528         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
529       @Override
530       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
531           throws IOException {
532         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
533           earliestPutTs, getResult(), request));
534       }
535     });
536   }
537 
538   /**
539    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
540    * available candidates.
541    * @param store The store where compaction is being requested
542    * @param candidates The currently available store files
543    * @param request custom compaction request
544    * @return If {@code true}, skip the normal selection process and use the current list
545    * @throws IOException
546    */
547   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
548       final CompactionRequest request) throws IOException {
549     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
550       @Override
551       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
552           throws IOException {
553         oserver.preCompactSelection(ctx, store, candidates, request);
554       }
555     });
556   }
557 
558   /**
559    * Called after the {@link StoreFile}s to be compacted have been selected from the available
560    * candidates.
561    * @param store The store where compaction is being requested
562    * @param selected The store files selected to compact
563    * @param request custom compaction
564    */
565   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
566       final CompactionRequest request) {
567     try {
568       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
569         @Override
570         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
571             throws IOException {
572           oserver.postCompactSelection(ctx, store, selected, request);
573         }
574       });
575     } catch (IOException e) {
576       LOG.warn(e);
577     }
578   }
579 
580   /**
581    * Called prior to rewriting the store files selected for compaction
582    * @param store the store being compacted
583    * @param scanner the scanner used to read store data during compaction
584    * @param scanType type of Scan
585    * @param request the compaction that will be executed
586    * @throws IOException
587    */
588   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
589       final ScanType scanType, final CompactionRequest request) throws IOException {
590     return execOperationWithResult(false, scanner,
591         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
592       @Override
593       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
594           throws IOException {
595         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
596       }
597     });
598   }
599 
600   /**
601    * Called after the store compaction has completed.
602    * @param store the store being compacted
603    * @param resultFile the new store file written during compaction
604    * @param request the compaction that is being executed
605    * @throws IOException
606    */
607   public void postCompact(final Store store, final StoreFile resultFile,
608       final CompactionRequest request) throws IOException {
609     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
610       @Override
611       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
612           throws IOException {
613         oserver.postCompact(ctx, store, resultFile, request);
614       }
615     });
616   }
617 
618   /**
619    * Invoked before a memstore flush
620    * @throws IOException
621    */
622   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
623       throws IOException {
624     return execOperationWithResult(false, scanner,
625         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
626       @Override
627       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
628           throws IOException {
629         setResult(oserver.preFlush(ctx, store, getResult()));
630       }
631     });
632   }
633 
634   /**
635    * Invoked before a memstore flush
636    * @throws IOException
637    */
638   public void preFlush() throws IOException {
639     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
640       @Override
641       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
642           throws IOException {
643         oserver.preFlush(ctx);
644       }
645     });
646   }
647 
648   /**
649    * See
650    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
651    *    Store, KeyValueScanner, InternalScanner)}
652    */
653   public InternalScanner preFlushScannerOpen(final Store store,
654       final KeyValueScanner memstoreScanner) throws IOException {
655     return execOperationWithResult(null,
656         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
657       @Override
658       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
659           throws IOException {
660         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
661       }
662     });
663   }
664 
665   /**
666    * Invoked after a memstore flush
667    * @throws IOException
668    */
669   public void postFlush() throws IOException {
670     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
671       @Override
672       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
673           throws IOException {
674         oserver.postFlush(ctx);
675       }
676     });
677   }
678 
679   /**
680    * Invoked after a memstore flush
681    * @throws IOException
682    */
683   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
684     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
685       @Override
686       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
687           throws IOException {
688         oserver.postFlush(ctx, store, storeFile);
689       }
690     });
691   }
692 
693   /**
694    * Invoked just before a split
695    * @throws IOException
696    */
697   // TODO: Deprecate this
698   public void preSplit() throws IOException {
699     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
700       @Override
701       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
702           throws IOException {
703         oserver.preSplit(ctx);
704       }
705     });
706   }
707 
708   /**
709    * Invoked just before a split
710    * @throws IOException
711    */
712   public void preSplit(final byte[] splitRow) throws IOException {
713     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
714       @Override
715       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
716           throws IOException {
717         oserver.preSplit(ctx, splitRow);
718       }
719     });
720   }
721 
722   /**
723    * Invoked just after a split
724    * @param l the new left-hand daughter region
725    * @param r the new right-hand daughter region
726    * @throws IOException
727    */
728   public void postSplit(final Region l, final Region r) throws IOException {
729     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
730       @Override
731       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
732           throws IOException {
733         oserver.postSplit(ctx, l, r);
734       }
735     });
736   }
737 
738   public boolean preSplitBeforePONR(final byte[] splitKey,
739       final List<Mutation> metaEntries) throws IOException {
740     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
741       @Override
742       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
743           throws IOException {
744         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
745       }
746     });
747   }
748 
749   public void preSplitAfterPONR() throws IOException {
750     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
751       @Override
752       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
753           throws IOException {
754         oserver.preSplitAfterPONR(ctx);
755       }
756     });
757   }
758 
759   /**
760    * Invoked just before the rollback of a failed split is started
761    * @throws IOException
762    */
763   public void preRollBackSplit() throws IOException {
764     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
765       @Override
766       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
767           throws IOException {
768         oserver.preRollBackSplit(ctx);
769       }
770     });
771   }
772 
773   /**
774    * Invoked just after the rollback of a failed split is done
775    * @throws IOException
776    */
777   public void postRollBackSplit() throws IOException {
778     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
779       @Override
780       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
781           throws IOException {
782         oserver.postRollBackSplit(ctx);
783       }
784     });
785   }
786 
787   /**
788    * Invoked after a split is completed irrespective of a failure or success.
789    * @throws IOException
790    */
791   public void postCompleteSplit() throws IOException {
792     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
793       @Override
794       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
795           throws IOException {
796         oserver.postCompleteSplit(ctx);
797       }
798     });
799   }
800 
801   // RegionObserver support
802 
803   /**
804    * @param row the row key
805    * @param family the family
806    * @param result the result set from the region
807    * @return true if default processing should be bypassed
808    * @exception IOException Exception
809    */
810   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
811       final Result result) throws IOException {
812     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
813       @Override
814       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
815           throws IOException {
816         oserver.preGetClosestRowBefore(ctx, row, family, result);
817       }
818     });
819   }
820 
821   /**
822    * @param row the row key
823    * @param family the family
824    * @param result the result set from the region
825    * @exception IOException Exception
826    */
827   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
828       final Result result) throws IOException {
829     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
830       @Override
831       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
832           throws IOException {
833         oserver.postGetClosestRowBefore(ctx, row, family, result);
834       }
835     });
836   }
837 
838   /**
839    * @param get the Get request
840    * @return true if default processing should be bypassed
841    * @exception IOException Exception
842    */
843   public boolean preGet(final Get get, final List<Cell> results)
844       throws IOException {
845     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
846       @Override
847       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
848           throws IOException {
849         oserver.preGetOp(ctx, get, results);
850       }
851     });
852   }
853 
854   /**
855    * @param get the Get request
856    * @param results the result sett
857    * @exception IOException Exception
858    */
859   public void postGet(final Get get, final List<Cell> results)
860       throws IOException {
861     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
862       @Override
863       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
864           throws IOException {
865         oserver.postGetOp(ctx, get, results);
866       }
867     });
868   }
869 
870   /**
871    * @param get the Get request
872    * @return true or false to return to client if bypassing normal operation,
873    * or null otherwise
874    * @exception IOException Exception
875    */
876   public Boolean preExists(final Get get) throws IOException {
877     return execOperationWithResult(true, false,
878         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
879       @Override
880       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
881           throws IOException {
882         setResult(oserver.preExists(ctx, get, getResult()));
883       }
884     });
885   }
886 
887   /**
888    * @param get the Get request
889    * @param exists the result returned by the region server
890    * @return the result to return to the client
891    * @exception IOException Exception
892    */
893   public boolean postExists(final Get get, boolean exists)
894       throws IOException {
895     return execOperationWithResult(exists,
896         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
897       @Override
898       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
899           throws IOException {
900         setResult(oserver.postExists(ctx, get, getResult()));
901       }
902     });
903   }
904 
905   /**
906    * @param put The Put object
907    * @param edit The WALEdit object.
908    * @param durability The durability used
909    * @return true if default processing should be bypassed
910    * @exception IOException Exception
911    */
912   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
913       throws IOException {
914     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
915       @Override
916       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
917           throws IOException {
918         oserver.prePut(ctx, put, edit, durability);
919       }
920     });
921   }
922 
923   /**
924    * @param mutation - the current mutation
925    * @param kv - the current cell
926    * @param byteNow - current timestamp in bytes
927    * @param get - the get that could be used
928    * Note that the get only does not specify the family and qualifier that should be used
929    * @return true if default processing should be bypassed
930    * @exception IOException
931    *              Exception
932    */
933   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
934       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
935     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
936       @Override
937       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
938           throws IOException {
939         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
940       }
941     });
942   }
943 
944   /**
945    * @param put The Put object
946    * @param edit The WALEdit object.
947    * @param durability The durability used
948    * @exception IOException Exception
949    */
950   public void postPut(final Put put, final WALEdit edit, final Durability durability)
951       throws IOException {
952     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
953       @Override
954       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955           throws IOException {
956         oserver.postPut(ctx, put, edit, durability);
957       }
958     });
959   }
960 
961   /**
962    * @param delete The Delete object
963    * @param edit The WALEdit object.
964    * @param durability The durability used
965    * @return true if default processing should be bypassed
966    * @exception IOException Exception
967    */
968   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
969       throws IOException {
970     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
971       @Override
972       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
973           throws IOException {
974         oserver.preDelete(ctx, delete, edit, durability);
975       }
976     });
977   }
978 
979   /**
980    * @param delete The Delete object
981    * @param edit The WALEdit object.
982    * @param durability The durability used
983    * @exception IOException Exception
984    */
985   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
986       throws IOException {
987     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
988       @Override
989       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
990           throws IOException {
991         oserver.postDelete(ctx, delete, edit, durability);
992       }
993     });
994   }
995 
996   /**
997    * @param miniBatchOp
998    * @return true if default processing should be bypassed
999    * @throws IOException
1000    */
1001   public boolean preBatchMutate(
1002       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1003     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1004       @Override
1005       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1006           throws IOException {
1007         oserver.preBatchMutate(ctx, miniBatchOp);
1008       }
1009     });
1010   }
1011 
1012   /**
1013    * @param miniBatchOp
1014    * @throws IOException
1015    */
1016   public void postBatchMutate(
1017       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1018     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1019       @Override
1020       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1021           throws IOException {
1022         oserver.postBatchMutate(ctx, miniBatchOp);
1023       }
1024     });
1025   }
1026 
1027   public void postBatchMutateIndispensably(
1028       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1029       throws IOException {
1030     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1031       @Override
1032       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1033           throws IOException {
1034         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1035       }
1036     });
1037   }
1038 
1039   /**
1040    * @param row row to check
1041    * @param family column family
1042    * @param qualifier column qualifier
1043    * @param compareOp the comparison operation
1044    * @param comparator the comparator
1045    * @param put data to put if check succeeds
1046    * @return true or false to return to client if default processing should
1047    * be bypassed, or null otherwise
1048    * @throws IOException e
1049    */
1050   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1051       final byte [] qualifier, final CompareOp compareOp,
1052       final ByteArrayComparable comparator, final Put put)
1053       throws IOException {
1054     return execOperationWithResult(true, false,
1055         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1056       @Override
1057       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1058           throws IOException {
1059         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1060           compareOp, comparator, put, getResult()));
1061       }
1062     });
1063   }
1064 
1065   /**
1066    * @param row row to check
1067    * @param family column family
1068    * @param qualifier column qualifier
1069    * @param compareOp the comparison operation
1070    * @param comparator the comparator
1071    * @param put data to put if check succeeds
1072    * @return true or false to return to client if default processing should
1073    * be bypassed, or null otherwise
1074    * @throws IOException e
1075    */
1076   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1077       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1078       final Put put) throws IOException {
1079     return execOperationWithResult(true, false,
1080         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1081       @Override
1082       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1083           throws IOException {
1084         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1085           compareOp, comparator, put, getResult()));
1086       }
1087     });
1088   }
1089 
1090   /**
1091    * @param row row to check
1092    * @param family column family
1093    * @param qualifier column qualifier
1094    * @param compareOp the comparison operation
1095    * @param comparator the comparator
1096    * @param put data to put if check succeeds
1097    * @throws IOException e
1098    */
1099   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1100       final byte [] qualifier, final CompareOp compareOp,
1101       final ByteArrayComparable comparator, final Put put,
1102       boolean result) throws IOException {
1103     return execOperationWithResult(result,
1104         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1105       @Override
1106       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1107           throws IOException {
1108         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1109           compareOp, comparator, put, getResult()));
1110       }
1111     });
1112   }
1113 
1114   /**
1115    * @param row row to check
1116    * @param family column family
1117    * @param qualifier column qualifier
1118    * @param compareOp the comparison operation
1119    * @param comparator the comparator
1120    * @param delete delete to commit if check succeeds
1121    * @return true or false to return to client if default processing should
1122    * be bypassed, or null otherwise
1123    * @throws IOException e
1124    */
1125   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1126       final byte [] qualifier, final CompareOp compareOp,
1127       final ByteArrayComparable comparator, final Delete delete)
1128       throws IOException {
1129     return execOperationWithResult(true, false,
1130         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1131       @Override
1132       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1133           throws IOException {
1134         setResult(oserver.preCheckAndDelete(ctx, row, family,
1135             qualifier, compareOp, comparator, delete, getResult()));
1136       }
1137     });
1138   }
1139 
1140   /**
1141    * @param row row to check
1142    * @param family column family
1143    * @param qualifier column qualifier
1144    * @param compareOp the comparison operation
1145    * @param comparator the comparator
1146    * @param delete delete to commit if check succeeds
1147    * @return true or false to return to client if default processing should
1148    * be bypassed, or null otherwise
1149    * @throws IOException e
1150    */
1151   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1152       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1153       final Delete delete) throws IOException {
1154     return execOperationWithResult(true, false,
1155         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1156       @Override
1157       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1158           throws IOException {
1159         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1160               family, qualifier, compareOp, comparator, delete, getResult()));
1161       }
1162     });
1163   }
1164 
1165   /**
1166    * @param row row to check
1167    * @param family column family
1168    * @param qualifier column qualifier
1169    * @param compareOp the comparison operation
1170    * @param comparator the comparator
1171    * @param delete delete to commit if check succeeds
1172    * @throws IOException e
1173    */
1174   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1175       final byte [] qualifier, final CompareOp compareOp,
1176       final ByteArrayComparable comparator, final Delete delete,
1177       boolean result) throws IOException {
1178     return execOperationWithResult(result,
1179         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1180       @Override
1181       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1182           throws IOException {
1183         setResult(oserver.postCheckAndDelete(ctx, row, family,
1184             qualifier, compareOp, comparator, delete, getResult()));
1185       }
1186     });
1187   }
1188 
1189   /**
1190    * @param append append object
1191    * @return result to return to client if default operation should be
1192    * bypassed, null otherwise
1193    * @throws IOException if an error occurred on the coprocessor
1194    */
1195   public Result preAppend(final Append append) throws IOException {
1196     return execOperationWithResult(true, null,
1197         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1198       @Override
1199       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1200           throws IOException {
1201         setResult(oserver.preAppend(ctx, append));
1202       }
1203     });
1204   }
1205 
1206   /**
1207    * @param append append object
1208    * @return result to return to client if default operation should be
1209    * bypassed, null otherwise
1210    * @throws IOException if an error occurred on the coprocessor
1211    */
1212   public Result preAppendAfterRowLock(final Append append) throws IOException {
1213     return execOperationWithResult(true, null,
1214         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1215       @Override
1216       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1217           throws IOException {
1218         setResult(oserver.preAppendAfterRowLock(ctx, append));
1219       }
1220     });
1221   }
1222 
1223   /**
1224    * @param increment increment object
1225    * @return result to return to client if default operation should be
1226    * bypassed, null otherwise
1227    * @throws IOException if an error occurred on the coprocessor
1228    */
1229   public Result preIncrement(final Increment increment) throws IOException {
1230     return execOperationWithResult(true, null,
1231         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1232       @Override
1233       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1234           throws IOException {
1235         setResult(oserver.preIncrement(ctx, increment));
1236       }
1237     });
1238   }
1239 
1240   /**
1241    * @param increment increment object
1242    * @return result to return to client if default operation should be
1243    * bypassed, null otherwise
1244    * @throws IOException if an error occurred on the coprocessor
1245    */
1246   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1247     return execOperationWithResult(true, null,
1248         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1249       @Override
1250       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1251           throws IOException {
1252         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1253       }
1254     });
1255   }
1256 
1257   /**
1258    * @param append Append object
1259    * @param result the result returned by the append
1260    * @throws IOException if an error occurred on the coprocessor
1261    */
1262   public void postAppend(final Append append, final Result result) throws IOException {
1263     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1264       @Override
1265       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1266           throws IOException {
1267         oserver.postAppend(ctx, append, result);
1268       }
1269     });
1270   }
1271 
1272   /**
1273    * @param increment increment object
1274    * @param result the result returned by postIncrement
1275    * @throws IOException if an error occurred on the coprocessor
1276    */
1277   public Result postIncrement(final Increment increment, Result result) throws IOException {
1278     return execOperationWithResult(result,
1279         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1280       @Override
1281       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1282           throws IOException {
1283         setResult(oserver.postIncrement(ctx, increment, getResult()));
1284       }
1285     });
1286   }
1287 
1288   /**
1289    * @param scan the Scan specification
1290    * @return scanner id to return to client if default operation should be
1291    * bypassed, null otherwise
1292    * @exception IOException Exception
1293    */
1294   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1295     return execOperationWithResult(true, null,
1296         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1297       @Override
1298       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1299           throws IOException {
1300         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1301       }
1302     });
1303   }
1304 
1305   /**
1306    * See
1307    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1308    *    Store, Scan, NavigableSet, KeyValueScanner)}
1309    */
1310   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1311       final NavigableSet<byte[]> targetCols) throws IOException {
1312     return execOperationWithResult(null,
1313         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1314       @Override
1315       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1316           throws IOException {
1317         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1318       }
1319     });
1320   }
1321 
1322   /**
1323    * @param scan the Scan specification
1324    * @param s the scanner
1325    * @return the scanner instance to use
1326    * @exception IOException Exception
1327    */
1328   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1329     return execOperationWithResult(s,
1330         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1331       @Override
1332       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1333           throws IOException {
1334         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1335       }
1336     });
1337   }
1338 
1339   /**
1340    * @param s the scanner
1341    * @param results the result set returned by the region server
1342    * @param limit the maximum number of results to return
1343    * @return 'has next' indication to client if bypassing default behavior, or
1344    * null otherwise
1345    * @exception IOException Exception
1346    */
1347   public Boolean preScannerNext(final InternalScanner s,
1348       final List<Result> results, final int limit) throws IOException {
1349     return execOperationWithResult(true, false,
1350         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1351       @Override
1352       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1353           throws IOException {
1354         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1355       }
1356     });
1357   }
1358 
1359   /**
1360    * @param s the scanner
1361    * @param results the result set returned by the region server
1362    * @param limit the maximum number of results to return
1363    * @param hasMore
1364    * @return 'has more' indication to give to client
1365    * @exception IOException Exception
1366    */
1367   public boolean postScannerNext(final InternalScanner s,
1368       final List<Result> results, final int limit, boolean hasMore)
1369       throws IOException {
1370     return execOperationWithResult(hasMore,
1371         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1372       @Override
1373       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1374           throws IOException {
1375         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1376       }
1377     });
1378   }
1379 
1380   /**
1381    * This will be called by the scan flow when the current scanned row is being filtered out by the
1382    * filter.
1383    * @param s the scanner
1384    * @param currentRow The current rowkey which got filtered out
1385    * @param offset offset to rowkey
1386    * @param length length of rowkey
1387    * @return whether more rows are available for the scanner or not
1388    * @throws IOException
1389    */
1390   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1391       final int offset, final short length) throws IOException {
1392     // short circuit for performance
1393     if (!hasCustomPostScannerFilterRow) return true;
1394     return execOperationWithResult(true,
1395         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1396       @Override
1397       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1398           throws IOException {
1399         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1400       }
1401     });
1402   }
1403 
1404   /**
1405    * @param s the scanner
1406    * @return true if default behavior should be bypassed, false otherwise
1407    * @exception IOException Exception
1408    */
1409   public boolean preScannerClose(final InternalScanner s) throws IOException {
1410     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1411       @Override
1412       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1413           throws IOException {
1414         oserver.preScannerClose(ctx, s);
1415       }
1416     });
1417   }
1418 
1419   /**
1420    * @exception IOException Exception
1421    */
1422   public void postScannerClose(final InternalScanner s) throws IOException {
1423     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1424       @Override
1425       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1426           throws IOException {
1427         oserver.postScannerClose(ctx, s);
1428       }
1429     });
1430   }
1431 
1432   /**
1433    * @param info
1434    * @param logKey
1435    * @param logEdit
1436    * @return true if default behavior should be bypassed, false otherwise
1437    * @throws IOException
1438    */
1439   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1440       final WALEdit logEdit) throws IOException {
1441     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1442       @Override
1443       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1444           throws IOException {
1445         // Once we don't need to support the legacy call, replace RegionOperation with a version
1446         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1447         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1448         if (env.useLegacyPre) {
1449           if (logKey instanceof HLogKey) {
1450             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1451           } else {
1452             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1453           }
1454         } else {
1455           oserver.preWALRestore(ctx, info, logKey, logEdit);
1456         }
1457       }
1458     });
1459   }
1460 
1461   /**
1462    * @return true if default behavior should be bypassed, false otherwise
1463    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1464    */
1465   @Deprecated
1466   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1467       final WALEdit logEdit) throws IOException {
1468     return preWALRestore(info, (WALKey)logKey, logEdit);
1469   }
1470 
1471   /**
1472    * @param info
1473    * @param logKey
1474    * @param logEdit
1475    * @throws IOException
1476    */
1477   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1478       throws IOException {
1479     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1480       @Override
1481       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1482           throws IOException {
1483         // Once we don't need to support the legacy call, replace RegionOperation with a version
1484         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1485         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1486         if (env.useLegacyPost) {
1487           if (logKey instanceof HLogKey) {
1488             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1489           } else {
1490             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1491           }
1492         } else {
1493           oserver.postWALRestore(ctx, info, logKey, logEdit);
1494         }
1495       }
1496     });
1497   }
1498 
1499   /**
1500    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1501    */
1502   @Deprecated
1503   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1504       throws IOException {
1505     postWALRestore(info, (WALKey)logKey, logEdit);
1506   }
1507 
1508   /**
1509    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1510    * @return true if the default operation should be bypassed
1511    * @throws IOException
1512    */
1513   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1514     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1515       @Override
1516       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1517           throws IOException {
1518         oserver.preBulkLoadHFile(ctx, familyPaths);
1519       }
1520     });
1521   }
1522 
1523   /**
1524    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1525    * @param hasLoaded whether load was successful or not
1526    * @return the possibly modified value of hasLoaded
1527    * @throws IOException
1528    */
1529   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1530       boolean hasLoaded) throws IOException {
1531     return execOperationWithResult(hasLoaded,
1532         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1533       @Override
1534       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1535           throws IOException {
1536         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1537       }
1538     });
1539   }
1540 
1541   public void postStartRegionOperation(final Operation op) throws IOException {
1542     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1543       @Override
1544       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1545           throws IOException {
1546         oserver.postStartRegionOperation(ctx, op);
1547       }
1548     });
1549   }
1550 
1551   public void postCloseRegionOperation(final Operation op) throws IOException {
1552     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1553       @Override
1554       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1555           throws IOException {
1556         oserver.postCloseRegionOperation(ctx, op);
1557       }
1558     });
1559   }
1560 
1561   /**
1562    * @param fs fileystem to read from
1563    * @param p path to the file
1564    * @param in {@link FSDataInputStreamWrapper}
1565    * @param size Full size of the file
1566    * @param cacheConf
1567    * @param r original reference file. This will be not null only when reading a split file.
1568    * @return a Reader instance to use instead of the base reader if overriding
1569    * default behavior, null otherwise
1570    * @throws IOException
1571    */
1572   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1573       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1574       final Reference r) throws IOException {
1575     return execOperationWithResult(null,
1576         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1577       @Override
1578       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1579           throws IOException {
1580         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1581       }
1582     });
1583   }
1584 
1585   /**
1586    * @param fs fileystem to read from
1587    * @param p path to the file
1588    * @param in {@link FSDataInputStreamWrapper}
1589    * @param size Full size of the file
1590    * @param cacheConf
1591    * @param r original reference file. This will be not null only when reading a split file.
1592    * @param reader the base reader instance
1593    * @return The reader to use
1594    * @throws IOException
1595    */
1596   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1597       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1598       final Reference r, final StoreFile.Reader reader) throws IOException {
1599     return execOperationWithResult(reader,
1600         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1601       @Override
1602       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1603           throws IOException {
1604         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1605       }
1606     });
1607   }
1608 
1609   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1610       final Cell oldCell, Cell newCell) throws IOException {
1611     return execOperationWithResult(newCell,
1612         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1613       @Override
1614       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1615           throws IOException {
1616         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1617       }
1618     });
1619   }
1620 
1621   public Message preEndpointInvocation(final Service service, final String methodName,
1622       Message request) throws IOException {
1623     return execOperationWithResult(request,
1624         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1625       @Override
1626       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1627           throws IOException {
1628         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1629       }
1630     });
1631   }
1632 
1633   public void postEndpointInvocation(final Service service, final String methodName,
1634       final Message request, final Message.Builder responseBuilder) throws IOException {
1635     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1636       @Override
1637       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1638           throws IOException {
1639         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1640       }
1641     });
1642   }
1643 
1644   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1645     return execOperationWithResult(tracker,
1646         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1647       @Override
1648       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1649           throws IOException {
1650         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1651       }
1652     });
1653   }
1654 
1655   private static abstract class CoprocessorOperation
1656       extends ObserverContext<RegionCoprocessorEnvironment> {
1657     public abstract void call(Coprocessor observer,
1658         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1659     public abstract boolean hasCall(Coprocessor observer);
1660     public void postEnvCall(RegionEnvironment env) { }
1661   }
1662 
1663   private static abstract class RegionOperation extends CoprocessorOperation {
1664     public abstract void call(RegionObserver observer,
1665         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1666 
1667     public boolean hasCall(Coprocessor observer) {
1668       return observer instanceof RegionObserver;
1669     }
1670 
1671     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1672         throws IOException {
1673       call((RegionObserver)observer, ctx);
1674     }
1675   }
1676 
1677   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1678     private T result = null;
1679     public void setResult(final T result) { this.result = result; }
1680     public T getResult() { return this.result; }
1681   }
1682 
1683   private static abstract class EndpointOperation extends CoprocessorOperation {
1684     public abstract void call(EndpointObserver observer,
1685         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1686 
1687     public boolean hasCall(Coprocessor observer) {
1688       return observer instanceof EndpointObserver;
1689     }
1690 
1691     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1692         throws IOException {
1693       call((EndpointObserver)observer, ctx);
1694     }
1695   }
1696 
1697   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1698     private T result = null;
1699     public void setResult(final T result) { this.result = result; }
1700     public T getResult() { return this.result; }
1701   }
1702 
1703   private boolean execOperation(final CoprocessorOperation ctx)
1704       throws IOException {
1705     return execOperation(true, ctx);
1706   }
1707 
1708   private <T> T execOperationWithResult(final T defaultValue,
1709       final RegionOperationWithResult<T> ctx) throws IOException {
1710     if (ctx == null) return defaultValue;
1711     ctx.setResult(defaultValue);
1712     execOperation(true, ctx);
1713     return ctx.getResult();
1714   }
1715 
1716   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1717       final RegionOperationWithResult<T> ctx) throws IOException {
1718     boolean bypass = false;
1719     T result = defaultValue;
1720     if (ctx != null) {
1721       ctx.setResult(defaultValue);
1722       bypass = execOperation(true, ctx);
1723       result = ctx.getResult();
1724     }
1725     return bypass == ifBypass ? result : null;
1726   }
1727 
1728   private <T> T execOperationWithResult(final T defaultValue,
1729       final EndpointOperationWithResult<T> ctx) throws IOException {
1730     if (ctx == null) return defaultValue;
1731     ctx.setResult(defaultValue);
1732     execOperation(true, ctx);
1733     return ctx.getResult();
1734   }
1735 
1736   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1737       throws IOException {
1738     boolean bypass = false;
1739     List<RegionEnvironment> envs = coprocessors.get();
1740     for (int i = 0; i < envs.size(); i++) {
1741       RegionEnvironment env = envs.get(i);
1742       Coprocessor observer = env.getInstance();
1743       if (ctx.hasCall(observer)) {
1744         ctx.prepare(env);
1745         Thread currentThread = Thread.currentThread();
1746         ClassLoader cl = currentThread.getContextClassLoader();
1747         try {
1748           currentThread.setContextClassLoader(env.getClassLoader());
1749           ctx.call(observer, ctx);
1750         } catch (Throwable e) {
1751           handleCoprocessorThrowable(env, e);
1752         } finally {
1753           currentThread.setContextClassLoader(cl);
1754         }
1755         bypass |= ctx.shouldBypass();
1756         if (earlyExit && ctx.shouldComplete()) {
1757           break;
1758         }
1759       }
1760 
1761       ctx.postEnvCall(env);
1762     }
1763     return bypass;
1764   }
1765 }