View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.concurrent.ArrayBlockingQueue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.regex.Matcher;
34  
35  import com.google.common.collect.ImmutableList;
36  import com.google.common.collect.Lists;
37  import com.google.protobuf.Message;
38  import com.google.protobuf.Service;
39  import org.apache.commons.collections.map.AbstractReferenceMap;
40  import org.apache.commons.collections.map.ReferenceMap;
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
44  import org.apache.hadoop.classification.InterfaceAudience;
45  import org.apache.hadoop.classification.InterfaceStability;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.Cell;
50  import org.apache.hadoop.hbase.Coprocessor;
51  import org.apache.hadoop.hbase.CoprocessorEnvironment;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.client.Append;
57  import org.apache.hadoop.hbase.client.Delete;
58  import org.apache.hadoop.hbase.client.Durability;
59  import org.apache.hadoop.hbase.client.Get;
60  import org.apache.hadoop.hbase.client.Increment;
61  import org.apache.hadoop.hbase.client.Mutation;
62  import org.apache.hadoop.hbase.client.Put;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.client.Scan;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
71  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
72  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
73  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
74  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
75  import org.apache.hadoop.hbase.io.Reference;
76  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
77  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
78  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
79  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
80  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
81  import org.apache.hadoop.hbase.util.Bytes;
82  import org.apache.hadoop.hbase.util.Pair;
83  
84  /**
85   * Implements the coprocessor environment and runtime support for coprocessors
86   * loaded within a {@link HRegion}.
87   */
88  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
89  @InterfaceStability.Evolving
90  public class RegionCoprocessorHost
91      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
92  
93    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
94    // The shared data map
95    private static ReferenceMap sharedDataMap =
96        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
97  
98    /**
99     * Encapsulation of the environment of each coprocessor
100    */
101   static class RegionEnvironment extends CoprocessorHost.Environment
102       implements RegionCoprocessorEnvironment {
103 
104     private HRegion region;
105     private RegionServerServices rsServices;
106     ConcurrentMap<String, Object> sharedData;
107     private static final int LATENCY_BUFFER_SIZE = 100;
108     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
109         LATENCY_BUFFER_SIZE);
110 
111     /**
112      * Constructor
113      * @param impl the coprocessor instance
114      * @param priority chaining priority
115      */
116     public RegionEnvironment(final Coprocessor impl, final int priority,
117         final int seq, final Configuration conf, final HRegion region,
118         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
119       super(impl, priority, seq, conf);
120       this.region = region;
121       this.rsServices = services;
122       this.sharedData = sharedData;
123     }
124 
125     /** @return the region */
126     @Override
127     public HRegion getRegion() {
128       return region;
129     }
130 
131     /** @return reference to the region server services */
132     @Override
133     public RegionServerServices getRegionServerServices() {
134       return rsServices;
135     }
136 
137     public void shutdown() {
138       super.shutdown();
139     }
140 
141     @Override
142     public ConcurrentMap<String, Object> getSharedData() {
143       return sharedData;
144     }
145 
146     public void offerExecutionLatency(long latencyNanos) {
147       coprocessorTimeNanos.offer(latencyNanos);
148     }
149 
150     public Collection<Long> getExecutionLatenciesNanos() {
151       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
152       coprocessorTimeNanos.drainTo(latencies);
153       return latencies;
154     }
155 
156   }
157 
158   /** The region server services */
159   RegionServerServices rsServices;
160   /** The region */
161   HRegion region;
162 
163   /**
164    * Constructor
165    * @param region the region
166    * @param rsServices interface to available region server functionality
167    * @param conf the configuration
168    */
169   public RegionCoprocessorHost(final HRegion region,
170       final RegionServerServices rsServices, final Configuration conf) {
171     super(rsServices);
172     this.conf = conf;
173     this.rsServices = rsServices;
174     this.region = region;
175     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
176 
177     // load system default cp's from configuration.
178     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
179 
180     // load system default cp's for user tables from configuration.
181     if (!region.getRegionInfo().getTable().isSystemTable()) {
182       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
183     }
184 
185     // load Coprocessor From HDFS
186     loadTableCoprocessors(conf);
187   }
188 
189   void loadTableCoprocessors(final Configuration conf) {
190     // scan the table attributes for coprocessor load specifications
191     // initialize the coprocessors
192     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
193     for (Map.Entry<Bytes, Bytes> e :
194         region.getTableDesc().getValues().entrySet()) {
195       String key = Bytes.toString(e.getKey().get()).trim();
196       String spec = Bytes.toString(e.getValue().get()).trim();
197       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
198         // found one
199         try {
200           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
201           if (matcher.matches()) {
202             // jar file path can be empty if the cp class can be loaded
203             // from class loader.
204             Path path = matcher.group(1).trim().isEmpty() ?
205                 null : new Path(matcher.group(1).trim());
206             String className = matcher.group(2).trim();
207             int priority = matcher.group(3).trim().isEmpty() ?
208                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
209             String cfgSpec = null;
210             try {
211               cfgSpec = matcher.group(4);
212             } catch (IndexOutOfBoundsException ex) {
213               // ignore
214             }
215             Configuration ourConf;
216             if (cfgSpec != null) {
217               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
218               // do an explicit deep copy of the passed configuration
219               ourConf = new Configuration(false);
220               HBaseConfiguration.merge(ourConf, conf);
221               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
222               while (m.find()) {
223                 ourConf.set(m.group(1), m.group(2));
224               }
225             } else {
226               ourConf = conf;
227             }
228             // Load encompasses classloading and coprocessor initialization
229             try {
230               RegionEnvironment env = load(path, className, priority, ourConf);
231               configured.add(env);
232               LOG.info("Loaded coprocessor " + className + " from HTD of " +
233                 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
234             } catch (Throwable t) {
235               // Coprocessor failed to load, do we abort on error?
236               if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
237                 abortServer(className, t);
238               } else {
239                 LOG.error("Failed to load coprocessor " + className, t);
240               }
241             }
242           } else {
243             LOG.error("Malformed table coprocessor specification: key=" + key +
244               ", spec: " + spec);
245           }
246         } catch (Exception ioe) {
247           LOG.error("Malformed table coprocessor specification: key=" + key +
248             ", spec: " + spec);
249         }
250       }
251     }
252     // add together to coprocessor set for COW efficiency
253     coprocessors.addAll(configured);
254   }
255 
256   @Override
257   public RegionEnvironment createEnvironment(Class<?> implClass,
258       Coprocessor instance, int priority, int seq, Configuration conf) {
259     // Check if it's an Endpoint.
260     // Due to current dynamic protocol design, Endpoint
261     // uses a different way to be registered and executed.
262     // It uses a visitor pattern to invoke registered Endpoint
263     // method.
264     for (Class<?> c : implClass.getInterfaces()) {
265       if (CoprocessorService.class.isAssignableFrom(c)) {
266         region.registerService( ((CoprocessorService)instance).getService() );
267       }
268     }
269     ConcurrentMap<String, Object> classData;
270     // make sure only one thread can add maps
271     synchronized (sharedDataMap) {
272       // as long as at least one RegionEnvironment holds on to its classData it will
273       // remain in this map
274       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
275       if (classData == null) {
276         classData = new ConcurrentHashMap<String, Object>();
277         sharedDataMap.put(implClass.getName(), classData);
278       }
279     }
280     return new RegionEnvironment(instance, priority, seq, conf, region,
281         rsServices, classData);
282   }
283 
284   /**
285    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
286    *
287    * For example, {@link
288    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
289    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
290    *
291    * See also
292    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
293    *    CoprocessorEnvironment, Throwable)}
294    * @param env The coprocessor that threw the exception.
295    * @param e The exception that was thrown.
296    */
297   private void handleCoprocessorThrowableNoRethrow(
298       final CoprocessorEnvironment env, final Throwable e) {
299     try {
300       handleCoprocessorThrowable(env,e);
301     } catch (IOException ioe) {
302       // We cannot throw exceptions from the caller hook, so ignore.
303       LOG.warn(
304         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
305         e + ". Ignoring.",e);
306     }
307   }
308 
309   /**
310    * Invoked before a region open.
311    *
312    * @throws IOException Signals that an I/O exception has occurred.
313    */
314   public void preOpen() throws IOException {
315     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
316       @Override
317       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
318           throws IOException {
319         oserver.preOpen(ctx);
320       }
321     });
322   }
323 
324   /**
325    * Invoked after a region open
326    */
327   public void postOpen() {
328     try {
329       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
330         @Override
331         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
332             throws IOException {
333           oserver.postOpen(ctx);
334         }
335       });
336     } catch (IOException e) {
337       LOG.warn(e);
338     }
339   }
340 
341   /**
342    * Invoked after log replay on region
343    */
344   public void postLogReplay() {
345     try {
346       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
347         @Override
348         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
349             throws IOException {
350           oserver.postLogReplay(ctx);
351         }
352       });
353     } catch (IOException e) {
354       LOG.warn(e);
355     }
356   }
357 
358   /**
359    * Invoked before a region is closed
360    * @param abortRequested true if the server is aborting
361    */
362   public void preClose(final boolean abortRequested) throws IOException {
363     execOperation(false, new RegionOperation() {
364       @Override
365       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
366           throws IOException {
367         oserver.preClose(ctx, abortRequested);
368       }
369     });
370   }
371 
372   /**
373    * Invoked after a region is closed
374    * @param abortRequested true if the server is aborting
375    */
376   public void postClose(final boolean abortRequested) {
377     try {
378       execOperation(false, new RegionOperation() {
379         @Override
380         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
381             throws IOException {
382           oserver.postClose(ctx, abortRequested);
383         }
384         public void postEnvCall(RegionEnvironment env) {
385           shutdown(env);
386         }
387       });
388     } catch (IOException e) {
389       LOG.warn(e);
390     }
391   }
392 
393   /**
394    * See
395    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
396    */
397   public InternalScanner preCompactScannerOpen(final Store store,
398       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
399       final CompactionRequest request) throws IOException {
400     return execOperationWithResult(null,
401         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
402       @Override
403       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
404           throws IOException {
405         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
406           earliestPutTs, getResult(), request));
407       }
408     });
409   }
410 
411   /**
412    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
413    * available candidates.
414    * @param store The store where compaction is being requested
415    * @param candidates The currently available store files
416    * @param request custom compaction request
417    * @return If {@code true}, skip the normal selection process and use the current list
418    * @throws IOException
419    */
420   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
421       final CompactionRequest request) throws IOException {
422     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
423       @Override
424       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
425           throws IOException {
426         oserver.preCompactSelection(ctx, store, candidates, request);
427       }
428     });
429   }
430 
431   /**
432    * Called after the {@link StoreFile}s to be compacted have been selected from the available
433    * candidates.
434    * @param store The store where compaction is being requested
435    * @param selected The store files selected to compact
436    * @param request custom compaction
437    */
438   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
439       final CompactionRequest request) {
440     try {
441       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
442         @Override
443         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
444             throws IOException {
445           oserver.postCompactSelection(ctx, store, selected, request);
446         }
447       });
448     } catch (IOException e) {
449       LOG.warn(e);
450     }
451   }
452 
453   /**
454    * Called prior to rewriting the store files selected for compaction
455    * @param store the store being compacted
456    * @param scanner the scanner used to read store data during compaction
457    * @param scanType type of Scan
458    * @param request the compaction that will be executed
459    * @throws IOException
460    */
461   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
462       final ScanType scanType, final CompactionRequest request) throws IOException {
463     return execOperationWithResult(false, scanner,
464         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
465       @Override
466       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
467           throws IOException {
468         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
469       }
470     });
471   }
472 
473   /**
474    * Called after the store compaction has completed.
475    * @param store the store being compacted
476    * @param resultFile the new store file written during compaction
477    * @param request the compaction that is being executed
478    * @throws IOException
479    */
480   public void postCompact(final Store store, final StoreFile resultFile,
481       final CompactionRequest request) throws IOException {
482     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
483       @Override
484       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
485           throws IOException {
486         oserver.postCompact(ctx, store, resultFile, request);
487       }
488     });
489   }
490 
491   /**
492    * Invoked before a memstore flush
493    * @throws IOException
494    */
495   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
496       throws IOException {
497     return execOperationWithResult(false, scanner,
498         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
499       @Override
500       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
501           throws IOException {
502         setResult(oserver.preFlush(ctx, store, getResult()));
503       }
504     });
505   }
506 
507   /**
508    * Invoked before a memstore flush
509    * @throws IOException
510    */
511   public void preFlush() throws IOException {
512     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
513       @Override
514       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
515           throws IOException {
516         oserver.preFlush(ctx);
517       }
518     });
519   }
520 
521   /**
522    * See
523    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
524    *    Store, KeyValueScanner, InternalScanner)}
525    */
526   public InternalScanner preFlushScannerOpen(final Store store,
527       final KeyValueScanner memstoreScanner) throws IOException {
528     return execOperationWithResult(null,
529         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
530       @Override
531       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
532           throws IOException {
533         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
534       }
535     });
536   }
537 
538   /**
539    * Invoked after a memstore flush
540    * @throws IOException
541    */
542   public void postFlush() throws IOException {
543     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
544       @Override
545       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
546           throws IOException {
547         oserver.postFlush(ctx);
548       }
549     });
550   }
551 
552   /**
553    * Invoked after a memstore flush
554    * @throws IOException
555    */
556   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
557     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
558       @Override
559       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
560           throws IOException {
561         oserver.postFlush(ctx, store, storeFile);
562       }
563     });
564   }
565 
566   /**
567    * Invoked just before a split
568    * @throws IOException
569    */
570   public void preSplit() throws IOException {
571     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
572       @Override
573       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
574           throws IOException {
575         oserver.preSplit(ctx);
576       }
577     });
578   }
579 
580   /**
581    * Invoked just before a split
582    * @throws IOException
583    */
584   public void preSplit(final byte[] splitRow) throws IOException {
585     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
586       @Override
587       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
588           throws IOException {
589         oserver.preSplit(ctx, splitRow);
590       }
591     });
592   }
593 
594   /**
595    * Invoked just after a split
596    * @param l the new left-hand daughter region
597    * @param r the new right-hand daughter region
598    * @throws IOException
599    */
600   public void postSplit(final HRegion l, final HRegion r) throws IOException {
601     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
602       @Override
603       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
604           throws IOException {
605         oserver.postSplit(ctx, l, r);
606       }
607     });
608   }
609 
610   public boolean preSplitBeforePONR(final byte[] splitKey,
611       final List<Mutation> metaEntries) throws IOException {
612     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
613       @Override
614       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
615           throws IOException {
616         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
617       }
618     });
619   }
620 
621   public void preSplitAfterPONR() throws IOException {
622     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
623       @Override
624       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
625           throws IOException {
626         oserver.preSplitAfterPONR(ctx);
627       }
628     });
629   }
630 
631   /**
632    * Invoked just before the rollback of a failed split is started
633    * @throws IOException
634    */
635   public void preRollBackSplit() throws IOException {
636     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
637       @Override
638       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
639           throws IOException {
640         oserver.preRollBackSplit(ctx);
641       }
642     });
643   }
644 
645   /**
646    * Invoked just after the rollback of a failed split is done
647    * @throws IOException
648    */
649   public void postRollBackSplit() throws IOException {
650     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
651       @Override
652       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
653           throws IOException {
654         oserver.postRollBackSplit(ctx);
655       }
656     });
657   }
658 
659   /**
660    * Invoked after a split is completed irrespective of a failure or success.
661    * @throws IOException
662    */
663   public void postCompleteSplit() throws IOException {
664     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
665       @Override
666       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
667           throws IOException {
668         oserver.postCompleteSplit(ctx);
669       }
670     });
671   }
672 
673   // RegionObserver support
674 
675   /**
676    * @param row the row key
677    * @param family the family
678    * @param result the result set from the region
679    * @return true if default processing should be bypassed
680    * @exception IOException Exception
681    */
682   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
683       final Result result) throws IOException {
684     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
685       @Override
686       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
687           throws IOException {
688         oserver.preGetClosestRowBefore(ctx, row, family, result);
689       }
690     });
691   }
692 
693   /**
694    * @param row the row key
695    * @param family the family
696    * @param result the result set from the region
697    * @exception IOException Exception
698    */
699   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
700       final Result result) throws IOException {
701     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
702       @Override
703       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
704           throws IOException {
705         oserver.postGetClosestRowBefore(ctx, row, family, result);
706       }
707     });
708   }
709 
710   /**
711    * @param get the Get request
712    * @return true if default processing should be bypassed
713    * @exception IOException Exception
714    */
715   public boolean preGet(final Get get, final List<Cell> results)
716       throws IOException {
717     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
718       @Override
719       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
720           throws IOException {
721         oserver.preGetOp(ctx, get, results);
722       }
723     });
724   }
725 
726   /**
727    * @param get the Get request
728    * @param results the result sett
729    * @exception IOException Exception
730    */
731   public void postGet(final Get get, final List<Cell> results)
732       throws IOException {
733     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
734       @Override
735       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
736           throws IOException {
737         oserver.postGetOp(ctx, get, results);
738       }
739     });
740   }
741 
742   /**
743    * @param get the Get request
744    * @return true or false to return to client if bypassing normal operation,
745    * or null otherwise
746    * @exception IOException Exception
747    */
748   public Boolean preExists(final Get get) throws IOException {
749     return execOperationWithResult(true, false,
750         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
751       @Override
752       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
753           throws IOException {
754         setResult(oserver.preExists(ctx, get, getResult()));
755       }
756     });
757   }
758 
759   /**
760    * @param get the Get request
761    * @param exists the result returned by the region server
762    * @return the result to return to the client
763    * @exception IOException Exception
764    */
765   public boolean postExists(final Get get, boolean exists)
766       throws IOException {
767     return execOperationWithResult(exists,
768         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
769       @Override
770       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
771           throws IOException {
772         setResult(oserver.postExists(ctx, get, getResult()));
773       }
774     });
775   }
776 
777   /**
778    * @param put The Put object
779    * @param edit The WALEdit object.
780    * @param durability The durability used
781    * @return true if default processing should be bypassed
782    * @exception IOException Exception
783    */
784   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
785       throws IOException {
786     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
787       @Override
788       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
789           throws IOException {
790         oserver.prePut(ctx, put, edit, durability);
791       }
792     });
793   }
794 
795   /**
796    * @param mutation - the current mutation
797    * @param kv - the current cell
798    * @param byteNow - current timestamp in bytes
799    * @param get - the get that could be used
800    * Note that the get only does not specify the family and qualifier that should be used
801    * @return true if default processing should be bypassed
802    * @exception IOException
803    *              Exception
804    */
805   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
806       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
807     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
808       @Override
809       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
810           throws IOException {
811         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
812       }
813     });
814   }
815 
816   /**
817    * @param put The Put object
818    * @param edit The WALEdit object.
819    * @param durability The durability used
820    * @exception IOException Exception
821    */
822   public void postPut(final Put put, final WALEdit edit, final Durability durability)
823       throws IOException {
824     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
825       @Override
826       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
827           throws IOException {
828         oserver.postPut(ctx, put, edit, durability);
829       }
830     });
831   }
832 
833   /**
834    * @param delete The Delete object
835    * @param edit The WALEdit object.
836    * @param durability The durability used
837    * @return true if default processing should be bypassed
838    * @exception IOException Exception
839    */
840   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
841       throws IOException {
842     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
843       @Override
844       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
845           throws IOException {
846         oserver.preDelete(ctx, delete, edit, durability);
847       }
848     });
849   }
850 
851   /**
852    * @param delete The Delete object
853    * @param edit The WALEdit object.
854    * @param durability The durability used
855    * @exception IOException Exception
856    */
857   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
858       throws IOException {
859     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
860       @Override
861       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
862           throws IOException {
863         oserver.postDelete(ctx, delete, edit, durability);
864       }
865     });
866   }
867 
868   /**
869    * @param miniBatchOp
870    * @return true if default processing should be bypassed
871    * @throws IOException
872    */
873   public boolean preBatchMutate(
874       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
875     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
876       @Override
877       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
878           throws IOException {
879         oserver.preBatchMutate(ctx, miniBatchOp);
880       }
881     });
882   }
883 
884   /**
885    * @param miniBatchOp
886    * @throws IOException
887    */
888   public void postBatchMutate(
889       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
890     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
891       @Override
892       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
893           throws IOException {
894         oserver.postBatchMutate(ctx, miniBatchOp);
895       }
896     });
897   }
898 
899   public void postBatchMutateIndispensably(
900       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
901       throws IOException {
902     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
903       @Override
904       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
905           throws IOException {
906         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
907       }
908     });
909   }
910 
911   /**
912    * @param row row to check
913    * @param family column family
914    * @param qualifier column qualifier
915    * @param compareOp the comparison operation
916    * @param comparator the comparator
917    * @param put data to put if check succeeds
918    * @return true or false to return to client if default processing should
919    * be bypassed, or null otherwise
920    * @throws IOException e
921    */
922   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
923       final byte [] qualifier, final CompareOp compareOp,
924       final ByteArrayComparable comparator, final Put put)
925       throws IOException {
926     return execOperationWithResult(true, false,
927         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
928       @Override
929       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
930           throws IOException {
931         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
932           compareOp, comparator, put, getResult()));
933       }
934     });
935   }
936 
937   /**
938    * @param row row to check
939    * @param family column family
940    * @param qualifier column qualifier
941    * @param compareOp the comparison operation
942    * @param comparator the comparator
943    * @param put data to put if check succeeds
944    * @return true or false to return to client if default processing should
945    * be bypassed, or null otherwise
946    * @throws IOException e
947    */
948   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
949       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
950       final Put put) throws IOException {
951     return execOperationWithResult(true, false,
952         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
953       @Override
954       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
955           throws IOException {
956         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
957           compareOp, comparator, put, getResult()));
958       }
959     });
960   }
961 
962   /**
963    * @param row row to check
964    * @param family column family
965    * @param qualifier column qualifier
966    * @param compareOp the comparison operation
967    * @param comparator the comparator
968    * @param put data to put if check succeeds
969    * @throws IOException e
970    */
971   public boolean postCheckAndPut(final byte [] row, final byte [] family,
972       final byte [] qualifier, final CompareOp compareOp,
973       final ByteArrayComparable comparator, final Put put,
974       boolean result) throws IOException {
975     return execOperationWithResult(result,
976         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
977       @Override
978       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
979           throws IOException {
980         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
981           compareOp, comparator, put, getResult()));
982       }
983     });
984   }
985 
986   /**
987    * @param row row to check
988    * @param family column family
989    * @param qualifier column qualifier
990    * @param compareOp the comparison operation
991    * @param comparator the comparator
992    * @param delete delete to commit if check succeeds
993    * @return true or false to return to client if default processing should
994    * be bypassed, or null otherwise
995    * @throws IOException e
996    */
997   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
998       final byte [] qualifier, final CompareOp compareOp,
999       final ByteArrayComparable comparator, final Delete delete)
1000       throws IOException {
1001     return execOperationWithResult(true, false,
1002         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1003       @Override
1004       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1005           throws IOException {
1006         setResult(oserver.preCheckAndDelete(ctx, row, family,
1007             qualifier, compareOp, comparator, delete, getResult()));
1008       }
1009     });
1010   }
1011 
1012   /**
1013    * @param row row to check
1014    * @param family column family
1015    * @param qualifier column qualifier
1016    * @param compareOp the comparison operation
1017    * @param comparator the comparator
1018    * @param delete delete to commit if check succeeds
1019    * @return true or false to return to client if default processing should
1020    * be bypassed, or null otherwise
1021    * @throws IOException e
1022    */
1023   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1024       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1025       final Delete delete) throws IOException {
1026     return execOperationWithResult(true, false,
1027         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1028       @Override
1029       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1030           throws IOException {
1031         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1032               family, qualifier, compareOp, comparator, delete, getResult()));
1033       }
1034     });
1035   }
1036 
1037   /**
1038    * @param row row to check
1039    * @param family column family
1040    * @param qualifier column qualifier
1041    * @param compareOp the comparison operation
1042    * @param comparator the comparator
1043    * @param delete delete to commit if check succeeds
1044    * @throws IOException e
1045    */
1046   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1047       final byte [] qualifier, final CompareOp compareOp,
1048       final ByteArrayComparable comparator, final Delete delete,
1049       boolean result) throws IOException {
1050     return execOperationWithResult(result,
1051         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1052       @Override
1053       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1054           throws IOException {
1055         setResult(oserver.postCheckAndDelete(ctx, row, family,
1056             qualifier, compareOp, comparator, delete, getResult()));
1057       }
1058     });
1059   }
1060 
1061   /**
1062    * @param append append object
1063    * @return result to return to client if default operation should be
1064    * bypassed, null otherwise
1065    * @throws IOException if an error occurred on the coprocessor
1066    */
1067   public Result preAppend(final Append append) throws IOException {
1068     return execOperationWithResult(true, null,
1069         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1070       @Override
1071       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1072           throws IOException {
1073         setResult(oserver.preAppend(ctx, append));
1074       }
1075     });
1076   }
1077 
1078   /**
1079    * @param append append object
1080    * @return result to return to client if default operation should be
1081    * bypassed, null otherwise
1082    * @throws IOException if an error occurred on the coprocessor
1083    */
1084   public Result preAppendAfterRowLock(final Append append) throws IOException {
1085     return execOperationWithResult(true, null,
1086         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1087       @Override
1088       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1089           throws IOException {
1090         setResult(oserver.preAppendAfterRowLock(ctx, append));
1091       }
1092     });
1093   }
1094 
1095   /**
1096    * @param increment increment object
1097    * @return result to return to client if default operation should be
1098    * bypassed, null otherwise
1099    * @throws IOException if an error occurred on the coprocessor
1100    */
1101   public Result preIncrement(final Increment increment) throws IOException {
1102     return execOperationWithResult(true, null,
1103         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1104       @Override
1105       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1106           throws IOException {
1107         setResult(oserver.preIncrement(ctx, increment));
1108       }
1109     });
1110   }
1111 
1112   /**
1113    * @param increment increment object
1114    * @return result to return to client if default operation should be
1115    * bypassed, null otherwise
1116    * @throws IOException if an error occurred on the coprocessor
1117    */
1118   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1119     return execOperationWithResult(true, null,
1120         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1121       @Override
1122       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1123           throws IOException {
1124         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1125       }
1126     });
1127   }
1128 
1129   /**
1130    * @param append Append object
1131    * @param result the result returned by the append
1132    * @throws IOException if an error occurred on the coprocessor
1133    */
1134   public void postAppend(final Append append, final Result result) throws IOException {
1135     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1136       @Override
1137       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1138           throws IOException {
1139         oserver.postAppend(ctx, append, result);
1140       }
1141     });
1142   }
1143 
1144   /**
1145    * @param increment increment object
1146    * @param result the result returned by postIncrement
1147    * @throws IOException if an error occurred on the coprocessor
1148    */
1149   public Result postIncrement(final Increment increment, Result result) throws IOException {
1150     return execOperationWithResult(result,
1151         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1152       @Override
1153       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1154           throws IOException {
1155         setResult(oserver.postIncrement(ctx, increment, getResult()));
1156       }
1157     });
1158   }
1159 
1160   /**
1161    * @param scan the Scan specification
1162    * @return scanner id to return to client if default operation should be
1163    * bypassed, false otherwise
1164    * @exception IOException Exception
1165    */
1166   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1167     return execOperationWithResult(true, null,
1168         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1169       @Override
1170       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1171           throws IOException {
1172         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1173       }
1174     });
1175   }
1176 
1177   /**
1178    * See
1179    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1180    *    Store, Scan, NavigableSet, KeyValueScanner)}
1181    */
1182   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1183       final NavigableSet<byte[]> targetCols) throws IOException {
1184     return execOperationWithResult(null,
1185         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1186       @Override
1187       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1188           throws IOException {
1189         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1190       }
1191     });
1192   }
1193 
1194   /**
1195    * @param scan the Scan specification
1196    * @param s the scanner
1197    * @return the scanner instance to use
1198    * @exception IOException Exception
1199    */
1200   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1201     return execOperationWithResult(s,
1202         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1203       @Override
1204       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1205           throws IOException {
1206         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1207       }
1208     });
1209   }
1210 
1211   /**
1212    * @param s the scanner
1213    * @param results the result set returned by the region server
1214    * @param limit the maximum number of results to return
1215    * @return 'has next' indication to client if bypassing default behavior, or
1216    * null otherwise
1217    * @exception IOException Exception
1218    */
1219   public Boolean preScannerNext(final InternalScanner s,
1220       final List<Result> results, final int limit) throws IOException {
1221     return execOperationWithResult(true, false,
1222         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1223       @Override
1224       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1225           throws IOException {
1226         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1227       }
1228     });
1229   }
1230 
1231   /**
1232    * @param s the scanner
1233    * @param results the result set returned by the region server
1234    * @param limit the maximum number of results to return
1235    * @param hasMore
1236    * @return 'has more' indication to give to client
1237    * @exception IOException Exception
1238    */
1239   public boolean postScannerNext(final InternalScanner s,
1240       final List<Result> results, final int limit, boolean hasMore)
1241       throws IOException {
1242     return execOperationWithResult(hasMore,
1243         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1244       @Override
1245       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1246           throws IOException {
1247         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1248       }
1249     });
1250   }
1251 
1252   /**
1253    * This will be called by the scan flow when the current scanned row is being filtered out by the
1254    * filter.
1255    * @param s the scanner
1256    * @param currentRow The current rowkey which got filtered out
1257    * @param offset offset to rowkey
1258    * @param length length of rowkey
1259    * @return whether more rows are available for the scanner or not
1260    * @throws IOException
1261    */
1262   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1263       final int offset, final short length) throws IOException {
1264     return execOperationWithResult(true,
1265         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1266       @Override
1267       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1268           throws IOException {
1269         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1270       }
1271     });
1272   }
1273 
1274   /**
1275    * @param s the scanner
1276    * @return true if default behavior should be bypassed, false otherwise
1277    * @exception IOException Exception
1278    */
1279   public boolean preScannerClose(final InternalScanner s) throws IOException {
1280     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1281       @Override
1282       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1283           throws IOException {
1284         oserver.preScannerClose(ctx, s);
1285       }
1286     });
1287   }
1288 
1289   /**
1290    * @exception IOException Exception
1291    */
1292   public void postScannerClose(final InternalScanner s) throws IOException {
1293     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1294       @Override
1295       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1296           throws IOException {
1297         oserver.postScannerClose(ctx, s);
1298       }
1299     });
1300   }
1301 
1302   /**
1303    * @param info
1304    * @param logKey
1305    * @param logEdit
1306    * @return true if default behavior should be bypassed, false otherwise
1307    * @throws IOException
1308    */
1309   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1310       final WALEdit logEdit) throws IOException {
1311     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1312       @Override
1313       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1314           throws IOException {
1315         oserver.preWALRestore(ctx, info, logKey, logEdit);
1316       }
1317     });
1318   }
1319 
1320   /**
1321    * @param info
1322    * @param logKey
1323    * @param logEdit
1324    * @throws IOException
1325    */
1326   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1327       throws IOException {
1328     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1329       @Override
1330       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1331           throws IOException {
1332         oserver.postWALRestore(ctx, info, logKey, logEdit);
1333       }
1334     });
1335   }
1336 
1337   /**
1338    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1339    * @return true if the default operation should be bypassed
1340    * @throws IOException
1341    */
1342   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1343     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1344       @Override
1345       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1346           throws IOException {
1347         oserver.preBulkLoadHFile(ctx, familyPaths);
1348       }
1349     });
1350   }
1351 
1352   /**
1353    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1354    * @param hasLoaded whether load was successful or not
1355    * @return the possibly modified value of hasLoaded
1356    * @throws IOException
1357    */
1358   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1359       boolean hasLoaded) throws IOException {
1360     return execOperationWithResult(hasLoaded,
1361         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1362       @Override
1363       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1364           throws IOException {
1365         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1366       }
1367     });
1368   }
1369 
1370   public void postStartRegionOperation(final Operation op) throws IOException {
1371     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1372       @Override
1373       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1374           throws IOException {
1375         oserver.postStartRegionOperation(ctx, op);
1376       }
1377     });
1378   }
1379 
1380   public void postCloseRegionOperation(final Operation op) throws IOException {
1381     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1382       @Override
1383       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1384           throws IOException {
1385         oserver.postCloseRegionOperation(ctx, op);
1386       }
1387     });
1388   }
1389 
1390   /**
1391    * @param fs fileystem to read from
1392    * @param p path to the file
1393    * @param in {@link FSDataInputStreamWrapper}
1394    * @param size Full size of the file
1395    * @param cacheConf
1396    * @param r original reference file. This will be not null only when reading a split file.
1397    * @return a Reader instance to use instead of the base reader if overriding
1398    * default behavior, null otherwise
1399    * @throws IOException
1400    */
1401   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1402       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1403       final Reference r) throws IOException {
1404     return execOperationWithResult(null,
1405         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1406       @Override
1407       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1408           throws IOException {
1409         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1410       }
1411     });
1412   }
1413 
1414   /**
1415    * @param fs fileystem to read from
1416    * @param p path to the file
1417    * @param in {@link FSDataInputStreamWrapper}
1418    * @param size Full size of the file
1419    * @param cacheConf
1420    * @param r original reference file. This will be not null only when reading a split file.
1421    * @param reader the base reader instance
1422    * @return The reader to use
1423    * @throws IOException
1424    */
1425   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1426       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1427       final Reference r, final StoreFile.Reader reader) throws IOException {
1428     return execOperationWithResult(reader,
1429         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1430       @Override
1431       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1432           throws IOException {
1433         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1434       }
1435     });
1436   }
1437 
1438   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1439       final Cell oldCell, Cell newCell) throws IOException {
1440     return execOperationWithResult(newCell,
1441         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1442       @Override
1443       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1444           throws IOException {
1445         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1446       }
1447     });
1448   }
1449 
1450   public Message preEndpointInvocation(final Service service, final String methodName,
1451       Message request) throws IOException {
1452     return execOperationWithResult(request,
1453         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1454       @Override
1455       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1456           throws IOException {
1457         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1458       }
1459     });
1460   }
1461 
1462   public void postEndpointInvocation(final Service service, final String methodName,
1463       final Message request, final Message.Builder responseBuilder) throws IOException {
1464     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1465       @Override
1466       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1467           throws IOException {
1468         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1469       }
1470     });
1471   }
1472 
1473   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1474     return execOperationWithResult(tracker,
1475         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1476       @Override
1477       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1478           throws IOException {
1479         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1480       }
1481     });
1482   }
1483 
1484   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1485     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1486     for (RegionEnvironment env : coprocessors) {
1487       DescriptiveStatistics ds = new DescriptiveStatistics();
1488       if (env.getInstance() instanceof RegionObserver) {
1489         for (Long time : env.getExecutionLatenciesNanos()) {
1490           ds.addValue(time);
1491         }
1492         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1493         if (ds.getN() == 0) {
1494           ds.addValue(0);
1495         }
1496         results.put(env.getInstance().getClass().getSimpleName(), ds);
1497       }
1498     }
1499     return results;
1500   }
1501 
1502   private static abstract class CoprocessorOperation
1503       extends ObserverContext<RegionCoprocessorEnvironment> {
1504     public abstract void call(Coprocessor observer,
1505         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1506     public abstract boolean hasCall(Coprocessor observer);
1507     public void postEnvCall(RegionEnvironment env) { }
1508   }
1509 
1510   private static abstract class RegionOperation extends CoprocessorOperation {
1511     public abstract void call(RegionObserver observer,
1512         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1513 
1514     public boolean hasCall(Coprocessor observer) {
1515       return observer instanceof RegionObserver;
1516     }
1517 
1518     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1519         throws IOException {
1520       call((RegionObserver)observer, ctx);
1521     }
1522   }
1523 
1524   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1525     private T result = null;
1526     public void setResult(final T result) { this.result = result; }
1527     public T getResult() { return this.result; }
1528   }
1529 
1530   private static abstract class EndpointOperation extends CoprocessorOperation {
1531     public abstract void call(EndpointObserver observer,
1532         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1533 
1534     public boolean hasCall(Coprocessor observer) {
1535       return observer instanceof EndpointObserver;
1536     }
1537 
1538     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1539         throws IOException {
1540       call((EndpointObserver)observer, ctx);
1541     }
1542   }
1543 
1544   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1545     private T result = null;
1546     public void setResult(final T result) { this.result = result; }
1547     public T getResult() { return this.result; }
1548   }
1549 
1550   private boolean execOperation(final CoprocessorOperation ctx)
1551       throws IOException {
1552     return execOperation(true, ctx);
1553   }
1554 
1555   private <T> T execOperationWithResult(final T defaultValue,
1556       final RegionOperationWithResult<T> ctx) throws IOException {
1557     if (ctx == null) return defaultValue;
1558     ctx.setResult(defaultValue);
1559     execOperation(true, ctx);
1560     return ctx.getResult();
1561   }
1562 
1563   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1564       final RegionOperationWithResult<T> ctx) throws IOException {
1565     boolean bypass = false;
1566     T result = defaultValue;
1567     if (ctx != null) {
1568       ctx.setResult(defaultValue);
1569       bypass = execOperation(true, ctx);
1570       result = ctx.getResult();
1571     }
1572     return bypass == ifBypass ? result : null;
1573   }
1574 
1575   private <T> T execOperationWithResult(final T defaultValue,
1576       final EndpointOperationWithResult<T> ctx) throws IOException {
1577     if (ctx == null) return defaultValue;
1578     ctx.setResult(defaultValue);
1579     execOperation(true, ctx);
1580     return ctx.getResult();
1581   }
1582 
1583   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1584       throws IOException {
1585     boolean bypass = false;
1586     for (RegionEnvironment env: coprocessors) {
1587       Coprocessor observer = env.getInstance();
1588       if (ctx.hasCall(observer)) {
1589         long startTime = System.nanoTime();
1590         ctx.prepare(env);
1591         Thread currentThread = Thread.currentThread();
1592         ClassLoader cl = currentThread.getContextClassLoader();
1593         try {
1594           currentThread.setContextClassLoader(env.getClassLoader());
1595           ctx.call(observer, ctx);
1596         } catch (Throwable e) {
1597           handleCoprocessorThrowable(env, e);
1598         } finally {
1599           currentThread.setContextClassLoader(cl);
1600         }
1601         env.offerExecutionLatency(System.nanoTime() - startTime);
1602         bypass |= ctx.shouldBypass();
1603         if (earlyExit && ctx.shouldComplete()) {
1604           break;
1605         }
1606       }
1607 
1608       ctx.postEnvCall(env);
1609     }
1610     return bypass;
1611   }
1612 }