View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.UUID;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.regex.Matcher;
31  
32  import com.google.common.collect.ImmutableList;
33  import com.google.common.collect.Lists;
34  import com.google.protobuf.Message;
35  import com.google.protobuf.Service;
36  
37  import org.apache.commons.collections.map.AbstractReferenceMap;
38  import org.apache.commons.collections.map.ReferenceMap;
39  import org.apache.commons.lang.ClassUtils;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.Coprocessor;
49  import org.apache.hadoop.hbase.CoprocessorEnvironment;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.client.Append;
56  import org.apache.hadoop.hbase.client.Delete;
57  import org.apache.hadoop.hbase.client.Durability;
58  import org.apache.hadoop.hbase.client.Get;
59  import org.apache.hadoop.hbase.client.Increment;
60  import org.apache.hadoop.hbase.client.Mutation;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
71  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
72  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
73  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
74  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
75  import org.apache.hadoop.hbase.io.Reference;
76  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
77  import org.apache.hadoop.hbase.ipc.RpcServer;
78  import org.apache.hadoop.hbase.regionserver.Region.Operation;
79  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
80  import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
81  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
82  import org.apache.hadoop.hbase.security.User;
83  import org.apache.hadoop.hbase.wal.WALKey;
84  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
85  import org.apache.hadoop.hbase.util.Bytes;
86  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
87  import org.apache.hadoop.hbase.util.Pair;
88
89  /**
90   * Implements the coprocessor environment and runtime support for coprocessors
91   * loaded within a {@link Region}.
92   */
93  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
94  @InterfaceStability.Evolving
95  public class RegionCoprocessorHost
96      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
97
98    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
99    // The shared data map
100   private static ReferenceMap sharedDataMap =
101       new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
102
103   // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
104   private final boolean hasCustomPostScannerFilterRow;
105
106   /**
107    * 
108    * Encapsulation of the environment of each coprocessor
109    */
110   static class RegionEnvironment extends CoprocessorHost.Environment
111       implements RegionCoprocessorEnvironment {
112
113     private Region region;
114     private RegionServerServices rsServices;
115     ConcurrentMap<String, Object> sharedData;
116     private final boolean useLegacyPre;
117     private final boolean useLegacyPost;
118
119     /**
120      * Constructor
121      * @param impl the coprocessor instance
122      * @param priority chaining priority
123      */
124     public RegionEnvironment(final Coprocessor impl, final int priority,
125         final int seq, final Configuration conf, final Region region,
126         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
127       super(impl, priority, seq, conf);
128       this.region = region;
129       this.rsServices = services;
130       this.sharedData = sharedData;
131       // Pick which version of the WAL related events we'll call.
132       // This way we avoid calling the new version on older RegionObservers so
133       // we can maintain binary compatibility.
134       // See notes in javadoc for RegionObserver
135       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
136           HRegionInfo.class, WALKey.class, WALEdit.class);
137       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
138           HRegionInfo.class, WALKey.class, WALEdit.class);
139     }
140
141     /** @return the region */
142     @Override
143     public Region getRegion() {
144       return region;
145     }
146
147     /** @return reference to the region server services */
148     @Override
149     public RegionServerServices getRegionServerServices() {
150       return rsServices;
151     }
152
153     public void shutdown() {
154       super.shutdown();
155     }
156
157     @Override
158     public ConcurrentMap<String, Object> getSharedData() {
159       return sharedData;
160     }
161
162     @Override
163     public HRegionInfo getRegionInfo() {
164       return region.getRegionInfo();
165     }
166
167   }
168
169   static class TableCoprocessorAttribute {
170     private Path path;
171     private String className;
172     private int priority;
173     private Configuration conf;
174
175     public TableCoprocessorAttribute(Path path, String className, int priority,
176         Configuration conf) {
177       this.path = path;
178       this.className = className;
179       this.priority = priority;
180       this.conf = conf;
181     }
182
183     public Path getPath() {
184       return path;
185     }
186
187     public String getClassName() {
188       return className;
189     }
190
191     public int getPriority() {
192       return priority;
193     }
194
195     public Configuration getConf() {
196       return conf;
197     }
198   }
199
/** The region server services; passed on to every environment built by createEnvironment */
RegionServerServices rsServices;
/** The region this host belongs to; passed on to every environment built by createEnvironment */
Region region;
204
/**
 * Constructor. Loads the system coprocessors from configuration, then the table
 * coprocessors, and finally records whether any loaded {@link RegionObserver}
 * customizes {@code postScannerFilterRow}.
 * @param region the region
 * @param rsServices interface to available region server functionality
 * @param conf the configuration
 */
public RegionCoprocessorHost(final Region region,
    final RegionServerServices rsServices, final Configuration conf) {
  super(rsServices);
  this.conf = conf;
  this.rsServices = rsServices;
  this.region = region;
  this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());

  // load system default cp's from configuration.
  loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);

  // load system default cp's for user tables from configuration.
  if (!region.getRegionInfo().getTable().isSystemTable()) {
    loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
  }

  // load Coprocessor From HDFS
  loadTableCoprocessors(conf);

  // now check whether any coprocessor implements postScannerFilterRow
  this.hasCustomPostScannerFilterRow = anyObserverCustomizesPostScannerFilterRow();
}

/** Returns true as soon as one loaded RegionObserver customizes postScannerFilterRow. */
private boolean anyObserverCustomizesPostScannerFilterRow() {
  for (RegionEnvironment env : coprocessors) {
    if (env.getInstance() instanceof RegionObserver
        && customizesPostScannerFilterRow(env.getInstance().getClass())) {
      return true;
    }
  }
  return false;
}

