View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.concurrent.ArrayBlockingQueue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.regex.Matcher;
34  
35  import com.google.common.collect.ImmutableList;
36  import com.google.common.collect.Lists;
37  import com.google.protobuf.Message;
38  import com.google.protobuf.Service;
39  import org.apache.commons.collections.map.AbstractReferenceMap;
40  import org.apache.commons.collections.map.ReferenceMap;
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.Cell;
50  import org.apache.hadoop.hbase.Coprocessor;
51  import org.apache.hadoop.hbase.CoprocessorEnvironment;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.client.Append;
57  import org.apache.hadoop.hbase.client.Delete;
58  import org.apache.hadoop.hbase.client.Durability;
59  import org.apache.hadoop.hbase.client.Get;
60  import org.apache.hadoop.hbase.client.Increment;
61  import org.apache.hadoop.hbase.client.Mutation;
62  import org.apache.hadoop.hbase.client.Put;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
71  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
72  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
73  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
74  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
75  import org.apache.hadoop.hbase.io.Reference;
76  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
77  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
78  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
79  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
80  import org.apache.hadoop.hbase.wal.WALKey;
81  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
82  import org.apache.hadoop.hbase.util.Bytes;
83  import org.apache.hadoop.hbase.util.Pair;
84  
85  /**
86   * Implements the coprocessor environment and runtime support for coprocessors
87   * loaded within a {@link HRegion}.
88   */
89  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
90  @InterfaceStability.Evolving
91  public class RegionCoprocessorHost
92      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
93  
94    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
95    // The shared data map
96    private static ReferenceMap sharedDataMap =
97        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
98  
99    /**
100    * Encapsulation of the environment of each coprocessor
101    */
102   static class RegionEnvironment extends CoprocessorHost.Environment
103       implements RegionCoprocessorEnvironment {
104 
105     private HRegion region;
106     private RegionServerServices rsServices;
107     ConcurrentMap<String, Object> sharedData;
108     private static final int LATENCY_BUFFER_SIZE = 100;
109     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
110         LATENCY_BUFFER_SIZE);
111     private final boolean useLegacyPre;
112     private final boolean useLegacyPost;
113 
114     /**
115      * Constructor
116      * @param impl the coprocessor instance
117      * @param priority chaining priority
118      */
119     public RegionEnvironment(final Coprocessor impl, final int priority,
120         final int seq, final Configuration conf, final HRegion region,
121         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
122       super(impl, priority, seq, conf);
123       this.region = region;
124       this.rsServices = services;
125       this.sharedData = sharedData;
126       // Pick which version of the WAL related events we'll call.
127       // This way we avoid calling the new version on older RegionObservers so
128       // we can maintain binary compatibility.
129       // See notes in javadoc for RegionObserver
130       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
131           HRegionInfo.class, WALKey.class, WALEdit.class);
132       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
133           HRegionInfo.class, WALKey.class, WALEdit.class);
134     }
135 
136     /** @return the region */
137     @Override
138     public HRegion getRegion() {
139       return region;
140     }
141 
142     /** @return reference to the region server services */
143     @Override
144     public RegionServerServices getRegionServerServices() {
145       return rsServices;
146     }
147 
148     public void shutdown() {
149       super.shutdown();
150     }
151 
152     @Override
153     public ConcurrentMap<String, Object> getSharedData() {
154       return sharedData;
155     }
156 
157     public void offerExecutionLatency(long latencyNanos) {
158       coprocessorTimeNanos.offer(latencyNanos);
159     }
160 
161     public Collection<Long> getExecutionLatenciesNanos() {
162       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
163       coprocessorTimeNanos.drainTo(latencies);
164       return latencies;
165     }
166 
167   }
168 
169   /** The region server services */
170   RegionServerServices rsServices;
171   /** The region */
172   HRegion region;
173 
174   /**
175    * Constructor
176    * @param region the region
177    * @param rsServices interface to available region server functionality
178    * @param conf the configuration
179    */
180   public RegionCoprocessorHost(final HRegion region,
181       final RegionServerServices rsServices, final Configuration conf) {
182     super(rsServices);
183     this.conf = conf;
184     this.rsServices = rsServices;
185     this.region = region;
186     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
187 
188     // load system default cp's from configuration.
189     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
190 
191     // load system default cp's for user tables from configuration.
192     if (!region.getRegionInfo().getTable().isSystemTable()) {
193       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
194     }
195 
196     // load Coprocessor From HDFS
197     loadTableCoprocessors(conf);
198   }
199 
200   void loadTableCoprocessors(final Configuration conf) {
201     // scan the table attributes for coprocessor load specifications
202     // initialize the coprocessors
203     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
204     for (Map.Entry<Bytes, Bytes> e :
205         region.getTableDesc().getValues().entrySet()) {
206       String key = Bytes.toString(e.getKey().get()).trim();
207       String spec = Bytes.toString(e.getValue().get()).trim();
208       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
209         // found one
210         try {
211           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
212           if (matcher.matches()) {
213             // jar file path can be empty if the cp class can be loaded
214             // from class loader.
215             Path path = matcher.group(1).trim().isEmpty() ?
216                 null : new Path(matcher.group(1).trim());
217             String className = matcher.group(2).trim();
218             int priority = matcher.group(3).trim().isEmpty() ?
219                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
220             String cfgSpec = null;
221             try {
222               cfgSpec = matcher.group(4);
223             } catch (IndexOutOfBoundsException ex) {
224               // ignore
225             }
226             Configuration ourConf;
227             if (cfgSpec != null) {
228               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
229               // do an explicit deep copy of the passed configuration
230               ourConf = new Configuration(false);
231               HBaseConfiguration.merge(ourConf, conf);
232               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
233               while (m.find()) {
234                 ourConf.set(m.group(1), m.group(2));
235               }
236             } else {
237               ourConf = conf;
238             }
239             // Load encompasses classloading and coprocessor initialization
240             try {
241               RegionEnvironment env = load(path, className, priority, ourConf);
242               configured.add(env);
243               LOG.info("Loaded coprocessor " + className + " from HTD of " +
244                 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
245             } catch (Throwable t) {
246               // Coprocessor failed to load, do we abort on error?
247               if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
248                 abortServer(className, t);
249               } else {
250                 LOG.error("Failed to load coprocessor " + className, t);
251               }
252             }
253           } else {
254             LOG.error("Malformed table coprocessor specification: key=" + key +
255               ", spec: " + spec);
256           }
257         } catch (Exception ioe) {
258           LOG.error("Malformed table coprocessor specification: key=" + key +
259             ", spec: " + spec);
260         }
261       }
262     }
263     // add together to coprocessor set for COW efficiency
264     coprocessors.addAll(configured);
265   }
266 
267   @Override
268   public RegionEnvironment createEnvironment(Class<?> implClass,
269       Coprocessor instance, int priority, int seq, Configuration conf) {
270     // Check if it's an Endpoint.
271     // Due to current dynamic protocol design, Endpoint
272     // uses a different way to be registered and executed.
273     // It uses a visitor pattern to invoke registered Endpoint
274     // method.
275     for (Class<?> c : implClass.getInterfaces()) {
276       if (CoprocessorService.class.isAssignableFrom(c)) {
277         region.registerService( ((CoprocessorService)instance).getService() );
278       }
279     }
280     ConcurrentMap<String, Object> classData;
281     // make sure only one thread can add maps
282     synchronized (sharedDataMap) {
283       // as long as at least one RegionEnvironment holds on to its classData it will
284       // remain in this map
285       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
286       if (classData == null) {
287         classData = new ConcurrentHashMap<String, Object>();
288         sharedDataMap.put(implClass.getName(), classData);
289       }
290     }
291     return new RegionEnvironment(instance, priority, seq, conf, region,
292         rsServices, classData);
293   }
294 
295   /**
296    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
297    *
298    * For example, {@link
299    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
300    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
301    *
302    * See also
303    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
304    *    CoprocessorEnvironment, Throwable)}
305    * @param env The coprocessor that threw the exception.
306    * @param e The exception that was thrown.
307    */
308   private void handleCoprocessorThrowableNoRethrow(
309       final CoprocessorEnvironment env, final Throwable e) {
310     try {
311       handleCoprocessorThrowable(env,e);
312     } catch (IOException ioe) {
313       // We cannot throw exceptions from the caller hook, so ignore.
314       LOG.warn(
315         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
316         e + ". Ignoring.",e);
317     }
318   }
319 
320   /**
321    * Invoked before a region open.
322    *
323    * @throws IOException Signals that an I/O exception has occurred.
324    */
325   public void preOpen() throws IOException {
326     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
327       @Override
328       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
329           throws IOException {
330         oserver.preOpen(ctx);
331       }
332     });
333   }
334 
335   /**
336    * Invoked after a region open
337    */
338   public void postOpen() {
339     try {
340       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
341         @Override
342         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
343             throws IOException {
344           oserver.postOpen(ctx);
345         }
346       });
347     } catch (IOException e) {
348       LOG.warn(e);
349     }
350   }
351 
352   /**
353    * Invoked after log replay on region
354    */
355   public void postLogReplay() {
356     try {
357       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
358         @Override
359         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
360             throws IOException {
361           oserver.postLogReplay(ctx);
362         }
363       });
364     } catch (IOException e) {
365       LOG.warn(e);
366     }
367   }
368 
369   /**
370    * Invoked before a region is closed
371    * @param abortRequested true if the server is aborting
372    */
373   public void preClose(final boolean abortRequested) throws IOException {
374     execOperation(false, new RegionOperation() {
375       @Override
376       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
377           throws IOException {
378         oserver.preClose(ctx, abortRequested);
379       }
380     });
381   }
382 
383   /**
384    * Invoked after a region is closed
385    * @param abortRequested true if the server is aborting
386    */
387   public void postClose(final boolean abortRequested) {
388     try {
389       execOperation(false, new RegionOperation() {
390         @Override
391         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
392             throws IOException {
393           oserver.postClose(ctx, abortRequested);
394         }
395         public void postEnvCall(RegionEnvironment env) {
396           shutdown(env);
397         }
398       });
399     } catch (IOException e) {
400       LOG.warn(e);
401     }
402   }
403 
404   /**
405    * See
406    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
407    */
408   public InternalScanner preCompactScannerOpen(final Store store,
409       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
410       final CompactionRequest request) throws IOException {
411     return execOperationWithResult(null,
412         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
413       @Override
414       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
415           throws IOException {
416         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
417           earliestPutTs, getResult(), request));
418       }
419     });
420   }
421 
422   /**
423    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
424    * available candidates.
425    * @param store The store where compaction is being requested
426    * @param candidates The currently available store files
427    * @param request custom compaction request
428    * @return If {@code true}, skip the normal selection process and use the current list
429    * @throws IOException
430    */
431   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
432       final CompactionRequest request) throws IOException {
433     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
434       @Override
435       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
436           throws IOException {
437         oserver.preCompactSelection(ctx, store, candidates, request);
438       }
439     });
440   }
441 
442   /**
443    * Called after the {@link StoreFile}s to be compacted have been selected from the available
444    * candidates.
445    * @param store The store where compaction is being requested
446    * @param selected The store files selected to compact
447    * @param request custom compaction
448    */
449   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
450       final CompactionRequest request) {
451     try {
452       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
453         @Override
454         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
455             throws IOException {
456           oserver.postCompactSelection(ctx, store, selected, request);
457         }
458       });
459     } catch (IOException e) {
460       LOG.warn(e);
461     }
462   }
463 
464   /**
465    * Called prior to rewriting the store files selected for compaction
466    * @param store the store being compacted
467    * @param scanner the scanner used to read store data during compaction
468    * @param scanType type of Scan
469    * @param request the compaction that will be executed
470    * @throws IOException
471    */
472   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
473       final ScanType scanType, final CompactionRequest request) throws IOException {
474     return execOperationWithResult(false, scanner,
475         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
476       @Override
477       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
478           throws IOException {
479         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
480       }
481     });
482   }
483 
484   /**
485    * Called after the store compaction has completed.
486    * @param store the store being compacted
487    * @param resultFile the new store file written during compaction
488    * @param request the compaction that is being executed
489    * @throws IOException
490    */
491   public void postCompact(final Store store, final StoreFile resultFile,
492       final CompactionRequest request) throws IOException {
493     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
494       @Override
495       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
496           throws IOException {
497         oserver.postCompact(ctx, store, resultFile, request);
498       }
499     });
500   }
501 
502   /**
503    * Invoked before a memstore flush
504    * @throws IOException
505    */
506   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
507       throws IOException {
508     return execOperationWithResult(false, scanner,
509         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
510       @Override
511       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
512           throws IOException {
513         setResult(oserver.preFlush(ctx, store, getResult()));
514       }
515     });
516   }
517 
518   /**
519    * Invoked before a memstore flush
520    * @throws IOException
521    */
522   public void preFlush() throws IOException {
523     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
524       @Override
525       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
526           throws IOException {
527         oserver.preFlush(ctx);
528       }
529     });
530   }
531 
532   /**
533    * See
534    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
535    *    Store, KeyValueScanner, InternalScanner)}
536    */
537   public InternalScanner preFlushScannerOpen(final Store store,
538       final KeyValueScanner memstoreScanner) throws IOException {
539     return execOperationWithResult(null,
540         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
541       @Override
542       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
543           throws IOException {
544         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
545       }
546     });
547   }
548 
549   /**
550    * Invoked after a memstore flush
551    * @throws IOException
552    */
553   public void postFlush() throws IOException {
554     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
555       @Override
556       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
557           throws IOException {
558         oserver.postFlush(ctx);
559       }
560     });
561   }
562 
563   /**
564    * Invoked after a memstore flush
565    * @throws IOException
566    */
567   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
568     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
569       @Override
570       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
571           throws IOException {
572         oserver.postFlush(ctx, store, storeFile);
573       }
574     });
575   }
576 
577   /**
578    * Invoked just before a split
579    * @throws IOException
580    */
581   // TODO: Deprecate this
582   public void preSplit() throws IOException {
583     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
584       @Override
585       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
586           throws IOException {
587         oserver.preSplit(ctx);
588       }
589     });
590   }
591 
592   /**
593    * Invoked just before a split
594    * @throws IOException
595    */
596   public void preSplit(final byte[] splitRow) throws IOException {
597     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
598       @Override
599       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
600           throws IOException {
601         oserver.preSplit(ctx, splitRow);
602       }
603     });
604   }
605 
606   /**
607    * Invoked just after a split
608    * @param l the new left-hand daughter region
609    * @param r the new right-hand daughter region
610    * @throws IOException
611    */
612   public void postSplit(final HRegion l, final HRegion r) throws IOException {
613     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
614       @Override
615       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
616           throws IOException {
617         oserver.postSplit(ctx, l, r);
618       }
619     });
620   }
621 
622   public boolean preSplitBeforePONR(final byte[] splitKey,
623       final List<Mutation> metaEntries) throws IOException {
624     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
625       @Override
626       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
627           throws IOException {
628         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
629       }
630     });
631   }
632 
633   public void preSplitAfterPONR() throws IOException {
634     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
635       @Override
636       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
637           throws IOException {
638         oserver.preSplitAfterPONR(ctx);
639       }
640     });
641   }
642 
643   /**
644    * Invoked just before the rollback of a failed split is started
645    * @throws IOException
646    */
647   public void preRollBackSplit() throws IOException {
648     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
649       @Override
650       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
651           throws IOException {
652         oserver.preRollBackSplit(ctx);
653       }
654     });
655   }
656 
657   /**
658    * Invoked just after the rollback of a failed split is done
659    * @throws IOException
660    */
661   public void postRollBackSplit() throws IOException {
662     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
663       @Override
664       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
665           throws IOException {
666         oserver.postRollBackSplit(ctx);
667       }
668     });
669   }
670 
671   /**
672    * Invoked after a split is completed irrespective of a failure or success.
673    * @throws IOException
674    */
675   public void postCompleteSplit() throws IOException {
676     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
677       @Override
678       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
679           throws IOException {
680         oserver.postCompleteSplit(ctx);
681       }
682     });
683   }
684 
685   // RegionObserver support
686 
687   /**
688    * @param row the row key
689    * @param family the family
690    * @param result the result set from the region
691    * @return true if default processing should be bypassed
692    * @exception IOException Exception
693    */
694   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
695       final Result result) throws IOException {
696     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
697       @Override
698       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
699           throws IOException {
700         oserver.preGetClosestRowBefore(ctx, row, family, result);
701       }
702     });
703   }
704 
705   /**
706    * @param row the row key
707    * @param family the family
708    * @param result the result set from the region
709    * @exception IOException Exception
710    */
711   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
712       final Result result) throws IOException {
713     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
714       @Override
715       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
716           throws IOException {
717         oserver.postGetClosestRowBefore(ctx, row, family, result);
718       }
719     });
720   }
721 
722   /**
723    * @param get the Get request
724    * @return true if default processing should be bypassed
725    * @exception IOException Exception
726    */
727   public boolean preGet(final Get get, final List<Cell> results)
728       throws IOException {
729     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
730       @Override
731       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
732           throws IOException {
733         oserver.preGetOp(ctx, get, results);
734       }
735     });
736   }
737 
738   /**
739    * @param get the Get request
740    * @param results the result sett
741    * @exception IOException Exception
742    */
743   public void postGet(final Get get, final List<Cell> results)
744       throws IOException {
745     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
746       @Override
747       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
748           throws IOException {
749         oserver.postGetOp(ctx, get, results);
750       }
751     });
752   }
753 
754   /**
755    * @param get the Get request
756    * @return true or false to return to client if bypassing normal operation,
757    * or null otherwise
758    * @exception IOException Exception
759    */
760   public Boolean preExists(final Get get) throws IOException {
761     return execOperationWithResult(true, false,
762         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
763       @Override
764       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
765           throws IOException {
766         setResult(oserver.preExists(ctx, get, getResult()));
767       }
768     });
769   }
770 
771   /**
772    * @param get the Get request
773    * @param exists the result returned by the region server
774    * @return the result to return to the client
775    * @exception IOException Exception
776    */
777   public boolean postExists(final Get get, boolean exists)
778       throws IOException {
779     return execOperationWithResult(exists,
780         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
781       @Override
782       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
783           throws IOException {
784         setResult(oserver.postExists(ctx, get, getResult()));
785       }
786     });
787   }
788 
789   /**
790    * @param put The Put object
791    * @param edit The WALEdit object.
792    * @param durability The durability used
793    * @return true if default processing should be bypassed
794    * @exception IOException Exception
795    */
796   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
797       throws IOException {
798     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
799       @Override
800       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
801           throws IOException {
802         oserver.prePut(ctx, put, edit, durability);
803       }
804     });
805   }
806 
807   /**
808    * @param mutation - the current mutation
809    * @param kv - the current cell
810    * @param byteNow - current timestamp in bytes
811    * @param get - the get that could be used
812    * Note that the get only does not specify the family and qualifier that should be used
813    * @return true if default processing should be bypassed
814    * @exception IOException
815    *              Exception
816    */
817   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
818       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
819     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
820       @Override
821       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
822           throws IOException {
823         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
824       }
825     });
826   }
827 
828   /**
829    * @param put The Put object
830    * @param edit The WALEdit object.
831    * @param durability The durability used
832    * @exception IOException Exception
833    */
834   public void postPut(final Put put, final WALEdit edit, final Durability durability)
835       throws IOException {
836     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
837       @Override
838       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
839           throws IOException {
840         oserver.postPut(ctx, put, edit, durability);
841       }
842     });
843   }
844 
845   /**
846    * @param delete The Delete object
847    * @param edit The WALEdit object.
848    * @param durability The durability used
849    * @return true if default processing should be bypassed
850    * @exception IOException Exception
851    */
852   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
853       throws IOException {
854     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
855       @Override
856       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
857           throws IOException {
858         oserver.preDelete(ctx, delete, edit, durability);
859       }
860     });
861   }
862 
863   /**
864    * @param delete The Delete object
865    * @param edit The WALEdit object.
866    * @param durability The durability used
867    * @exception IOException Exception
868    */
869   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
870       throws IOException {
871     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
872       @Override
873       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
874           throws IOException {
875         oserver.postDelete(ctx, delete, edit, durability);
876       }
877     });
878   }
879 
880   /**
881    * @param miniBatchOp
882    * @return true if default processing should be bypassed
883    * @throws IOException
884    */
885   public boolean preBatchMutate(
886       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
887     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
888       @Override
889       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
890           throws IOException {
891         oserver.preBatchMutate(ctx, miniBatchOp);
892       }
893     });
894   }
895 
896   /**
897    * @param miniBatchOp
898    * @throws IOException
899    */
900   public void postBatchMutate(
901       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
902     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
903       @Override
904       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
905           throws IOException {
906         oserver.postBatchMutate(ctx, miniBatchOp);
907       }
908     });
909   }
910 
911   public void postBatchMutateIndispensably(
912       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
913       throws IOException {
914     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
915       @Override
916       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
917           throws IOException {
918         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
919       }
920     });
921   }
922 
923   /**
924    * @param row row to check
925    * @param family column family
926    * @param qualifier column qualifier
927    * @param compareOp the comparison operation
928    * @param comparator the comparator
929    * @param put data to put if check succeeds
930    * @return true or false to return to client if default processing should
931    * be bypassed, or null otherwise
932    * @throws IOException e
933    */
934   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
935       final byte [] qualifier, final CompareOp compareOp,
936       final ByteArrayComparable comparator, final Put put)
937       throws IOException {
938     return execOperationWithResult(true, false,
939         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
940       @Override
941       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
942           throws IOException {
943         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
944           compareOp, comparator, put, getResult()));
945       }
946     });
947   }
948 
949   /**
950    * @param row row to check
951    * @param family column family
952    * @param qualifier column qualifier
953    * @param compareOp the comparison operation
954    * @param comparator the comparator
955    * @param put data to put if check succeeds
956    * @return true or false to return to client if default processing should
957    * be bypassed, or null otherwise
958    * @throws IOException e
959    */
960   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
961       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
962       final Put put) throws IOException {
963     return execOperationWithResult(true, false,
964         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
965       @Override
966       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
967           throws IOException {
968         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
969           compareOp, comparator, put, getResult()));
970       }
971     });
972   }
973 
974   /**
975    * @param row row to check
976    * @param family column family
977    * @param qualifier column qualifier
978    * @param compareOp the comparison operation
979    * @param comparator the comparator
980    * @param put data to put if check succeeds
981    * @throws IOException e
982    */
983   public boolean postCheckAndPut(final byte [] row, final byte [] family,
984       final byte [] qualifier, final CompareOp compareOp,
985       final ByteArrayComparable comparator, final Put put,
986       boolean result) throws IOException {
987     return execOperationWithResult(result,
988         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
989       @Override
990       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
991           throws IOException {
992         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
993           compareOp, comparator, put, getResult()));
994       }
995     });
996   }
997 
998   /**
999    * @param row row to check
1000    * @param family column family
1001    * @param qualifier column qualifier
1002    * @param compareOp the comparison operation
1003    * @param comparator the comparator
1004    * @param delete delete to commit if check succeeds
1005    * @return true or false to return to client if default processing should
1006    * be bypassed, or null otherwise
1007    * @throws IOException e
1008    */
1009   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1010       final byte [] qualifier, final CompareOp compareOp,
1011       final ByteArrayComparable comparator, final Delete delete)
1012       throws IOException {
1013     return execOperationWithResult(true, false,
1014         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1015       @Override
1016       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1017           throws IOException {
1018         setResult(oserver.preCheckAndDelete(ctx, row, family,
1019             qualifier, compareOp, comparator, delete, getResult()));
1020       }
1021     });
1022   }
1023 
1024   /**
1025    * @param row row to check
1026    * @param family column family
1027    * @param qualifier column qualifier
1028    * @param compareOp the comparison operation
1029    * @param comparator the comparator
1030    * @param delete delete to commit if check succeeds
1031    * @return true or false to return to client if default processing should
1032    * be bypassed, or null otherwise
1033    * @throws IOException e
1034    */
1035   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1036       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1037       final Delete delete) throws IOException {
1038     return execOperationWithResult(true, false,
1039         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1040       @Override
1041       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1042           throws IOException {
1043         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1044               family, qualifier, compareOp, comparator, delete, getResult()));
1045       }
1046     });
1047   }
1048 
1049   /**
1050    * @param row row to check
1051    * @param family column family
1052    * @param qualifier column qualifier
1053    * @param compareOp the comparison operation
1054    * @param comparator the comparator
1055    * @param delete delete to commit if check succeeds
1056    * @throws IOException e
1057    */
1058   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1059       final byte [] qualifier, final CompareOp compareOp,
1060       final ByteArrayComparable comparator, final Delete delete,
1061       boolean result) throws IOException {
1062     return execOperationWithResult(result,
1063         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1064       @Override
1065       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1066           throws IOException {
1067         setResult(oserver.postCheckAndDelete(ctx, row, family,
1068             qualifier, compareOp, comparator, delete, getResult()));
1069       }
1070     });
1071   }
1072 
1073   /**
1074    * @param append append object
1075    * @return result to return to client if default operation should be
1076    * bypassed, null otherwise
1077    * @throws IOException if an error occurred on the coprocessor
1078    */
1079   public Result preAppend(final Append append) throws IOException {
1080     return execOperationWithResult(true, null,
1081         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1082       @Override
1083       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1084           throws IOException {
1085         setResult(oserver.preAppend(ctx, append));
1086       }
1087     });
1088   }
1089 
1090   /**
1091    * @param append append object
1092    * @return result to return to client if default operation should be
1093    * bypassed, null otherwise
1094    * @throws IOException if an error occurred on the coprocessor
1095    */
1096   public Result preAppendAfterRowLock(final Append append) throws IOException {
1097     return execOperationWithResult(true, null,
1098         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1099       @Override
1100       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1101           throws IOException {
1102         setResult(oserver.preAppendAfterRowLock(ctx, append));
1103       }
1104     });
1105   }
1106 
1107   /**
1108    * @param increment increment object
1109    * @return result to return to client if default operation should be
1110    * bypassed, null otherwise
1111    * @throws IOException if an error occurred on the coprocessor
1112    */
1113   public Result preIncrement(final Increment increment) throws IOException {
1114     return execOperationWithResult(true, null,
1115         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1116       @Override
1117       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1118           throws IOException {
1119         setResult(oserver.preIncrement(ctx, increment));
1120       }
1121     });
1122   }
1123 
1124   /**
1125    * @param increment increment object
1126    * @return result to return to client if default operation should be
1127    * bypassed, null otherwise
1128    * @throws IOException if an error occurred on the coprocessor
1129    */
1130   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1131     return execOperationWithResult(true, null,
1132         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1133       @Override
1134       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1135           throws IOException {
1136         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1137       }
1138     });
1139   }
1140 
1141   /**
1142    * @param append Append object
1143    * @param result the result returned by the append
1144    * @throws IOException if an error occurred on the coprocessor
1145    */
1146   public void postAppend(final Append append, final Result result) throws IOException {
1147     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1148       @Override
1149       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1150           throws IOException {
1151         oserver.postAppend(ctx, append, result);
1152       }
1153     });
1154   }
1155 
1156   /**
1157    * @param increment increment object
1158    * @param result the result returned by postIncrement
1159    * @throws IOException if an error occurred on the coprocessor
1160    */
1161   public Result postIncrement(final Increment increment, Result result) throws IOException {
1162     return execOperationWithResult(result,
1163         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1164       @Override
1165       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1166           throws IOException {
1167         setResult(oserver.postIncrement(ctx, increment, getResult()));
1168       }
1169     });
1170   }
1171 
1172   /**
1173    * @param scan the Scan specification
1174    * @return scanner id to return to client if default operation should be
1175    * bypassed, false otherwise
1176    * @exception IOException Exception
1177    */
1178   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1179     return execOperationWithResult(true, null,
1180         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1181       @Override
1182       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1183           throws IOException {
1184         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1185       }
1186     });
1187   }
1188 
1189   /**
1190    * See
1191    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1192    *    Store, Scan, NavigableSet, KeyValueScanner)}
1193    */
1194   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1195       final NavigableSet<byte[]> targetCols) throws IOException {
1196     return execOperationWithResult(null,
1197         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1198       @Override
1199       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1200           throws IOException {
1201         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1202       }
1203     });
1204   }
1205 
1206   /**
1207    * @param scan the Scan specification
1208    * @param s the scanner
1209    * @return the scanner instance to use
1210    * @exception IOException Exception
1211    */
1212   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1213     return execOperationWithResult(s,
1214         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1215       @Override
1216       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1217           throws IOException {
1218         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1219       }
1220     });
1221   }
1222 
1223   /**
1224    * @param s the scanner
1225    * @param results the result set returned by the region server
1226    * @param limit the maximum number of results to return
1227    * @return 'has next' indication to client if bypassing default behavior, or
1228    * null otherwise
1229    * @exception IOException Exception
1230    */
1231   public Boolean preScannerNext(final InternalScanner s,
1232       final List<Result> results, final int limit) throws IOException {
1233     return execOperationWithResult(true, false,
1234         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1235       @Override
1236       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1237           throws IOException {
1238         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1239       }
1240     });
1241   }
1242 
1243   /**
1244    * @param s the scanner
1245    * @param results the result set returned by the region server
1246    * @param limit the maximum number of results to return
1247    * @param hasMore
1248    * @return 'has more' indication to give to client
1249    * @exception IOException Exception
1250    */
1251   public boolean postScannerNext(final InternalScanner s,
1252       final List<Result> results, final int limit, boolean hasMore)
1253       throws IOException {
1254     return execOperationWithResult(hasMore,
1255         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1256       @Override
1257       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1258           throws IOException {
1259         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1260       }
1261     });
1262   }
1263 
1264   /**
1265    * This will be called by the scan flow when the current scanned row is being filtered out by the
1266    * filter.
1267    * @param s the scanner
1268    * @param currentRow The current rowkey which got filtered out
1269    * @param offset offset to rowkey
1270    * @param length length of rowkey
1271    * @return whether more rows are available for the scanner or not
1272    * @throws IOException
1273    */
1274   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1275       final int offset, final short length) throws IOException {
1276     return execOperationWithResult(true,
1277         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1278       @Override
1279       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1280           throws IOException {
1281         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1282       }
1283     });
1284   }
1285 
1286   /**
1287    * @param s the scanner
1288    * @return true if default behavior should be bypassed, false otherwise
1289    * @exception IOException Exception
1290    */
1291   public boolean preScannerClose(final InternalScanner s) throws IOException {
1292     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1293       @Override
1294       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1295           throws IOException {
1296         oserver.preScannerClose(ctx, s);
1297       }
1298     });
1299   }
1300 
1301   /**
1302    * @exception IOException Exception
1303    */
1304   public void postScannerClose(final InternalScanner s) throws IOException {
1305     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1306       @Override
1307       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1308           throws IOException {
1309         oserver.postScannerClose(ctx, s);
1310       }
1311     });
1312   }
1313 
1314   /**
1315    * @param info
1316    * @param logKey
1317    * @param logEdit
1318    * @return true if default behavior should be bypassed, false otherwise
1319    * @throws IOException
1320    */
1321   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1322       final WALEdit logEdit) throws IOException {
1323     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1324       @Override
1325       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1326           throws IOException {
1327         // Once we don't need to support the legacy call, replace RegionOperation with a version
1328         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1329         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1330         if (env.useLegacyPre) {
1331           if (logKey instanceof HLogKey) {
1332             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1333           } else {
1334             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1335           }
1336         } else {
1337           oserver.preWALRestore(ctx, info, logKey, logEdit);
1338         }
1339       }
1340     });
1341   }
1342 
1343   /**
1344    * @return true if default behavior should be bypassed, false otherwise
1345    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1346    */
1347   @Deprecated
1348   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1349       final WALEdit logEdit) throws IOException {
1350     return preWALRestore(info, (WALKey)logKey, logEdit);
1351   }
1352 
1353   /**
1354    * @param info
1355    * @param logKey
1356    * @param logEdit
1357    * @throws IOException
1358    */
1359   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1360       throws IOException {
1361     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1362       @Override
1363       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1364           throws IOException {
1365         // Once we don't need to support the legacy call, replace RegionOperation with a version
1366         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1367         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1368         if (env.useLegacyPost) {
1369           if (logKey instanceof HLogKey) {
1370             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1371           } else {
1372             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1373           }
1374         } else {
1375           oserver.postWALRestore(ctx, info, logKey, logEdit);
1376         }
1377       }
1378     });
1379   }
1380 
1381   /**
1382    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1383    */
1384   @Deprecated
1385   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1386       throws IOException {
1387     postWALRestore(info, (WALKey)logKey, logEdit);
1388   }
1389 
1390   /**
1391    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1392    * @return true if the default operation should be bypassed
1393    * @throws IOException
1394    */
1395   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1396     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1397       @Override
1398       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1399           throws IOException {
1400         oserver.preBulkLoadHFile(ctx, familyPaths);
1401       }
1402     });
1403   }
1404 
1405   /**
1406    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1407    * @param hasLoaded whether load was successful or not
1408    * @return the possibly modified value of hasLoaded
1409    * @throws IOException
1410    */
1411   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1412       boolean hasLoaded) throws IOException {
1413     return execOperationWithResult(hasLoaded,
1414         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1415       @Override
1416       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1417           throws IOException {
1418         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1419       }
1420     });
1421   }
1422 
1423   public void postStartRegionOperation(final Operation op) throws IOException {
1424     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1425       @Override
1426       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1427           throws IOException {
1428         oserver.postStartRegionOperation(ctx, op);
1429       }
1430     });
1431   }
1432 
1433   public void postCloseRegionOperation(final Operation op) throws IOException {
1434     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1435       @Override
1436       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1437           throws IOException {
1438         oserver.postCloseRegionOperation(ctx, op);
1439       }
1440     });
1441   }
1442 
1443   /**
1444    * @param fs fileystem to read from
1445    * @param p path to the file
1446    * @param in {@link FSDataInputStreamWrapper}
1447    * @param size Full size of the file
1448    * @param cacheConf
1449    * @param r original reference file. This will be not null only when reading a split file.
1450    * @return a Reader instance to use instead of the base reader if overriding
1451    * default behavior, null otherwise
1452    * @throws IOException
1453    */
1454   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1455       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1456       final Reference r) throws IOException {
1457     return execOperationWithResult(null,
1458         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1459       @Override
1460       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1461           throws IOException {
1462         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1463       }
1464     });
1465   }
1466 
1467   /**
1468    * @param fs fileystem to read from
1469    * @param p path to the file
1470    * @param in {@link FSDataInputStreamWrapper}
1471    * @param size Full size of the file
1472    * @param cacheConf
1473    * @param r original reference file. This will be not null only when reading a split file.
1474    * @param reader the base reader instance
1475    * @return The reader to use
1476    * @throws IOException
1477    */
1478   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1479       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1480       final Reference r, final StoreFile.Reader reader) throws IOException {
1481     return execOperationWithResult(reader,
1482         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1483       @Override
1484       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1485           throws IOException {
1486         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1487       }
1488     });
1489   }
1490 
1491   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1492       final Cell oldCell, Cell newCell) throws IOException {
1493     return execOperationWithResult(newCell,
1494         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1495       @Override
1496       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1497           throws IOException {
1498         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1499       }
1500     });
1501   }
1502 
1503   public Message preEndpointInvocation(final Service service, final String methodName,
1504       Message request) throws IOException {
1505     return execOperationWithResult(request,
1506         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1507       @Override
1508       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1509           throws IOException {
1510         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1511       }
1512     });
1513   }
1514 
1515   public void postEndpointInvocation(final Service service, final String methodName,
1516       final Message request, final Message.Builder responseBuilder) throws IOException {
1517     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1518       @Override
1519       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1520           throws IOException {
1521         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1522       }
1523     });
1524   }
1525 
1526   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1527     return execOperationWithResult(tracker,
1528         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1529       @Override
1530       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1531           throws IOException {
1532         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1533       }
1534     });
1535   }
1536 
1537   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1538     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1539     for (RegionEnvironment env : coprocessors) {
1540       DescriptiveStatistics ds = new DescriptiveStatistics();
1541       if (env.getInstance() instanceof RegionObserver) {
1542         for (Long time : env.getExecutionLatenciesNanos()) {
1543           ds.addValue(time);
1544         }
1545         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1546         if (ds.getN() == 0) {
1547           ds.addValue(0);
1548         }
1549         results.put(env.getInstance().getClass().getSimpleName(), ds);
1550       }
1551     }
1552     return results;
1553   }
1554 
1555   private static abstract class CoprocessorOperation
1556       extends ObserverContext<RegionCoprocessorEnvironment> {
1557     public abstract void call(Coprocessor observer,
1558         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1559     public abstract boolean hasCall(Coprocessor observer);
1560     public void postEnvCall(RegionEnvironment env) { }
1561   }
1562 
1563   private static abstract class RegionOperation extends CoprocessorOperation {
1564     public abstract void call(RegionObserver observer,
1565         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1566 
1567     public boolean hasCall(Coprocessor observer) {
1568       return observer instanceof RegionObserver;
1569     }
1570 
1571     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1572         throws IOException {
1573       call((RegionObserver)observer, ctx);
1574     }
1575   }
1576 
1577   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1578     private T result = null;
1579     public void setResult(final T result) { this.result = result; }
1580     public T getResult() { return this.result; }
1581   }
1582 
1583   private static abstract class EndpointOperation extends CoprocessorOperation {
1584     public abstract void call(EndpointObserver observer,
1585         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1586 
1587     public boolean hasCall(Coprocessor observer) {
1588       return observer instanceof EndpointObserver;
1589     }
1590 
1591     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1592         throws IOException {
1593       call((EndpointObserver)observer, ctx);
1594     }
1595   }
1596 
1597   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1598     private T result = null;
1599     public void setResult(final T result) { this.result = result; }
1600     public T getResult() { return this.result; }
1601   }
1602 
1603   private boolean execOperation(final CoprocessorOperation ctx)
1604       throws IOException {
1605     return execOperation(true, ctx);
1606   }
1607 
1608   private <T> T execOperationWithResult(final T defaultValue,
1609       final RegionOperationWithResult<T> ctx) throws IOException {
1610     if (ctx == null) return defaultValue;
1611     ctx.setResult(defaultValue);
1612     execOperation(true, ctx);
1613     return ctx.getResult();
1614   }
1615 
1616   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1617       final RegionOperationWithResult<T> ctx) throws IOException {
1618     boolean bypass = false;
1619     T result = defaultValue;
1620     if (ctx != null) {
1621       ctx.setResult(defaultValue);
1622       bypass = execOperation(true, ctx);
1623       result = ctx.getResult();
1624     }
1625     return bypass == ifBypass ? result : null;
1626   }
1627 
1628   private <T> T execOperationWithResult(final T defaultValue,
1629       final EndpointOperationWithResult<T> ctx) throws IOException {
1630     if (ctx == null) return defaultValue;
1631     ctx.setResult(defaultValue);
1632     execOperation(true, ctx);
1633     return ctx.getResult();
1634   }
1635 
1636   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1637       throws IOException {
1638     boolean bypass = false;
1639     for (RegionEnvironment env: coprocessors) {
1640       Coprocessor observer = env.getInstance();
1641       if (ctx.hasCall(observer)) {
1642         long startTime = System.nanoTime();
1643         ctx.prepare(env);
1644         Thread currentThread = Thread.currentThread();
1645         ClassLoader cl = currentThread.getContextClassLoader();
1646         try {
1647           currentThread.setContextClassLoader(env.getClassLoader());
1648           ctx.call(observer, ctx);
1649         } catch (Throwable e) {
1650           handleCoprocessorThrowable(env, e);
1651         } finally {
1652           currentThread.setContextClassLoader(cl);
1653         }
1654         env.offerExecutionLatency(System.nanoTime() - startTime);
1655         bypass |= ctx.shouldBypass();
1656         if (earlyExit && ctx.shouldComplete()) {
1657           break;
1658         }
1659       }
1660 
1661       ctx.postEnvCall(env);
1662     }
1663     return bypass;
1664   }
1665 }