View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NavigableSet;
29  import java.util.concurrent.ArrayBlockingQueue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.regex.Matcher;
34  
35  import org.apache.commons.collections.map.AbstractReferenceMap;
36  import org.apache.commons.collections.map.ReferenceMap;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.FileSystem;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.Cell;
44  import org.apache.hadoop.hbase.Coprocessor;
45  import org.apache.hadoop.hbase.CoprocessorEnvironment;
46  import org.apache.hadoop.hbase.HBaseConfiguration;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.client.Append;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.Durability;
52  import org.apache.hadoop.hbase.client.Get;
53  import org.apache.hadoop.hbase.client.Increment;
54  import org.apache.hadoop.hbase.client.Mutation;
55  import org.apache.hadoop.hbase.client.Put;
56  import org.apache.hadoop.hbase.client.Result;
57  import org.apache.hadoop.hbase.client.Scan;
58  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
59  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
60  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
61  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
62  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
63  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
64  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
65  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
66  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
67  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
68  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
69  import org.apache.hadoop.hbase.io.Reference;
70  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
71  import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
72  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
73  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
74  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.Pair;
77  
78  import com.google.common.collect.ImmutableList;
79  import com.google.common.collect.Lists;
80  import com.google.protobuf.Message;
81  import com.google.protobuf.Service;
82  
83  /**
84   * Implements the coprocessor environment and runtime support for coprocessors
85   * loaded within a {@link HRegion}.
86   */
87  public class RegionCoprocessorHost
88      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
89  
90    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
91    // The shared data map
92    private static ReferenceMap sharedDataMap =
93        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
94  
95    /**
96     * Encapsulation of the environment of each coprocessor
97     */
98    static class RegionEnvironment extends CoprocessorHost.Environment
99        implements RegionCoprocessorEnvironment {
100 
101     private HRegion region;
102     private RegionServerServices rsServices;
103     ConcurrentMap<String, Object> sharedData;
104     private static final int LATENCY_BUFFER_SIZE = 100;
105     private final BlockingQueue<Long> coprocessorTimeNanos = new ArrayBlockingQueue<Long>(
106         LATENCY_BUFFER_SIZE);
107 
108     /**
109      * Constructor
110      * @param impl the coprocessor instance
111      * @param priority chaining priority
112      */
113     public RegionEnvironment(final Coprocessor impl, final int priority,
114         final int seq, final Configuration conf, final HRegion region,
115         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
116       super(impl, priority, seq, conf);
117       this.region = region;
118       this.rsServices = services;
119       this.sharedData = sharedData;
120     }
121 
122     /** @return the region */
123     @Override
124     public HRegion getRegion() {
125       return region;
126     }
127 
128     /** @return reference to the region server services */
129     @Override
130     public RegionServerServices getRegionServerServices() {
131       return rsServices;
132     }
133 
134     public void shutdown() {
135       super.shutdown();
136     }
137 
138     @Override
139     public ConcurrentMap<String, Object> getSharedData() {
140       return sharedData;
141     }
142 
143     public void offerExecutionLatency(long latencyNanos) {
144       coprocessorTimeNanos.offer(latencyNanos);
145     }
146 
147     public Collection<Long> getExecutionLatenciesNanos() {
148       final List<Long> latencies = Lists.newArrayListWithCapacity(coprocessorTimeNanos.size());
149       coprocessorTimeNanos.drainTo(latencies);
150       return latencies;
151     }
152 
153   }
154 
155   /** The region server services */
156   RegionServerServices rsServices;
157   /** The region */
158   HRegion region;
159 
160   /**
161    * Constructor
162    * @param region the region
163    * @param rsServices interface to available region server functionality
164    * @param conf the configuration
165    */
166   public RegionCoprocessorHost(final HRegion region,
167       final RegionServerServices rsServices, final Configuration conf) {
168     super(rsServices);
169     this.conf = conf;
170     this.rsServices = rsServices;
171     this.region = region;
172     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
173 
174     // load system default cp's from configuration.
175     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
176 
177     // load system default cp's for user tables from configuration.
178     if (!region.getRegionInfo().getTable().isSystemTable()) {
179       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
180     }
181 
182     // load Coprocessor From HDFS
183     loadTableCoprocessors(conf);
184   }
185 
186   void loadTableCoprocessors(final Configuration conf) {
187     // scan the table attributes for coprocessor load specifications
188     // initialize the coprocessors
189     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
190     for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
191         region.getTableDesc().getValues().entrySet()) {
192       String key = Bytes.toString(e.getKey().get()).trim();
193       String spec = Bytes.toString(e.getValue().get()).trim();
194       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
195         // found one
196         try {
197           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
198           if (matcher.matches()) {
199             // jar file path can be empty if the cp class can be loaded
200             // from class loader.
201             Path path = matcher.group(1).trim().isEmpty() ?
202                 null : new Path(matcher.group(1).trim());
203             String className = matcher.group(2).trim();
204             int priority = matcher.group(3).trim().isEmpty() ?
205                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
206             String cfgSpec = null;
207             try {
208               cfgSpec = matcher.group(4);
209             } catch (IndexOutOfBoundsException ex) {
210               // ignore
211             }
212             Configuration ourConf;
213             if (cfgSpec != null) {
214               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
215               // do an explicit deep copy of the passed configuration
216               ourConf = new Configuration(false);
217               HBaseConfiguration.merge(ourConf, conf);
218               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
219               while (m.find()) {
220                 ourConf.set(m.group(1), m.group(2));
221               }
222             } else {
223               ourConf = conf;
224             }
225             // Load encompasses classloading and coprocessor initialization
226             try {
227               RegionEnvironment env = load(path, className, priority, ourConf);
228               configured.add(env);
229               LOG.info("Loaded coprocessor " + className + " from HTD of " +
230                 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
231             } catch (Throwable t) {
232               // Coprocessor failed to load, do we abort on error?
233               if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
234                 abortServer(className, t);
235               } else {
236                 LOG.error("Failed to load coprocessor " + className, t);
237               }
238             }
239           } else {
240             LOG.error("Malformed table coprocessor specification: key=" + key +
241               ", spec: " + spec);
242           }
243         } catch (Exception ioe) {
244           LOG.error("Malformed table coprocessor specification: key=" + key +
245             ", spec: " + spec);
246         }
247       }
248     }
249     // add together to coprocessor set for COW efficiency
250     coprocessors.addAll(configured);
251   }
252 
253   @Override
254   public RegionEnvironment createEnvironment(Class<?> implClass,
255       Coprocessor instance, int priority, int seq, Configuration conf) {
256     // Check if it's an Endpoint.
257     // Due to current dynamic protocol design, Endpoint
258     // uses a different way to be registered and executed.
259     // It uses a visitor pattern to invoke registered Endpoint
260     // method.
261     for (Class<?> c : implClass.getInterfaces()) {
262       if (CoprocessorService.class.isAssignableFrom(c)) {
263         region.registerService( ((CoprocessorService)instance).getService() );
264       }
265     }
266     ConcurrentMap<String, Object> classData;
267     // make sure only one thread can add maps
268     synchronized (sharedDataMap) {
269       // as long as at least one RegionEnvironment holds on to its classData it will
270       // remain in this map
271       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
272       if (classData == null) {
273         classData = new ConcurrentHashMap<String, Object>();
274         sharedDataMap.put(implClass.getName(), classData);
275       }
276     }
277     return new RegionEnvironment(instance, priority, seq, conf, region,
278         rsServices, classData);
279   }
280 
281   /**
282    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
283    *
284    * For example, {@link
285    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
286    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
287    *
288    * See also
289    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
290    *    CoprocessorEnvironment, Throwable)}
291    * @param env The coprocessor that threw the exception.
292    * @param e The exception that was thrown.
293    */
294   private void handleCoprocessorThrowableNoRethrow(
295       final CoprocessorEnvironment env, final Throwable e) {
296     try {
297       handleCoprocessorThrowable(env,e);
298     } catch (IOException ioe) {
299       // We cannot throw exceptions from the caller hook, so ignore.
300       LOG.warn(
301         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
302         e + ". Ignoring.",e);
303     }
304   }
305 
306   /**
307    * Invoked before a region open.
308    *
309    * @throws IOException Signals that an I/O exception has occurred.
310    */
311   public void preOpen() throws IOException {
312     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
313       @Override
314       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
315           throws IOException {
316         oserver.preOpen(ctx);
317       }
318     });
319   }
320 
321   /**
322    * Invoked after a region open
323    */
324   public void postOpen() {
325     try {
326       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
327         @Override
328         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
329             throws IOException {
330           oserver.postOpen(ctx);
331         }
332       });
333     } catch (IOException e) {
334       LOG.warn(e);
335     }
336   }
337 
338   /**
339    * Invoked after log replay on region
340    */
341   public void postLogReplay() {
342     try {
343       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
344         @Override
345         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
346             throws IOException {
347           oserver.postLogReplay(ctx);
348         }
349       });
350     } catch (IOException e) {
351       LOG.warn(e);
352     }
353   }
354 
355   /**
356    * Invoked before a region is closed
357    * @param abortRequested true if the server is aborting
358    */
359   public void preClose(final boolean abortRequested) throws IOException {
360     execOperation(false, new RegionOperation() {
361       @Override
362       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
363           throws IOException {
364         oserver.preClose(ctx, abortRequested);
365       }
366     });
367   }
368 
369   /**
370    * Invoked after a region is closed
371    * @param abortRequested true if the server is aborting
372    */
373   public void postClose(final boolean abortRequested) {
374     try {
375       execOperation(false, new RegionOperation() {
376         @Override
377         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
378             throws IOException {
379           oserver.postClose(ctx, abortRequested);
380         }
381         public void postEnvCall(RegionEnvironment env) {
382           shutdown(env);
383         }
384       });
385     } catch (IOException e) {
386       LOG.warn(e);
387     }
388   }
389 
390   /**
391    * See
392    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
393    */
394   public InternalScanner preCompactScannerOpen(final Store store,
395       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
396       final CompactionRequest request) throws IOException {
397     return execOperationWithResult(null,
398         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
399       @Override
400       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
401           throws IOException {
402         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
403           earliestPutTs, getResult(), request));
404       }
405     });
406   }
407 
408   /**
409    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
410    * available candidates.
411    * @param store The store where compaction is being requested
412    * @param candidates The currently available store files
413    * @param request custom compaction request
414    * @return If {@code true}, skip the normal selection process and use the current list
415    * @throws IOException
416    */
417   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
418       final CompactionRequest request) throws IOException {
419     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
420       @Override
421       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
422           throws IOException {
423         oserver.preCompactSelection(ctx, store, candidates, request);
424       }
425     });
426   }
427 
428   /**
429    * Called after the {@link StoreFile}s to be compacted have been selected from the available
430    * candidates.
431    * @param store The store where compaction is being requested
432    * @param selected The store files selected to compact
433    * @param request custom compaction
434    */
435   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
436       final CompactionRequest request) {
437     try {
438       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
439         @Override
440         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
441             throws IOException {
442           oserver.postCompactSelection(ctx, store, selected, request);
443         }
444       });
445     } catch (IOException e) {
446       LOG.warn(e);
447     }
448   }
449 
450   /**
451    * Called prior to rewriting the store files selected for compaction
452    * @param store the store being compacted
453    * @param scanner the scanner used to read store data during compaction
454    * @param scanType type of Scan
455    * @param request the compaction that will be executed
456    * @throws IOException
457    */
458   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
459       final ScanType scanType, final CompactionRequest request) throws IOException {
460     return execOperationWithResult(false, scanner,
461         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
462       @Override
463       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
464           throws IOException {
465         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
466       }
467     });
468   }
469 
470   /**
471    * Called after the store compaction has completed.
472    * @param store the store being compacted
473    * @param resultFile the new store file written during compaction
474    * @param request the compaction that is being executed
475    * @throws IOException
476    */
477   public void postCompact(final Store store, final StoreFile resultFile,
478       final CompactionRequest request) throws IOException {
479     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
480       @Override
481       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
482           throws IOException {
483         oserver.postCompact(ctx, store, resultFile, request);
484       }
485     });
486   }
487 
488   /**
489    * Invoked before a memstore flush
490    * @throws IOException
491    */
492   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
493       throws IOException {
494     return execOperationWithResult(false, scanner,
495         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
496       @Override
497       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
498           throws IOException {
499         setResult(oserver.preFlush(ctx, store, getResult()));
500       }
501     });
502   }
503 
504   /**
505    * Invoked before a memstore flush
506    * @throws IOException
507    */
508   public void preFlush() throws IOException {
509     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
510       @Override
511       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
512           throws IOException {
513         oserver.preFlush(ctx);
514       }
515     });
516   }
517 
518   /**
519    * See
520    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
521    *    Store, KeyValueScanner, InternalScanner)}
522    */
523   public InternalScanner preFlushScannerOpen(final Store store,
524       final KeyValueScanner memstoreScanner) throws IOException {
525     return execOperationWithResult(null,
526         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
527       @Override
528       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
529           throws IOException {
530         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
531       }
532     });
533   }
534 
535   /**
536    * Invoked after a memstore flush
537    * @throws IOException
538    */
539   public void postFlush() throws IOException {
540     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
541       @Override
542       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
543           throws IOException {
544         oserver.postFlush(ctx);
545       }
546     });
547   }
548 
549   /**
550    * Invoked after a memstore flush
551    * @throws IOException
552    */
553   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
554     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
555       @Override
556       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
557           throws IOException {
558         oserver.postFlush(ctx, store, storeFile);
559       }
560     });
561   }
562 
563   /**
564    * Invoked just before a split
565    * @throws IOException
566    */
567   public void preSplit() throws IOException {
568     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
569       @Override
570       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
571           throws IOException {
572         oserver.preSplit(ctx);
573       }
574     });
575   }
576 
577   /**
578    * Invoked just before a split
579    * @throws IOException
580    */
581   public void preSplit(final byte[] splitRow) throws IOException {
582     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
583       @Override
584       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
585           throws IOException {
586         oserver.preSplit(ctx, splitRow);
587       }
588     });
589   }
590 
591   /**
592    * Invoked just after a split
593    * @param l the new left-hand daughter region
594    * @param r the new right-hand daughter region
595    * @throws IOException
596    */
597   public void postSplit(final HRegion l, final HRegion r) throws IOException {
598     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
599       @Override
600       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
601           throws IOException {
602         oserver.postSplit(ctx, l, r);
603       }
604     });
605   }
606 
607   public boolean preSplitBeforePONR(final byte[] splitKey,
608       final List<Mutation> metaEntries) throws IOException {
609     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
610       @Override
611       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
612           throws IOException {
613         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
614       }
615     });
616   }
617 
618   public void preSplitAfterPONR() throws IOException {
619     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
620       @Override
621       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
622           throws IOException {
623         oserver.preSplitAfterPONR(ctx);
624       }
625     });
626   }
627 
628   /**
629    * Invoked just before the rollback of a failed split is started
630    * @throws IOException
631    */
632   public void preRollBackSplit() throws IOException {
633     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
634       @Override
635       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
636           throws IOException {
637         oserver.preRollBackSplit(ctx);
638       }
639     });
640   }
641 
642   /**
643    * Invoked just after the rollback of a failed split is done
644    * @throws IOException
645    */
646   public void postRollBackSplit() throws IOException {
647     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
648       @Override
649       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
650           throws IOException {
651         oserver.postRollBackSplit(ctx);
652       }
653     });
654   }
655 
656   /**
657    * Invoked after a split is completed irrespective of a failure or success.
658    * @throws IOException
659    */
660   public void postCompleteSplit() throws IOException {
661     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
662       @Override
663       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
664           throws IOException {
665         oserver.postCompleteSplit(ctx);
666       }
667     });
668   }
669 
670   // RegionObserver support
671 
672   /**
673    * @param row the row key
674    * @param family the family
675    * @param result the result set from the region
676    * @return true if default processing should be bypassed
677    * @exception IOException Exception
678    */
679   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
680       final Result result) throws IOException {
681     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
682       @Override
683       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
684           throws IOException {
685         oserver.preGetClosestRowBefore(ctx, row, family, result);
686       }
687     });
688   }
689 
690   /**
691    * @param row the row key
692    * @param family the family
693    * @param result the result set from the region
694    * @exception IOException Exception
695    */
696   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
697       final Result result) throws IOException {
698     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
699       @Override
700       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
701           throws IOException {
702         oserver.postGetClosestRowBefore(ctx, row, family, result);
703       }
704     });
705   }
706 
707   /**
708    * @param get the Get request
709    * @return true if default processing should be bypassed
710    * @exception IOException Exception
711    */
712   public boolean preGet(final Get get, final List<Cell> results)
713       throws IOException {
714     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
715       @Override
716       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
717           throws IOException {
718         oserver.preGetOp(ctx, get, results);
719       }
720     });
721   }
722 
723   /**
724    * @param get the Get request
725    * @param results the result sett
726    * @exception IOException Exception
727    */
728   public void postGet(final Get get, final List<Cell> results)
729       throws IOException {
730     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
731       @Override
732       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
733           throws IOException {
734         oserver.postGetOp(ctx, get, results);
735       }
736     });
737   }
738 
739   /**
740    * @param get the Get request
741    * @return true or false to return to client if bypassing normal operation,
742    * or null otherwise
743    * @exception IOException Exception
744    */
745   public Boolean preExists(final Get get) throws IOException {
746     return execOperationWithResult(true, false,
747         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
748       @Override
749       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
750           throws IOException {
751         setResult(oserver.preExists(ctx, get, getResult()));
752       }
753     });
754   }
755 
756   /**
757    * @param get the Get request
758    * @param exists the result returned by the region server
759    * @return the result to return to the client
760    * @exception IOException Exception
761    */
762   public boolean postExists(final Get get, boolean exists)
763       throws IOException {
764     return execOperationWithResult(exists,
765         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
766       @Override
767       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
768           throws IOException {
769         setResult(oserver.postExists(ctx, get, getResult()));
770       }
771     });
772   }
773 
774   /**
775    * @param put The Put object
776    * @param edit The WALEdit object.
777    * @param durability The durability used
778    * @return true if default processing should be bypassed
779    * @exception IOException Exception
780    */
781   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
782       throws IOException {
783     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
784       @Override
785       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
786           throws IOException {
787         oserver.prePut(ctx, put, edit, durability);
788       }
789     });
790   }
791 
792   /**
793    * @param mutation - the current mutation
794    * @param kv - the current cell
795    * @param byteNow - current timestamp in bytes
796    * @param get - the get that could be used
797    * Note that the get only does not specify the family and qualifier that should be used
798    * @return true if default processing should be bypassed
799    * @exception IOException
800    *              Exception
801    */
802   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
803       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
804     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
805       @Override
806       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
807           throws IOException {
808         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
809       }
810     });
811   }
812 
813   /**
814    * @param put The Put object
815    * @param edit The WALEdit object.
816    * @param durability The durability used
817    * @exception IOException Exception
818    */
819   public void postPut(final Put put, final WALEdit edit, final Durability durability)
820       throws IOException {
821     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
822       @Override
823       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
824           throws IOException {
825         oserver.postPut(ctx, put, edit, durability);
826       }
827     });
828   }
829 
830   /**
831    * @param delete The Delete object
832    * @param edit The WALEdit object.
833    * @param durability The durability used
834    * @return true if default processing should be bypassed
835    * @exception IOException Exception
836    */
837   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
838       throws IOException {
839     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
840       @Override
841       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
842           throws IOException {
843         oserver.preDelete(ctx, delete, edit, durability);
844       }
845     });
846   }
847 
848   /**
849    * @param delete The Delete object
850    * @param edit The WALEdit object.
851    * @param durability The durability used
852    * @exception IOException Exception
853    */
854   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
855       throws IOException {
856     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
857       @Override
858       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
859           throws IOException {
860         oserver.postDelete(ctx, delete, edit, durability);
861       }
862     });
863   }
864 
865   /**
866    * @param miniBatchOp
867    * @return true if default processing should be bypassed
868    * @throws IOException
869    */
870   public boolean preBatchMutate(
871       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
872     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
873       @Override
874       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
875           throws IOException {
876         oserver.preBatchMutate(ctx, miniBatchOp);
877       }
878     });
879   }
880 
881   /**
882    * @param miniBatchOp
883    * @throws IOException
884    */
885   public void postBatchMutate(
886       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
887     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
888       @Override
889       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
890           throws IOException {
891         oserver.postBatchMutate(ctx, miniBatchOp);
892       }
893     });
894   }
895 
896   public void postBatchMutateIndispensably(
897       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
898       throws IOException {
899     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
900       @Override
901       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
902           throws IOException {
903         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
904       }
905     });
906   }
907 
908   /**
909    * @param row row to check
910    * @param family column family
911    * @param qualifier column qualifier
912    * @param compareOp the comparison operation
913    * @param comparator the comparator
914    * @param put data to put if check succeeds
915    * @return true or false to return to client if default processing should
916    * be bypassed, or null otherwise
917    * @throws IOException e
918    */
919   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
920       final byte [] qualifier, final CompareOp compareOp,
921       final ByteArrayComparable comparator, final Put put)
922       throws IOException {
923     return execOperationWithResult(true, false,
924         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
925       @Override
926       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
927           throws IOException {
928         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
929           compareOp, comparator, put, getResult()));
930       }
931     });
932   }
933 
934   /**
935    * @param row row to check
936    * @param family column family
937    * @param qualifier column qualifier
938    * @param compareOp the comparison operation
939    * @param comparator the comparator
940    * @param put data to put if check succeeds
941    * @return true or false to return to client if default processing should
942    * be bypassed, or null otherwise
943    * @throws IOException e
944    */
945   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
946       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
947       final Put put) throws IOException {
948     return execOperationWithResult(true, false,
949         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
950       @Override
951       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
952           throws IOException {
953         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
954           compareOp, comparator, put, getResult()));
955       }
956     });
957   }
958 
959   /**
960    * @param row row to check
961    * @param family column family
962    * @param qualifier column qualifier
963    * @param compareOp the comparison operation
964    * @param comparator the comparator
965    * @param put data to put if check succeeds
966    * @throws IOException e
967    */
968   public boolean postCheckAndPut(final byte [] row, final byte [] family,
969       final byte [] qualifier, final CompareOp compareOp,
970       final ByteArrayComparable comparator, final Put put,
971       boolean result) throws IOException {
972     return execOperationWithResult(result,
973         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
974       @Override
975       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
976           throws IOException {
977         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
978           compareOp, comparator, put, getResult()));
979       }
980     });
981   }
982 
983   /**
984    * @param row row to check
985    * @param family column family
986    * @param qualifier column qualifier
987    * @param compareOp the comparison operation
988    * @param comparator the comparator
989    * @param delete delete to commit if check succeeds
990    * @return true or false to return to client if default processing should
991    * be bypassed, or null otherwise
992    * @throws IOException e
993    */
994   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
995       final byte [] qualifier, final CompareOp compareOp,
996       final ByteArrayComparable comparator, final Delete delete)
997       throws IOException {
998     return execOperationWithResult(true, false,
999         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1000       @Override
1001       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1002           throws IOException {
1003         setResult(oserver.preCheckAndDelete(ctx, row, family,
1004             qualifier, compareOp, comparator, delete, getResult()));
1005       }
1006     });
1007   }
1008 
1009   /**
1010    * @param row row to check
1011    * @param family column family
1012    * @param qualifier column qualifier
1013    * @param compareOp the comparison operation
1014    * @param comparator the comparator
1015    * @param delete delete to commit if check succeeds
1016    * @return true or false to return to client if default processing should
1017    * be bypassed, or null otherwise
1018    * @throws IOException e
1019    */
1020   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1021       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1022       final Delete delete) throws IOException {
1023     return execOperationWithResult(true, false,
1024         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1025       @Override
1026       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1027           throws IOException {
1028         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1029               family, qualifier, compareOp, comparator, delete, getResult()));
1030       }
1031     });
1032   }
1033 
1034   /**
1035    * @param row row to check
1036    * @param family column family
1037    * @param qualifier column qualifier
1038    * @param compareOp the comparison operation
1039    * @param comparator the comparator
1040    * @param delete delete to commit if check succeeds
1041    * @throws IOException e
1042    */
1043   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1044       final byte [] qualifier, final CompareOp compareOp,
1045       final ByteArrayComparable comparator, final Delete delete,
1046       boolean result) throws IOException {
1047     return execOperationWithResult(result,
1048         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1049       @Override
1050       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1051           throws IOException {
1052         setResult(oserver.postCheckAndDelete(ctx, row, family,
1053             qualifier, compareOp, comparator, delete, getResult()));
1054       }
1055     });
1056   }
1057 
1058   /**
1059    * @param append append object
1060    * @return result to return to client if default operation should be
1061    * bypassed, null otherwise
1062    * @throws IOException if an error occurred on the coprocessor
1063    */
1064   public Result preAppend(final Append append) throws IOException {
1065     return execOperationWithResult(true, null,
1066         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1067       @Override
1068       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1069           throws IOException {
1070         setResult(oserver.preAppend(ctx, append));
1071       }
1072     });
1073   }
1074 
1075   /**
1076    * @param append append object
1077    * @return result to return to client if default operation should be
1078    * bypassed, null otherwise
1079    * @throws IOException if an error occurred on the coprocessor
1080    */
1081   public Result preAppendAfterRowLock(final Append append) throws IOException {
1082     return execOperationWithResult(true, null,
1083         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1084       @Override
1085       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1086           throws IOException {
1087         setResult(oserver.preAppendAfterRowLock(ctx, append));
1088       }
1089     });
1090   }
1091 
1092   /**
1093    * @param increment increment object
1094    * @return result to return to client if default operation should be
1095    * bypassed, null otherwise
1096    * @throws IOException if an error occurred on the coprocessor
1097    */
1098   public Result preIncrement(final Increment increment) throws IOException {
1099     return execOperationWithResult(true, null,
1100         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1101       @Override
1102       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1103           throws IOException {
1104         setResult(oserver.preIncrement(ctx, increment));
1105       }
1106     });
1107   }
1108 
1109   /**
1110    * @param increment increment object
1111    * @return result to return to client if default operation should be
1112    * bypassed, null otherwise
1113    * @throws IOException if an error occurred on the coprocessor
1114    */
1115   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1116     return execOperationWithResult(true, null,
1117         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1118       @Override
1119       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1120           throws IOException {
1121         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1122       }
1123     });
1124   }
1125 
1126   /**
1127    * @param append Append object
1128    * @param result the result returned by the append
1129    * @throws IOException if an error occurred on the coprocessor
1130    */
1131   public void postAppend(final Append append, final Result result) throws IOException {
1132     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1133       @Override
1134       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1135           throws IOException {
1136         oserver.postAppend(ctx, append, result);
1137       }
1138     });
1139   }
1140 
1141   /**
1142    * @param increment increment object
1143    * @param result the result returned by postIncrement
1144    * @throws IOException if an error occurred on the coprocessor
1145    */
1146   public Result postIncrement(final Increment increment, Result result) throws IOException {
1147     return execOperationWithResult(result,
1148         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1149       @Override
1150       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1151           throws IOException {
1152         setResult(oserver.postIncrement(ctx, increment, getResult()));
1153       }
1154     });
1155   }
1156 
1157   /**
1158    * @param scan the Scan specification
1159    * @return scanner id to return to client if default operation should be
1160    * bypassed, false otherwise
1161    * @exception IOException Exception
1162    */
1163   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1164     return execOperationWithResult(true, null,
1165         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1166       @Override
1167       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1168           throws IOException {
1169         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1170       }
1171     });
1172   }
1173 
1174   /**
1175    * See
1176    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1177    *    Store, Scan, NavigableSet, KeyValueScanner)}
1178    */
1179   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1180       final NavigableSet<byte[]> targetCols) throws IOException {
1181     return execOperationWithResult(null,
1182         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1183       @Override
1184       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1185           throws IOException {
1186         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1187       }
1188     });
1189   }
1190 
1191   /**
1192    * @param scan the Scan specification
1193    * @param s the scanner
1194    * @return the scanner instance to use
1195    * @exception IOException Exception
1196    */
1197   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1198     return execOperationWithResult(s,
1199         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1200       @Override
1201       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1202           throws IOException {
1203         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1204       }
1205     });
1206   }
1207 
1208   /**
1209    * @param s the scanner
1210    * @param results the result set returned by the region server
1211    * @param limit the maximum number of results to return
1212    * @return 'has next' indication to client if bypassing default behavior, or
1213    * null otherwise
1214    * @exception IOException Exception
1215    */
1216   public Boolean preScannerNext(final InternalScanner s,
1217       final List<Result> results, final int limit) throws IOException {
1218     return execOperationWithResult(true, false,
1219         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1220       @Override
1221       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1222           throws IOException {
1223         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1224       }
1225     });
1226   }
1227 
1228   /**
1229    * @param s the scanner
1230    * @param results the result set returned by the region server
1231    * @param limit the maximum number of results to return
1232    * @param hasMore
1233    * @return 'has more' indication to give to client
1234    * @exception IOException Exception
1235    */
1236   public boolean postScannerNext(final InternalScanner s,
1237       final List<Result> results, final int limit, boolean hasMore)
1238       throws IOException {
1239     return execOperationWithResult(hasMore,
1240         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1241       @Override
1242       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1243           throws IOException {
1244         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1245       }
1246     });
1247   }
1248 
1249   /**
1250    * This will be called by the scan flow when the current scanned row is being filtered out by the
1251    * filter.
1252    * @param s the scanner
1253    * @param currentRow The current rowkey which got filtered out
1254    * @param offset offset to rowkey
1255    * @param length length of rowkey
1256    * @return whether more rows are available for the scanner or not
1257    * @throws IOException
1258    */
1259   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1260       final int offset, final short length) throws IOException {
1261     return execOperationWithResult(true,
1262         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1263       @Override
1264       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1265           throws IOException {
1266         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1267       }
1268     });
1269   }
1270 
1271   /**
1272    * @param s the scanner
1273    * @return true if default behavior should be bypassed, false otherwise
1274    * @exception IOException Exception
1275    */
1276   public boolean preScannerClose(final InternalScanner s) throws IOException {
1277     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1278       @Override
1279       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1280           throws IOException {
1281         oserver.preScannerClose(ctx, s);
1282       }
1283     });
1284   }
1285 
1286   /**
1287    * @exception IOException Exception
1288    */
1289   public void postScannerClose(final InternalScanner s) throws IOException {
1290     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1291       @Override
1292       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1293           throws IOException {
1294         oserver.postScannerClose(ctx, s);
1295       }
1296     });
1297   }
1298 
1299   /**
1300    * @param info
1301    * @param logKey
1302    * @param logEdit
1303    * @return true if default behavior should be bypassed, false otherwise
1304    * @throws IOException
1305    */
1306   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1307       final WALEdit logEdit) throws IOException {
1308     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1309       @Override
1310       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1311           throws IOException {
1312         oserver.preWALRestore(ctx, info, logKey, logEdit);
1313       }
1314     });
1315   }
1316 
1317   /**
1318    * @param info
1319    * @param logKey
1320    * @param logEdit
1321    * @throws IOException
1322    */
1323   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1324       throws IOException {
1325     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1326       @Override
1327       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1328           throws IOException {
1329         oserver.postWALRestore(ctx, info, logKey, logEdit);
1330       }
1331     });
1332   }
1333 
1334   /**
1335    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1336    * @return true if the default operation should be bypassed
1337    * @throws IOException
1338    */
1339   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1340     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1341       @Override
1342       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1343           throws IOException {
1344         oserver.preBulkLoadHFile(ctx, familyPaths);
1345       }
1346     });
1347   }
1348 
1349   /**
1350    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1351    * @param hasLoaded whether load was successful or not
1352    * @return the possibly modified value of hasLoaded
1353    * @throws IOException
1354    */
1355   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1356       boolean hasLoaded) throws IOException {
1357     return execOperationWithResult(hasLoaded,
1358         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1359       @Override
1360       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1361           throws IOException {
1362         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, getResult()));
1363       }
1364     });
1365   }
1366 
1367   public void postStartRegionOperation(final Operation op) throws IOException {
1368     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1369       @Override
1370       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1371           throws IOException {
1372         oserver.postStartRegionOperation(ctx, op);
1373       }
1374     });
1375   }
1376 
1377   public void postCloseRegionOperation(final Operation op) throws IOException {
1378     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1379       @Override
1380       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1381           throws IOException {
1382         oserver.postCloseRegionOperation(ctx, op);
1383       }
1384     });
1385   }
1386 
1387   /**
1388    * @param fs fileystem to read from
1389    * @param p path to the file
1390    * @param in {@link FSDataInputStreamWrapper}
1391    * @param size Full size of the file
1392    * @param cacheConf
1393    * @param r original reference file. This will be not null only when reading a split file.
1394    * @return a Reader instance to use instead of the base reader if overriding
1395    * default behavior, null otherwise
1396    * @throws IOException
1397    */
1398   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1399       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1400       final Reference r) throws IOException {
1401     return execOperationWithResult(null,
1402         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1403       @Override
1404       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1405           throws IOException {
1406         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1407       }
1408     });
1409   }
1410 
1411   /**
1412    * @param fs fileystem to read from
1413    * @param p path to the file
1414    * @param in {@link FSDataInputStreamWrapper}
1415    * @param size Full size of the file
1416    * @param cacheConf
1417    * @param r original reference file. This will be not null only when reading a split file.
1418    * @param reader the base reader instance
1419    * @return The reader to use
1420    * @throws IOException
1421    */
1422   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1423       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1424       final Reference r, final StoreFile.Reader reader) throws IOException {
1425     return execOperationWithResult(reader,
1426         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1427       @Override
1428       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1429           throws IOException {
1430         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1431       }
1432     });
1433   }
1434 
1435   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1436       final Cell oldCell, Cell newCell) throws IOException {
1437     return execOperationWithResult(newCell,
1438         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1439       @Override
1440       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1441           throws IOException {
1442         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1443       }
1444     });
1445   }
1446 
1447   public Message preEndpointInvocation(final Service service, final String methodName,
1448       Message request) throws IOException {
1449     return execOperationWithResult(request,
1450         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1451       @Override
1452       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1453           throws IOException {
1454         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1455       }
1456     });
1457   }
1458 
1459   public void postEndpointInvocation(final Service service, final String methodName,
1460       final Message request, final Message.Builder responseBuilder) throws IOException {
1461     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1462       @Override
1463       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1464           throws IOException {
1465         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1466       }
1467     });
1468   }
1469 
1470   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1471     return execOperationWithResult(tracker,
1472         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1473       @Override
1474       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1475           throws IOException {
1476         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1477       }
1478     });
1479   }
1480 
1481   public Map<String, DescriptiveStatistics> getCoprocessorExecutionStatistics() {
1482     Map<String, DescriptiveStatistics> results = new HashMap<String, DescriptiveStatistics>();
1483     for (RegionEnvironment env : coprocessors) {
1484       DescriptiveStatistics ds = new DescriptiveStatistics();
1485       if (env.getInstance() instanceof RegionObserver) {
1486         for (Long time : env.getExecutionLatenciesNanos()) {
1487           ds.addValue(time);
1488         }
1489         // Ensures that web ui circumvents the display of NaN values when there are zero samples.
1490         if (ds.getN() == 0) {
1491           ds.addValue(0);
1492         }
1493         results.put(env.getInstance().getClass().getSimpleName(), ds);
1494       }
1495     }
1496     return results;
1497   }
1498 
1499   private static abstract class CoprocessorOperation
1500       extends ObserverContext<RegionCoprocessorEnvironment> {
1501     public abstract void call(Coprocessor observer,
1502         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1503     public abstract boolean hasCall(Coprocessor observer);
1504     public void postEnvCall(RegionEnvironment env) { }
1505   }
1506 
1507   private static abstract class RegionOperation extends CoprocessorOperation {
1508     public abstract void call(RegionObserver observer,
1509         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1510 
1511     public boolean hasCall(Coprocessor observer) {
1512       return observer instanceof RegionObserver;
1513     }
1514 
1515     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1516         throws IOException {
1517       call((RegionObserver)observer, ctx);
1518     }
1519   }
1520 
1521   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1522     private T result = null;
1523     public void setResult(final T result) { this.result = result; }
1524     public T getResult() { return this.result; }
1525   }
1526 
1527   private static abstract class EndpointOperation extends CoprocessorOperation {
1528     public abstract void call(EndpointObserver observer,
1529         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1530 
1531     public boolean hasCall(Coprocessor observer) {
1532       return observer instanceof EndpointObserver;
1533     }
1534 
1535     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1536         throws IOException {
1537       call((EndpointObserver)observer, ctx);
1538     }
1539   }
1540 
1541   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1542     private T result = null;
1543     public void setResult(final T result) { this.result = result; }
1544     public T getResult() { return this.result; }
1545   }
1546 
1547   private boolean execOperation(final CoprocessorOperation ctx)
1548       throws IOException {
1549     return execOperation(true, ctx);
1550   }
1551 
1552   private <T> T execOperationWithResult(final T defaultValue,
1553       final RegionOperationWithResult<T> ctx) throws IOException {
1554     if (ctx == null) return defaultValue;
1555     ctx.setResult(defaultValue);
1556     execOperation(true, ctx);
1557     return ctx.getResult();
1558   }
1559 
1560   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1561       final RegionOperationWithResult<T> ctx) throws IOException {
1562     boolean bypass = false;
1563     T result = defaultValue;
1564     if (ctx != null) {
1565       ctx.setResult(defaultValue);
1566       bypass = execOperation(true, ctx);
1567       result = ctx.getResult();
1568     }
1569     return bypass == ifBypass ? result : null;
1570   }
1571 
1572   private <T> T execOperationWithResult(final T defaultValue,
1573       final EndpointOperationWithResult<T> ctx) throws IOException {
1574     if (ctx == null) return defaultValue;
1575     ctx.setResult(defaultValue);
1576     execOperation(true, ctx);
1577     return ctx.getResult();
1578   }
1579 
1580   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1581       throws IOException {
1582     boolean bypass = false;
1583     for (RegionEnvironment env: coprocessors) {
1584       Coprocessor observer = env.getInstance();
1585       if (ctx.hasCall(observer)) {
1586         long startTime = System.nanoTime();
1587         ctx.prepare(env);
1588         Thread currentThread = Thread.currentThread();
1589         ClassLoader cl = currentThread.getContextClassLoader();
1590         try {
1591           currentThread.setContextClassLoader(env.getClassLoader());
1592           ctx.call(observer, ctx);
1593         } catch (Throwable e) {
1594           handleCoprocessorThrowable(env, e);
1595         } finally {
1596           currentThread.setContextClassLoader(cl);
1597         }
1598         env.offerExecutionLatency(System.nanoTime() - startTime);
1599         bypass |= ctx.shouldBypass();
1600         if (earlyExit && ctx.shouldComplete()) {
1601           break;
1602         }
1603       }
1604 
1605       ctx.postEnvCall(env);
1606     }
1607     return bypass;
1608   }
1609 }