/**
 * Walks the superclass chain of {@code observerClass} looking for a declaration of
 * postScannerFilterRow (current or deprecated signature) below BaseRegionObserver.
 */
private static boolean customizesPostScannerFilterRow(final Class<?> observerClass) {
  Class<?> clazz = observerClass;
  while (true) {
    if (clazz == null) {
      // ran off the top of the hierarchy without meeting BaseRegionObserver:
      // we must have directly implemented RegionObserver
      return true;
    }
    if (clazz == BaseRegionObserver.class) {
      // we reached BaseRegionObserver, nothing custom on this chain
      return false;
    }
    if (declaresMethod(clazz, "postScannerFilterRow", ObserverContext.class,
          InternalScanner.class, Cell.class, boolean.class)
        // the deprecated signature still exists
        || declaresMethod(clazz, "postScannerFilterRow", ObserverContext.class,
          InternalScanner.class, byte[].class, int.class, short.class, boolean.class)) {
      return true;
    }
    clazz = clazz.getSuperclass();
  }
}

/** Returns true if {@code clazz} itself declares a method with the given signature. */
private static boolean declaresMethod(final Class<?> clazz, final String name,
    final Class<?>... parameterTypes) {
  try {
    clazz.getDeclaredMethod(name, parameterTypes);
    return true;
  } catch (NoSuchMethodException ignored) {
    // not declared at this level of the hierarchy
    return false;
  }
}
268
269   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
270       HTableDescriptor htd) {
271     List<TableCoprocessorAttribute> result = Lists.newArrayList();
272     for (Map.Entry<Bytes, Bytes> e: htd.getValues().entrySet()) {
273       String key = Bytes.toString(e.getKey().get()).trim();
274       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
275         String spec = Bytes.toString(e.getValue().get()).trim();
276         // found one
277         try {
278           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
279           if (matcher.matches()) {
280             // jar file path can be empty if the cp class can be loaded
281             // from class loader.
282             Path path = matcher.group(1).trim().isEmpty() ?
283                 null : new Path(matcher.group(1).trim());
284             String className = matcher.group(2).trim();
285             if (className.isEmpty()) {
286               LOG.error("Malformed table coprocessor specification: key=" +
287                 key + ", spec: " + spec);
288               continue;
289             }
290             String priorityStr = matcher.group(3).trim();
291             int priority = priorityStr.isEmpty() ?
292                 Coprocessor.PRIORITY_USER : Integer.parseInt(priorityStr);
293             String cfgSpec = null;
294             try {
295               cfgSpec = matcher.group(4);
296             } catch (IndexOutOfBoundsException ex) {
297               // ignore
298             }
299             Configuration ourConf;
300             if (cfgSpec != null && !cfgSpec.trim().equals("|")) {
301               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
302               // do an explicit deep copy of the passed configuration
303               ourConf = new Configuration(false);
304               HBaseConfiguration.merge(ourConf, conf);
305               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
306               while (m.find()) {
307                 ourConf.set(m.group(1), m.group(2));
308               }
309             } else {
310               ourConf = conf;
311             }
312             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
313           } else {
314             LOG.error("Malformed table coprocessor specification: key=" + key +
315               ", spec: " + spec);
316           }
317         } catch (Exception ioe) {
318           LOG.error("Malformed table coprocessor specification: key=" + key +
319             ", spec: " + spec);
320         }
321       }
322     }
323     return result;
324   }
325
326   /**
327    * Sanity check the table coprocessor attributes of the supplied schema. Will
328    * throw an exception if there is a problem.
329    * @param conf
330    * @param htd
331    * @throws IOException
332    */
333   public static void testTableCoprocessorAttrs(final Configuration conf,
334       final HTableDescriptor htd) throws IOException {
335     String pathPrefix = UUID.randomUUID().toString();
336     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
337       if (attr.getPriority() < 0) {
338         throw new IOException("Priority for coprocessor " + attr.getClassName() +
339           " cannot be less than 0");
340       }
341       ClassLoader old = Thread.currentThread().getContextClassLoader();
342       try {
343         ClassLoader cl;
344         if (attr.getPath() != null) {
345           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
346             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
347         } else {
348           cl = CoprocessorHost.class.getClassLoader();
349         }
350         Thread.currentThread().setContextClassLoader(cl);
351         cl.loadClass(attr.getClassName());
352       } catch (ClassNotFoundException e) {
353         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
354       } finally {
355         Thread.currentThread().setContextClassLoader(old);
356       }
357     }
358   }
359
360   void loadTableCoprocessors(final Configuration conf) {
361     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
362       DEFAULT_COPROCESSORS_ENABLED);
363     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
364       DEFAULT_USER_COPROCESSORS_ENABLED);
365     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
366       return;
367     }
368
369     // scan the table attributes for coprocessor load specifications
370     // initialize the coprocessors
371     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
372     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
373         region.getTableDesc())) {
374       // Load encompasses classloading and coprocessor initialization
375       try {
376         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
377           attr.getConf());
378         configured.add(env);
379         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
380             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
381       } catch (Throwable t) {
382         // Coprocessor failed to load, do we abort on error?
383         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
384           abortServer(attr.getClassName(), t);
385         } else {
386           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
387         }
388       }
389     }
390     // add together to coprocessor set for COW efficiency
391     coprocessors.addAll(configured);
392   }
393
394   @Override
395   public RegionEnvironment createEnvironment(Class<?> implClass,
396       Coprocessor instance, int priority, int seq, Configuration conf) {
397     // Check if it's an Endpoint.
398     // Due to current dynamic protocol design, Endpoint
399     // uses a different way to be registered and executed.
400     // It uses a visitor pattern to invoke registered Endpoint
401     // method.
402     for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
403       Class<?> c = (Class<?>) itf;
404       if (CoprocessorService.class.isAssignableFrom(c)) {
405         region.registerService( ((CoprocessorService)instance).getService() );
406       }
407     }
408     ConcurrentMap<String, Object> classData;
409     // make sure only one thread can add maps
410     synchronized (sharedDataMap) {
411       // as long as at least one RegionEnvironment holds on to its classData it will
412       // remain in this map
413       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
414       if (classData == null) {
415         classData = new ConcurrentHashMap<String, Object>();
416         sharedDataMap.put(implClass.getName(), classData);
417       }
418     }
419     return new RegionEnvironment(instance, priority, seq, conf, region,
420         rsServices, classData);
421   }
422
423   /**
424    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
425    *
426    * For example, {@link
427    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
428    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
429    *
430    * See also
431    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
432    *    CoprocessorEnvironment, Throwable)}
433    * @param env The coprocessor that threw the exception.
434    * @param e The exception that was thrown.
435    */
436   private void handleCoprocessorThrowableNoRethrow(
437       final CoprocessorEnvironment env, final Throwable e) {
438     try {
439       handleCoprocessorThrowable(env,e);
440     } catch (IOException ioe) {
441       // We cannot throw exceptions from the caller hook, so ignore.
442       LOG.warn(
443         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
444         e + ". Ignoring.",e);
445     }
446   }
447
448   /**
449    * Invoked before a region open.
450    *
451    * @throws IOException Signals that an I/O exception has occurred.
452    */
453   public void preOpen() throws IOException {
454     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
455       @Override
456       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
457           throws IOException {
458         oserver.preOpen(ctx);
459       }
460     });
461   }
462
463   /**
464    * Invoked after a region open
465    */
466   public void postOpen() {
467     try {
468       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
469         @Override
470         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
471             throws IOException {
472           oserver.postOpen(ctx);
473         }
474       });
475     } catch (IOException e) {
476       LOG.warn(e);
477     }
478   }
479
480   /**
481    * Invoked after log replay on region
482    */
483   public void postLogReplay() {
484     try {
485       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
486         @Override
487         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
488             throws IOException {
489           oserver.postLogReplay(ctx);
490         }
491       });
492     } catch (IOException e) {
493       LOG.warn(e);
494     }
495   }
496
497   /**
498    * Invoked before a region is closed
499    * @param abortRequested true if the server is aborting
500    */
501   public void preClose(final boolean abortRequested) throws IOException {
502     execOperation(false, new RegionOperation() {
503       @Override
504       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
505           throws IOException {
506         oserver.preClose(ctx, abortRequested);
507       }
508     });
509   }
510
511   /**
512    * Invoked after a region is closed
513    * @param abortRequested true if the server is aborting
514    */
515   public void postClose(final boolean abortRequested) {
516     try {
517       execOperation(false, new RegionOperation() {
518         @Override
519         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
520             throws IOException {
521           oserver.postClose(ctx, abortRequested);
522         }
523         public void postEnvCall(RegionEnvironment env) {
524           shutdown(env);
525         }
526       });
527     } catch (IOException e) {
528       LOG.warn(e);
529     }
530   }
531
532   /**
533    * See
534    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
535    */
536   public InternalScanner preCompactScannerOpen(final Store store,
537       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
538       final CompactionRequest request, final User user) throws IOException {
539     return execOperationWithResult(null,
540         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
541       @Override
542       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
543           throws IOException {
544         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
545           earliestPutTs, getResult(), request));
546       }
547     });
548   }
549
550   /**
551    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
552    * available candidates.
553    * @param store The store where compaction is being requested
554    * @param candidates The currently available store files
555    * @param request custom compaction request
556    * @return If {@code true}, skip the normal selection process and use the current list
557    * @throws IOException
558    */
559   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
560       final CompactionRequest request, final User user) throws IOException {
561     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
562       @Override
563       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
564           throws IOException {
565         oserver.preCompactSelection(ctx, store, candidates, request);
566       }
567     });
568   }
569
570   /**
571    * Called after the {@link StoreFile}s to be compacted have been selected from the available
572    * candidates.
573    * @param store The store where compaction is being requested
574    * @param selected The store files selected to compact
575    * @param request custom compaction
576    */
577   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
578       final CompactionRequest request, final User user) {
579     try {
580       execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
581         @Override
582         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
583             throws IOException {
584           oserver.postCompactSelection(ctx, store, selected, request);
585         }
586       });
587     } catch (IOException e) {
588       LOG.warn(e);
589     }
590   }
591
592   /**
593    * Called prior to rewriting the store files selected for compaction
594    * @param store the store being compacted
595    * @param scanner the scanner used to read store data during compaction
596    * @param scanType type of Scan
597    * @param request the compaction that will be executed
598    * @throws IOException
599    */
600   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
601       final ScanType scanType, final CompactionRequest request, final User user)
602       throws IOException {
603     return execOperationWithResult(false, scanner,
604         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>(user) {
605       @Override
606       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
607           throws IOException {
608         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
609       }
610     });
611   }
612
613   /**
614    * Called after the store compaction has completed.
615    * @param store the store being compacted
616    * @param resultFile the new store file written during compaction
617    * @param request the compaction that is being executed
618    * @throws IOException
619    */
620   public void postCompact(final Store store, final StoreFile resultFile,
621       final CompactionRequest request, final User user) throws IOException {
622     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
623       @Override
624       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
625           throws IOException {
626         oserver.postCompact(ctx, store, resultFile, request);
627       }
628     });
629   }
630
631   /**
632    * Invoked before a memstore flush
633    * @throws IOException
634    */
635   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
636       throws IOException {
637     return execOperationWithResult(false, scanner,
638         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
639       @Override
640       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
641           throws IOException {
642         setResult(oserver.preFlush(ctx, store, getResult()));
643       }
644     });
645   }
646
647   /**
648    * Invoked before a memstore flush
649    * @throws IOException
650    */
651   public void preFlush() throws IOException {
652     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
653       @Override
654       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
655           throws IOException {
656         oserver.preFlush(ctx);
657       }
658     });
659   }
660
661   /**
662    * See
663    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
664    *    Store, KeyValueScanner, InternalScanner)}
665    */
666   public InternalScanner preFlushScannerOpen(final Store store,
667       final KeyValueScanner memstoreScanner) throws IOException {
668     return execOperationWithResult(null,
669         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
670       @Override
671       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
672           throws IOException {
673         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
674       }
675     });
676   }
677
678   /**
679    * Invoked after a memstore flush
680    * @throws IOException
681    */
682   public void postFlush() throws IOException {
683     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
684       @Override
685       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
686           throws IOException {
687         oserver.postFlush(ctx);
688       }
689     });
690   }
691
692   /**
693    * Invoked after a memstore flush
694    * @throws IOException
695    */
696   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
697     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
698       @Override
699       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
700           throws IOException {
701         oserver.postFlush(ctx, store, storeFile);
702       }
703     });
704   }
705
706   /**
707    * Invoked just before a split
708    * @throws IOException
709    */
710   // TODO: Deprecate this
711   public void preSplit(final User user) throws IOException {
712     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
713       @Override
714       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
715           throws IOException {
716         oserver.preSplit(ctx);
717       }
718     });
719   }
720
721   /**
722    * Invoked just before a split
723    * @throws IOException
724    */
725   public void preSplit(final byte[] splitRow, final User user) throws IOException {
726     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
727       @Override
728       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
729           throws IOException {
730         oserver.preSplit(ctx, splitRow);
731       }
732     });
733   }
734
735   /**
736    * Invoked just after a split
737    * @param l the new left-hand daughter region
738    * @param r the new right-hand daughter region
739    * @throws IOException
740    */
741   public void postSplit(final Region l, final Region r, final User user) throws IOException {
742     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
743       @Override
744       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
745           throws IOException {
746         oserver.postSplit(ctx, l, r);
747       }
748     });
749   }
750
751   public boolean preSplitBeforePONR(final byte[] splitKey,
752       final List<Mutation> metaEntries, final User user) throws IOException {
753     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
754       @Override
755       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
756           throws IOException {
757         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
758       }
759     });
760   }
761
762   public void preSplitAfterPONR(final User user) throws IOException {
763     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
764       @Override
765       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
766           throws IOException {
767         oserver.preSplitAfterPONR(ctx);
768       }
769     });
770   }
771
772   /**
773    * Invoked just before the rollback of a failed split is started
774    * @throws IOException
775    */
776   public void preRollBackSplit(final User user) throws IOException {
777     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
778       @Override
779       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
780           throws IOException {
781         oserver.preRollBackSplit(ctx);
782       }
783     });
784   }
785
786   /**
787    * Invoked just after the rollback of a failed split is done
788    * @throws IOException
789    */
790   public void postRollBackSplit(final User user) throws IOException {
791     execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) {
792       @Override
793       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
794           throws IOException {
795         oserver.postRollBackSplit(ctx);
796       }
797     });
798   }
799
800   /**
801    * Invoked after a split is completed irrespective of a failure or success.
802    * @throws IOException
803    */
804   public void postCompleteSplit() throws IOException {
805     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
806       @Override
807       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
808           throws IOException {
809         oserver.postCompleteSplit(ctx);
810       }
811     });
812   }
813
814   // RegionObserver support
815
816   /**
817    * @param get the Get request
818    * @return true if default processing should be bypassed
819    * @exception IOException Exception
820    */
821   public boolean preGet(final Get get, final List<Cell> results)
822       throws IOException {
823     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
824       @Override
825       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
826           throws IOException {
827         oserver.preGetOp(ctx, get, results);
828       }
829     });
830   }
831
832   /**
833    * @param get the Get request
834    * @param results the result sett
835    * @exception IOException Exception
836    */
837   public void postGet(final Get get, final List<Cell> results)
838       throws IOException {
839     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
840       @Override
841       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
842           throws IOException {
843         oserver.postGetOp(ctx, get, results);
844       }
845     });
846   }
847
848   /**
849    * @param get the Get request
850    * @return true or false to return to client if bypassing normal operation,
851    * or null otherwise
852    * @exception IOException Exception
853    */
854   public Boolean preExists(final Get get) throws IOException {
855     return execOperationWithResult(true, false,
856         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
857       @Override
858       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
859           throws IOException {
860         setResult(oserver.preExists(ctx, get, getResult()));
861       }
862     });
863   }
864
865   /**
866    * @param get the Get request
867    * @param exists the result returned by the region server
868    * @return the result to return to the client
869    * @exception IOException Exception
870    */
871   public boolean postExists(final Get get, boolean exists)
872       throws IOException {
873     return execOperationWithResult(exists,
874         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
875       @Override
876       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
877           throws IOException {
878         setResult(oserver.postExists(ctx, get, getResult()));
879       }
880     });
881   }
882
883   /**
884    * @param put The Put object
885    * @param edit The WALEdit object.
886    * @param durability The durability used
887    * @return true if default processing should be bypassed
888    * @exception IOException Exception
889    */
890   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
891       throws IOException {
892     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
893       @Override
894       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
895           throws IOException {
896         oserver.prePut(ctx, put, edit, durability);
897       }
898     });
899   }
900
901   /**
902    * @param mutation - the current mutation
903    * @param kv - the current cell
904    * @param byteNow - current timestamp in bytes
905    * @param get - the get that could be used
906    * Note that the get only does not specify the family and qualifier that should be used
907    * @return true if default processing should be bypassed
908    * @exception IOException
909    *              Exception
910    */
911   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
912       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
913     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
914       @Override
915       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
916           throws IOException {
917         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
918       }
919     });
920   }
921
922   /**
923    * @param put The Put object
924    * @param edit The WALEdit object.
925    * @param durability The durability used
926    * @exception IOException Exception
927    */
928   public void postPut(final Put put, final WALEdit edit, final Durability durability)
929       throws IOException {
930     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
931       @Override
932       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
933           throws IOException {
934         oserver.postPut(ctx, put, edit, durability);
935       }
936     });
937   }
938
939   /**
940    * @param delete The Delete object
941    * @param edit The WALEdit object.
942    * @param durability The durability used
943    * @return true if default processing should be bypassed
944    * @exception IOException Exception
945    */
946   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
947       throws IOException {
948     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
949       @Override
950       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
951           throws IOException {
952         oserver.preDelete(ctx, delete, edit, durability);
953       }
954     });
955   }
956
957   /**
958    * @param delete The Delete object
959    * @param edit The WALEdit object.
960    * @param durability The durability used
961    * @exception IOException Exception
962    */
963   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
964       throws IOException {
965     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
966       @Override
967       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
968           throws IOException {
969         oserver.postDelete(ctx, delete, edit, durability);
970       }
971     });
972   }
973
974   /**
975    * @param miniBatchOp
976    * @return true if default processing should be bypassed
977    * @throws IOException
978    */
979   public boolean preBatchMutate(
980       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
981     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
982       @Override
983       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
984           throws IOException {
985         oserver.preBatchMutate(ctx, miniBatchOp);
986       }
987     });
988   }
989
990   /**
991    * @param miniBatchOp
992    * @throws IOException
993    */
994   public void postBatchMutate(
995       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
996     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
997       @Override
998       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
999           throws IOException {
1000         oserver.postBatchMutate(ctx, miniBatchOp);
1001       }
1002     });
1003   }
1004
1005   public void postBatchMutateIndispensably(
1006       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1007       throws IOException {
1008     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1009       @Override
1010       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1011           throws IOException {
1012         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1013       }
1014     });
1015   }
1016
1017   /**
1018    * @param row row to check
1019    * @param family column family
1020    * @param qualifier column qualifier
1021    * @param compareOp the comparison operation
1022    * @param comparator the comparator
1023    * @param put data to put if check succeeds
1024    * @return true or false to return to client if default processing should
1025    * be bypassed, or null otherwise
1026    * @throws IOException e
1027    */
1028   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1029       final byte [] qualifier, final CompareOp compareOp,
1030       final ByteArrayComparable comparator, final Put put)
1031       throws IOException {
1032     return execOperationWithResult(true, false,
1033         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1034       @Override
1035       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1036           throws IOException {
1037         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1038           compareOp, comparator, put, getResult()));
1039       }
1040     });
1041   }
1042
1043   /**
1044    * @param row row to check
1045    * @param family column family
1046    * @param qualifier column qualifier
1047    * @param compareOp the comparison operation
1048    * @param comparator the comparator
1049    * @param put data to put if check succeeds
1050    * @return true or false to return to client if default processing should
1051    * be bypassed, or null otherwise
1052    * @throws IOException e
1053    */
1054   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1055       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1056       final Put put) throws IOException {
1057     return execOperationWithResult(true, false,
1058         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1059       @Override
1060       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1061           throws IOException {
1062         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1063           compareOp, comparator, put, getResult()));
1064       }
1065     });
1066   }
1067
1068   /**
1069    * @param row row to check
1070    * @param family column family
1071    * @param qualifier column qualifier
1072    * @param compareOp the comparison operation
1073    * @param comparator the comparator
1074    * @param put data to put if check succeeds
1075    * @throws IOException e
1076    */
1077   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1078       final byte [] qualifier, final CompareOp compareOp,
1079       final ByteArrayComparable comparator, final Put put,
1080       boolean result) throws IOException {
1081     return execOperationWithResult(result,
1082         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1083       @Override
1084       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1085           throws IOException {
1086         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1087           compareOp, comparator, put, getResult()));
1088       }
1089     });
1090   }
1091
1092   /**
1093    * @param row row to check
1094    * @param family column family
1095    * @param qualifier column qualifier
1096    * @param compareOp the comparison operation
1097    * @param comparator the comparator
1098    * @param delete delete to commit if check succeeds
1099    * @return true or false to return to client if default processing should
1100    * be bypassed, or null otherwise
1101    * @throws IOException e
1102    */
1103   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1104       final byte [] qualifier, final CompareOp compareOp,
1105       final ByteArrayComparable comparator, final Delete delete)
1106       throws IOException {
1107     return execOperationWithResult(true, false,
1108         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1109       @Override
1110       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1111           throws IOException {
1112         setResult(oserver.preCheckAndDelete(ctx, row, family,
1113             qualifier, compareOp, comparator, delete, getResult()));
1114       }
1115     });
1116   }
1117
1118   /**
1119    * @param row row to check
1120    * @param family column family
1121    * @param qualifier column qualifier
1122    * @param compareOp the comparison operation
1123    * @param comparator the comparator
1124    * @param delete delete to commit if check succeeds
1125    * @return true or false to return to client if default processing should
1126    * be bypassed, or null otherwise
1127    * @throws IOException e
1128    */
1129   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1130       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1131       final Delete delete) throws IOException {
1132     return execOperationWithResult(true, false,
1133         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1134       @Override
1135       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1136           throws IOException {
1137         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1138               family, qualifier, compareOp, comparator, delete, getResult()));
1139       }
1140     });
1141   }
1142
1143   /**
1144    * @param row row to check
1145    * @param family column family
1146    * @param qualifier column qualifier
1147    * @param compareOp the comparison operation
1148    * @param comparator the comparator
1149    * @param delete delete to commit if check succeeds
1150    * @throws IOException e
1151    */
1152   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1153       final byte [] qualifier, final CompareOp compareOp,
1154       final ByteArrayComparable comparator, final Delete delete,
1155       boolean result) throws IOException {
1156     return execOperationWithResult(result,
1157         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1158       @Override
1159       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1160           throws IOException {
1161         setResult(oserver.postCheckAndDelete(ctx, row, family,
1162             qualifier, compareOp, comparator, delete, getResult()));
1163       }
1164     });
1165   }
1166
1167   /**
1168    * @param append append object
1169    * @return result to return to client if default operation should be
1170    * bypassed, null otherwise
1171    * @throws IOException if an error occurred on the coprocessor
1172    */
1173   public Result preAppend(final Append append) throws IOException {
1174     return execOperationWithResult(true, null,
1175         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1176       @Override
1177       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1178           throws IOException {
1179         setResult(oserver.preAppend(ctx, append));
1180       }
1181     });
1182   }
1183
1184   /**
1185    * @param append append object
1186    * @return result to return to client if default operation should be
1187    * bypassed, null otherwise
1188    * @throws IOException if an error occurred on the coprocessor
1189    */
1190   public Result preAppendAfterRowLock(final Append append) throws IOException {
1191     return execOperationWithResult(true, null,
1192         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1193       @Override
1194       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1195           throws IOException {
1196         setResult(oserver.preAppendAfterRowLock(ctx, append));
1197       }
1198     });
1199   }
1200
1201   /**
1202    * @param increment increment object
1203    * @return result to return to client if default operation should be
1204    * bypassed, null otherwise
1205    * @throws IOException if an error occurred on the coprocessor
1206    */
1207   public Result preIncrement(final Increment increment) throws IOException {
1208     return execOperationWithResult(true, null,
1209         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1210       @Override
1211       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1212           throws IOException {
1213         setResult(oserver.preIncrement(ctx, increment));
1214       }
1215     });
1216   }
1217
1218   /**
1219    * @param increment increment object
1220    * @return result to return to client if default operation should be
1221    * bypassed, null otherwise
1222    * @throws IOException if an error occurred on the coprocessor
1223    */
1224   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1225     return execOperationWithResult(true, null,
1226         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1227       @Override
1228       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1229           throws IOException {
1230         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1231       }
1232     });
1233   }
1234
1235   /**
1236    * @param append Append object
1237    * @param result the result returned by the append
1238    * @throws IOException if an error occurred on the coprocessor
1239    */
1240   public void postAppend(final Append append, final Result result) throws IOException {
1241     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1242       @Override
1243       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1244           throws IOException {
1245         oserver.postAppend(ctx, append, result);
1246       }
1247     });
1248   }
1249
1250   /**
1251    * @param increment increment object
1252    * @param result the result returned by postIncrement
1253    * @throws IOException if an error occurred on the coprocessor
1254    */
1255   public Result postIncrement(final Increment increment, Result result) throws IOException {
1256     return execOperationWithResult(result,
1257         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1258       @Override
1259       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1260           throws IOException {
1261         setResult(oserver.postIncrement(ctx, increment, getResult()));
1262       }
1263     });
1264   }
1265
1266   /**
1267    * @param scan the Scan specification
1268    * @return scanner id to return to client if default operation should be
1269    * bypassed, false otherwise
1270    * @exception IOException Exception
1271    */
1272   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1273     return execOperationWithResult(true, null,
1274         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1275       @Override
1276       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1277           throws IOException {
1278         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1279       }
1280     });
1281   }
1282
1283   /**
1284    * See
1285    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1286    *    Store, Scan, NavigableSet, KeyValueScanner)}
1287    */
1288   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1289       final NavigableSet<byte[]> targetCols, final long readPt) throws IOException {
1290     return execOperationWithResult(null,
1291         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1292       @Override
1293       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1294           throws IOException {
1295         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt));
1296       }
1297     });
1298   }
1299
1300   /**
1301    * @param scan the Scan specification
1302    * @param s the scanner
1303    * @return the scanner instance to use
1304    * @exception IOException Exception
1305    */
1306   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1307     return execOperationWithResult(s,
1308         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1309       @Override
1310       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1311           throws IOException {
1312         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1313       }
1314     });
1315   }
1316
1317   /**
1318    * @param s the scanner
1319    * @param results the result set returned by the region server
1320    * @param limit the maximum number of results to return
1321    * @return 'has next' indication to client if bypassing default behavior, or
1322    * null otherwise
1323    * @exception IOException Exception
1324    */
1325   public Boolean preScannerNext(final InternalScanner s,
1326       final List<Result> results, final int limit) throws IOException {
1327     return execOperationWithResult(true, false,
1328         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1329       @Override
1330       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1331           throws IOException {
1332         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1333       }
1334     });
1335   }
1336
1337   /**
1338    * @param s the scanner
1339    * @param results the result set returned by the region server
1340    * @param limit the maximum number of results to return
1341    * @param hasMore
1342    * @return 'has more' indication to give to client
1343    * @exception IOException Exception
1344    */
1345   public boolean postScannerNext(final InternalScanner s,
1346       final List<Result> results, final int limit, boolean hasMore)
1347       throws IOException {
1348     return execOperationWithResult(hasMore,
1349         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1350       @Override
1351       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1352           throws IOException {
1353         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1354       }
1355     });
1356   }
1357
1358   /**
1359    * This will be called by the scan flow when the current scanned row is being filtered out by the
1360    * filter.
1361    * @param s the scanner
1362    * @param curRowCell The cell in the current row which got filtered out
1363    * @return whether more rows are available for the scanner or not
1364    * @throws IOException
1365    */
1366   public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1367       throws IOException {
1368     // short circuit for performance
1369     if (!hasCustomPostScannerFilterRow) return true;
1370     return execOperationWithResult(true,
1371         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1372       @Override
1373       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1374           throws IOException {
1375         setResult(oserver.postScannerFilterRow(ctx, s, curRowCell, getResult()));
1376       }
1377     });
1378   }
1379
1380   /**
1381    * @param s the scanner
1382    * @return true if default behavior should be bypassed, false otherwise
1383    * @exception IOException Exception
1384    */
1385   public boolean preScannerClose(final InternalScanner s) throws IOException {
1386     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1387       @Override
1388       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1389           throws IOException {
1390         oserver.preScannerClose(ctx, s);
1391       }
1392     });
1393   }
1394
1395   /**
1396    * @exception IOException Exception
1397    */
1398   public void postScannerClose(final InternalScanner s) throws IOException {
1399     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1400       @Override
1401       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1402           throws IOException {
1403         oserver.postScannerClose(ctx, s);
1404       }
1405     });
1406   }
1407
1408   /**
1409    * @param info the RegionInfo for this region
1410    * @param edits the file of recovered edits
1411    * @throws IOException Exception
1412    */
1413   public void preReplayWALs(final HRegionInfo info, final Path edits) throws IOException {
1414     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1415       @Override
1416       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1417         throws IOException {
1418         oserver.preReplayWALs(ctx, info, edits);
1419       }
1420     });
1421   }
1422
1423   /**
1424    * @param info the RegionInfo for this region
1425    * @param edits the file of recovered edits
1426    * @throws IOException Exception
1427    */
1428   public void postReplayWALs(final HRegionInfo info, final Path edits) throws IOException {
1429     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1430       @Override
1431       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1432         throws IOException {
1433         oserver.postReplayWALs(ctx, info, edits);
1434       }
1435     });
1436   }
1437
1438   /**
1439    * @param info
1440    * @param logKey
1441    * @param logEdit
1442    * @return true if default behavior should be bypassed, false otherwise
1443    * @throws IOException
1444    */
1445   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1446       final WALEdit logEdit) throws IOException {
1447     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1448       @Override
1449       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1450           throws IOException {
1451         // Once we don't need to support the legacy call, replace RegionOperation with a version
1452         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1453         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1454         if (env.useLegacyPre) {
1455           if (logKey instanceof HLogKey) {
1456             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1457           } else {
1458             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1459           }
1460         } else {
1461           oserver.preWALRestore(ctx, info, logKey, logEdit);
1462         }
1463       }
1464     });
1465   }
1466
1467   /**
1468    * @return true if default behavior should be bypassed, false otherwise
1469    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0
1470    */
1471   @Deprecated
1472   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1473       final WALEdit logEdit) throws IOException {
1474     return preWALRestore(info, (WALKey)logKey, logEdit);
1475   }
1476
1477   /**
1478    * @param info
1479    * @param logKey
1480    * @param logEdit
1481    * @throws IOException
1482    */
1483   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1484       throws IOException {
1485     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1486       @Override
1487       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1488           throws IOException {
1489         // Once we don't need to support the legacy call, replace RegionOperation with a version
1490         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1491         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1492         if (env.useLegacyPost) {
1493           if (logKey instanceof HLogKey) {
1494             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1495           } else {
1496             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1497           }
1498         } else {
1499           oserver.postWALRestore(ctx, info, logKey, logEdit);
1500         }
1501       }
1502     });
1503   }
1504
1505   /**
1506    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}; as of 2.0, remove in 3.0
1507    */
1508   @Deprecated
1509   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1510       throws IOException {
1511     postWALRestore(info, (WALKey)logKey, logEdit);
1512   }
1513
1514   /**
1515    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1516    * @return true if the default operation should be bypassed
1517    * @throws IOException
1518    */
1519   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1520     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1521       @Override
1522       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1523           throws IOException {
1524         oserver.preBulkLoadHFile(ctx, familyPaths);
1525       }
1526     });
1527   }
1528
1529   /**
1530    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1531    * @param hasLoaded whether load was successful or not
1532    * @return the possibly modified value of hasLoaded
1533    * @throws IOException
1534    */
1535   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1536       boolean hasLoaded) throws IOException {
1537     return execOperationWithResult(hasLoaded,
1538         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1539       @Override
1540       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1541           throws IOException {
1542         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1543       }
1544     });
1545   }
1546
1547   public void postStartRegionOperation(final Operation op) throws IOException {
1548     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1549       @Override
1550       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1551           throws IOException {
1552         oserver.postStartRegionOperation(ctx, op);
1553       }
1554     });
1555   }
1556
1557   public void postCloseRegionOperation(final Operation op) throws IOException {
1558     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1559       @Override
1560       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1561           throws IOException {
1562         oserver.postCloseRegionOperation(ctx, op);
1563       }
1564     });
1565   }
1566
1567   /**
1568    * @param fs fileystem to read from
1569    * @param p path to the file
1570    * @param in {@link FSDataInputStreamWrapper}
1571    * @param size Full size of the file
1572    * @param cacheConf
1573    * @param r original reference file. This will be not null only when reading a split file.
1574    * @return a Reader instance to use instead of the base reader if overriding
1575    * default behavior, null otherwise
1576    * @throws IOException
1577    */
1578   public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1579       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1580       final Reference r) throws IOException {
1581     return execOperationWithResult(null,
1582         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
1583       @Override
1584       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1585           throws IOException {
1586         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1587       }
1588     });
1589   }
1590
1591   /**
1592    * @param fs fileystem to read from
1593    * @param p path to the file
1594    * @param in {@link FSDataInputStreamWrapper}
1595    * @param size Full size of the file
1596    * @param cacheConf
1597    * @param r original reference file. This will be not null only when reading a split file.
1598    * @param reader the base reader instance
1599    * @return The reader to use
1600    * @throws IOException
1601    */
1602   public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1603       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1604       final Reference r, final StoreFileReader reader) throws IOException {
1605     return execOperationWithResult(reader,
1606         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFileReader>() {
1607       @Override
1608       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1609           throws IOException {
1610         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1611       }
1612     });
1613   }
1614
1615   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1616       final Cell oldCell, Cell newCell) throws IOException {
1617     return execOperationWithResult(newCell,
1618         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1619       @Override
1620       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1621           throws IOException {
1622         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1623       }
1624     });
1625   }
1626
1627   public Message preEndpointInvocation(final Service service, final String methodName,
1628       Message request) throws IOException {
1629     return execOperationWithResult(request,
1630         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1631       @Override
1632       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1633           throws IOException {
1634         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1635       }
1636     });
1637   }
1638
1639   public void postEndpointInvocation(final Service service, final String methodName,
1640       final Message request, final Message.Builder responseBuilder) throws IOException {
1641     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1642       @Override
1643       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1644           throws IOException {
1645         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1646       }
1647     });
1648   }
1649
1650   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1651     return execOperationWithResult(tracker,
1652         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1653       @Override
1654       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1655           throws IOException {
1656         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1657       }
1658     });
1659   }
1660
1661   private static abstract class CoprocessorOperation
1662       extends ObserverContext<RegionCoprocessorEnvironment> {
1663     public CoprocessorOperation() {
1664       this(RpcServer.getRequestUser());
1665     }
1666
1667     public CoprocessorOperation(User user) {
1668       super(user);
1669     }
1670
1671     public abstract void call(Coprocessor observer,
1672         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1673     public abstract boolean hasCall(Coprocessor observer);
1674     public void postEnvCall(RegionEnvironment env) { }
1675   }
1676
1677   private static abstract class RegionOperation extends CoprocessorOperation {
1678     public RegionOperation() {
1679     }
1680
1681     public RegionOperation(User user) {
1682       super(user);
1683     }
1684
1685     public abstract void call(RegionObserver observer,
1686         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1687
1688     public boolean hasCall(Coprocessor observer) {
1689       return observer instanceof RegionObserver;
1690     }
1691
1692     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1693         throws IOException {
1694       call((RegionObserver)observer, ctx);
1695     }
1696   }
1697
1698   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1699     public RegionOperationWithResult() {
1700     }
1701
1702     public RegionOperationWithResult(User user) {
1703       super (user);
1704     }
1705
1706     private T result = null;
1707     public void setResult(final T result) { this.result = result; }
1708     public T getResult() { return this.result; }
1709   }
1710
1711   private static abstract class EndpointOperation extends CoprocessorOperation {
1712     public abstract void call(EndpointObserver observer,
1713         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1714
1715     public boolean hasCall(Coprocessor observer) {
1716       return observer instanceof EndpointObserver;
1717     }
1718
1719     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1720         throws IOException {
1721       call((EndpointObserver)observer, ctx);
1722     }
1723   }
1724
1725   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1726     private T result = null;
1727     public void setResult(final T result) { this.result = result; }
1728     public T getResult() { return this.result; }
1729   }
1730
1731   private boolean execOperation(final CoprocessorOperation ctx)
1732       throws IOException {
1733     return execOperation(true, ctx);
1734   }
1735
1736   private <T> T execOperationWithResult(final T defaultValue,
1737       final RegionOperationWithResult<T> ctx) throws IOException {
1738     if (ctx == null) return defaultValue;
1739     ctx.setResult(defaultValue);
1740     execOperation(true, ctx);
1741     return ctx.getResult();
1742   }
1743
1744   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1745       final RegionOperationWithResult<T> ctx) throws IOException {
1746     boolean bypass = false;
1747     T result = defaultValue;
1748     if (ctx != null) {
1749       ctx.setResult(defaultValue);
1750       bypass = execOperation(true, ctx);
1751       result = ctx.getResult();
1752     }
1753     return bypass == ifBypass ? result : null;
1754   }
1755
1756   private <T> T execOperationWithResult(final T defaultValue,
1757       final EndpointOperationWithResult<T> ctx) throws IOException {
1758     if (ctx == null) return defaultValue;
1759     ctx.setResult(defaultValue);
1760     execOperation(true, ctx);
1761     return ctx.getResult();
1762   }
1763
1764   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1765       throws IOException {
1766     boolean bypass = false;
1767     List<RegionEnvironment> envs = coprocessors.get();
1768     for (int i = 0; i < envs.size(); i++) {
1769       RegionEnvironment env = envs.get(i);
1770       Coprocessor observer = env.getInstance();
1771       if (ctx.hasCall(observer)) {
1772         ctx.prepare(env);
1773         Thread currentThread = Thread.currentThread();
1774         ClassLoader cl = currentThread.getContextClassLoader();
1775         try {
1776           currentThread.setContextClassLoader(env.getClassLoader());
1777           ctx.call(observer, ctx);
1778         } catch (Throwable e) {
1779           handleCoprocessorThrowable(env, e);
1780         } finally {
1781           currentThread.setContextClassLoader(cl);
1782         }
1783         bypass |= ctx.shouldBypass();
1784         if (earlyExit && ctx.shouldComplete()) {
1785           break;
1786         }
1787       }
1788
1789       ctx.postEnvCall(env);
1790     }
1791     return bypass;
1792   }
1793 }