001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.UUID;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.RawCellBuilder;
037import org.apache.hadoop.hbase.RawCellBuilderFactory;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.client.Append;
040import org.apache.hadoop.hbase.client.CheckAndMutate;
041import org.apache.hadoop.hbase.client.CheckAndMutateResult;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Mutation;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.SharedConnection;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
055import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
056import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
058import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
059import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
060import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
061import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.ObserverContext;
063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
065import org.apache.hadoop.hbase.coprocessor.RegionObserver;
066import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
067import org.apache.hadoop.hbase.io.Reference;
068import org.apache.hadoop.hbase.io.hfile.CacheConfig;
069import org.apache.hadoop.hbase.metrics.MetricRegistry;
070import org.apache.hadoop.hbase.regionserver.Region.Operation;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
073import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
074import org.apache.hadoop.hbase.security.User;
075import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
076import org.apache.hadoop.hbase.util.Pair;
077import org.apache.hadoop.hbase.wal.WALEdit;
078import org.apache.hadoop.hbase.wal.WALKey;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082import org.apache.hbase.thirdparty.com.google.protobuf.Message;
083import org.apache.hbase.thirdparty.com.google.protobuf.Service;
084import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
085import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
086
087/**
088 * Implements the coprocessor environment and runtime support for coprocessors
089 * loaded within a {@link Region}.
090 */
091@InterfaceAudience.Private
092public class RegionCoprocessorHost
093    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
094
095  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
096  // The shared data map
097  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
098      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
099          AbstractReferenceMap.ReferenceStrength.WEAK);
100
101  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
102  private final boolean hasCustomPostScannerFilterRow;
103
104  /*
105   * Whether any configured CPs override postScannerFilterRow hook
106   */
107  public boolean hasCustomPostScannerFilterRow() {
108    return hasCustomPostScannerFilterRow;
109  }
110
111  /**
112   *
113   * Encapsulation of the environment of each coprocessor
114   */
115  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
116      implements RegionCoprocessorEnvironment {
117    private Region region;
118    ConcurrentMap<String, Object> sharedData;
119    private final MetricRegistry metricRegistry;
120    private final RegionServerServices services;
121
122    /**
123     * Constructor
124     * @param impl the coprocessor instance
125     * @param priority chaining priority
126     */
127    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
128        final int seq, final Configuration conf, final Region region,
129        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
130      super(impl, priority, seq, conf);
131      this.region = region;
132      this.sharedData = sharedData;
133      this.services = services;
134      this.metricRegistry =
135          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
136    }
137
138    /** @return the region */
139    @Override
140    public Region getRegion() {
141      return region;
142    }
143
144    @Override
145    public OnlineRegions getOnlineRegions() {
146      return this.services;
147    }
148
149    @Override
150    public Connection getConnection() {
151      // Mocks may have services as null at test time.
152      return services != null ? new SharedConnection(services.getConnection()) : null;
153    }
154
155    @Override
156    public Connection createConnection(Configuration conf) throws IOException {
157      return services != null ? this.services.createConnection(conf) : null;
158    }
159
160    @Override
161    public ServerName getServerName() {
162      return services != null? services.getServerName(): null;
163    }
164
165    @Override
166    public void shutdown() {
167      super.shutdown();
168      MetricsCoprocessor.removeRegistry(this.metricRegistry);
169    }
170
171    @Override
172    public ConcurrentMap<String, Object> getSharedData() {
173      return sharedData;
174    }
175
176    @Override
177    public RegionInfo getRegionInfo() {
178      return region.getRegionInfo();
179    }
180
181    @Override
182    public MetricRegistry getMetricRegistryForRegionServer() {
183      return metricRegistry;
184    }
185
186    @Override
187    public RawCellBuilder getCellBuilder() {
188      // We always do a DEEP_COPY only
189      return RawCellBuilderFactory.create();
190    }
191  }
192
193  /**
194   * Special version of RegionEnvironment that exposes RegionServerServices for Core
195   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
196   */
197  private static class RegionEnvironmentForCoreCoprocessors extends
198      RegionEnvironment implements HasRegionServerServices {
199    private final RegionServerServices rsServices;
200
201    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
202      final int seq, final Configuration conf, final Region region,
203      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
204      super(impl, priority, seq, conf, region, services, sharedData);
205      this.rsServices = services;
206    }
207
208    /**
209     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
210     * consumption.
211     */
212    @Override
213    public RegionServerServices getRegionServerServices() {
214      return this.rsServices;
215    }
216  }
217
218  static class TableCoprocessorAttribute {
219    private Path path;
220    private String className;
221    private int priority;
222    private Configuration conf;
223
224    public TableCoprocessorAttribute(Path path, String className, int priority,
225        Configuration conf) {
226      this.path = path;
227      this.className = className;
228      this.priority = priority;
229      this.conf = conf;
230    }
231
232    public Path getPath() {
233      return path;
234    }
235
236    public String getClassName() {
237      return className;
238    }
239
240    public int getPriority() {
241      return priority;
242    }
243
244    public Configuration getConf() {
245      return conf;
246    }
247  }
248
249  /** The region server services */
250  RegionServerServices rsServices;
251  /** The region */
252  HRegion region;
253
254  /**
255   * Constructor
256   * @param region the region
257   * @param rsServices interface to available region server functionality
258   * @param conf the configuration
259   */
260  public RegionCoprocessorHost(final HRegion region,
261      final RegionServerServices rsServices, final Configuration conf) {
262    super(rsServices);
263    this.conf = conf;
264    this.rsServices = rsServices;
265    this.region = region;
266    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
267
268    // load system default cp's from configuration.
269    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
270
271    // load system default cp's for user tables from configuration.
272    if (!region.getRegionInfo().getTable().isSystemTable()) {
273      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
274    }
275
276    // load Coprocessor From HDFS
277    loadTableCoprocessors(conf);
278
279    // now check whether any coprocessor implements postScannerFilterRow
280    boolean hasCustomPostScannerFilterRow = false;
281    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
282      if (env.getInstance() instanceof RegionObserver) {
283        Class<?> clazz = env.getInstance().getClass();
284        for (;;) {
285          if (clazz == Object.class) {
286            // we dont need to look postScannerFilterRow into Object class
287            break; // break the inner loop
288          }
289          try {
290            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
291              InternalScanner.class, Cell.class, boolean.class);
292            // this coprocessor has a custom version of postScannerFilterRow
293            hasCustomPostScannerFilterRow = true;
294            break out;
295          } catch (NoSuchMethodException ignore) {
296          }
297          // the deprecated signature still exists
298          try {
299            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
300              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
301            // this coprocessor has a custom version of postScannerFilterRow
302            hasCustomPostScannerFilterRow = true;
303            break out;
304          } catch (NoSuchMethodException ignore) {
305          }
306          clazz = clazz.getSuperclass();
307        }
308      }
309    }
310    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
311  }
312
313  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
314      TableDescriptor htd) {
315    return htd.getCoprocessorDescriptors().stream().map(cp -> {
316      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
317      Configuration ourConf;
318      if (!cp.getProperties().isEmpty()) {
319        // do an explicit deep copy of the passed configuration
320        ourConf = new Configuration(false);
321        HBaseConfiguration.merge(ourConf, conf);
322        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
323      } else {
324        ourConf = conf;
325      }
326      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
327    }).collect(Collectors.toList());
328  }
329
330  /**
331   * Sanity check the table coprocessor attributes of the supplied schema. Will
332   * throw an exception if there is a problem.
333   * @param conf
334   * @param htd
335   * @throws IOException
336   */
337  public static void testTableCoprocessorAttrs(final Configuration conf,
338      final TableDescriptor htd) throws IOException {
339    String pathPrefix = UUID.randomUUID().toString();
340    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
341      if (attr.getPriority() < 0) {
342        throw new IOException("Priority for coprocessor " + attr.getClassName() +
343          " cannot be less than 0");
344      }
345      ClassLoader old = Thread.currentThread().getContextClassLoader();
346      try {
347        ClassLoader cl;
348        if (attr.getPath() != null) {
349          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
350            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
351        } else {
352          cl = CoprocessorHost.class.getClassLoader();
353        }
354        Thread.currentThread().setContextClassLoader(cl);
355        if (cl instanceof CoprocessorClassLoader) {
356          String[] includedClassPrefixes = null;
357          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
358            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
359            includedClassPrefixes = prefixes.split(";");
360          }
361          ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes);
362        } else {
363          cl.loadClass(attr.getClassName());
364        }
365      } catch (ClassNotFoundException e) {
366        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
367      } finally {
368        Thread.currentThread().setContextClassLoader(old);
369      }
370    }
371  }
372
373  void loadTableCoprocessors(final Configuration conf) {
374    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
375      DEFAULT_COPROCESSORS_ENABLED);
376    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
377      DEFAULT_USER_COPROCESSORS_ENABLED);
378    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
379      return;
380    }
381
382    // scan the table attributes for coprocessor load specifications
383    // initialize the coprocessors
384    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
385    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
386        region.getTableDescriptor())) {
387      // Load encompasses classloading and coprocessor initialization
388      try {
389        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
390            attr.getPriority(), attr.getConf());
391        if (env == null) {
392          continue;
393        }
394        configured.add(env);
395        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
396            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
397      } catch (Throwable t) {
398        // Coprocessor failed to load, do we abort on error?
399        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
400          abortServer(attr.getClassName(), t);
401        } else {
402          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
403        }
404      }
405    }
406    // add together to coprocessor set for COW efficiency
407    coprocEnvironments.addAll(configured);
408  }
409
410  @Override
411  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
412      Configuration conf) {
413    // If coprocessor exposes any services, register them.
414    for (Service service : instance.getServices()) {
415      region.registerService(service);
416    }
417    ConcurrentMap<String, Object> classData;
418    // make sure only one thread can add maps
419    synchronized (SHARED_DATA_MAP) {
420      // as long as at least one RegionEnvironment holds on to its classData it will
421      // remain in this map
422      classData =
423          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
424              k -> new ConcurrentHashMap<>());
425    }
426    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
427    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
428        new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
429            rsServices, classData):
430        new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
431  }
432
433  @Override
434  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
435      throws InstantiationException, IllegalAccessException {
436    try {
437      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
438        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
439      } else {
440        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
441            implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
442        return null;
443      }
444    } catch (NoSuchMethodException | InvocationTargetException e) {
445      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
446    }
447  }
448
449  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
450      RegionCoprocessor::getRegionObserver;
451
452  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
453      RegionCoprocessor::getEndpointObserver;
454
455  abstract class RegionObserverOperationWithoutResult extends
456      ObserverOperationWithoutResult<RegionObserver> {
457    public RegionObserverOperationWithoutResult() {
458      super(regionObserverGetter);
459    }
460
461    public RegionObserverOperationWithoutResult(User user) {
462      super(regionObserverGetter, user);
463    }
464
465    public RegionObserverOperationWithoutResult(boolean bypassable) {
466      super(regionObserverGetter, null, bypassable);
467    }
468
469    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
470      super(regionObserverGetter, user, bypassable);
471    }
472  }
473
474  abstract class BulkLoadObserverOperation extends
475      ObserverOperationWithoutResult<BulkLoadObserver> {
476    public BulkLoadObserverOperation(User user) {
477      super(RegionCoprocessor::getBulkLoadObserver, user);
478    }
479  }
480
481
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
489
490  /**
491   * Invoked before a region open.
492   *
493   * @throws IOException Signals that an I/O exception has occurred.
494   */
495  public void preOpen() throws IOException {
496    if (coprocEnvironments.isEmpty()) {
497      return;
498    }
499    execOperation(new RegionObserverOperationWithoutResult() {
500      @Override
501      public void call(RegionObserver observer) throws IOException {
502        observer.preOpen(this);
503      }
504    });
505  }
506
507
508  /**
509   * Invoked after a region open
510   */
511  public void postOpen() {
512    if (coprocEnvironments.isEmpty()) {
513      return;
514    }
515    try {
516      execOperation(new RegionObserverOperationWithoutResult() {
517        @Override
518        public void call(RegionObserver observer) throws IOException {
519          observer.postOpen(this);
520        }
521      });
522    } catch (IOException e) {
523      LOG.warn(e.toString(), e);
524    }
525  }
526
527  /**
528   * Invoked before a region is closed
529   * @param abortRequested true if the server is aborting
530   */
531  public void preClose(final boolean abortRequested) throws IOException {
532    execOperation(new RegionObserverOperationWithoutResult() {
533      @Override
534      public void call(RegionObserver observer) throws IOException {
535        observer.preClose(this, abortRequested);
536      }
537    });
538  }
539
540  /**
541   * Invoked after a region is closed
542   * @param abortRequested true if the server is aborting
543   */
544  public void postClose(final boolean abortRequested) {
545    try {
546      execOperation(new RegionObserverOperationWithoutResult() {
547        @Override
548        public void call(RegionObserver observer) throws IOException {
549          observer.postClose(this, abortRequested);
550        }
551
552        @Override
553        public void postEnvCall() {
554          shutdown(this.getEnvironment());
555        }
556      });
557    } catch (IOException e) {
558      LOG.warn(e.toString(), e);
559    }
560  }
561
562  /**
563   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
564   * available candidates.
565   * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
566   * the passed in <code>candidates</code>.
567   * @param store The store where compaction is being requested
568   * @param candidates The currently available store files
569   * @param tracker used to track the life cycle of a compaction
570   * @param user the user
571   * @throws IOException
572   */
573  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
574      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
575    if (coprocEnvironments.isEmpty()) {
576      return false;
577    }
578    boolean bypassable = true;
579    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
580      @Override
581      public void call(RegionObserver observer) throws IOException {
582        observer.preCompactSelection(this, store, candidates, tracker);
583      }
584    });
585  }
586
587  /**
588   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
589   * candidates.
590   * @param store The store where compaction is being requested
591   * @param selected The store files selected to compact
592   * @param tracker used to track the life cycle of a compaction
593   * @param request the compaction request
594   * @param user the user
595   */
596  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
597      final CompactionLifeCycleTracker tracker, final CompactionRequest request,
598      final User user) throws IOException {
599    if (coprocEnvironments.isEmpty()) {
600      return;
601    }
602    execOperation(new RegionObserverOperationWithoutResult(user) {
603      @Override
604      public void call(RegionObserver observer) throws IOException {
605        observer.postCompactSelection(this, store, selected, tracker, request);
606      }
607    });
608  }
609
610  /**
611   * Called prior to opening store scanner for compaction.
612   */
613  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
614      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
615    if (coprocEnvironments.isEmpty()) {
616      return store.getScanInfo();
617    }
618    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
619    execOperation(new RegionObserverOperationWithoutResult(user) {
620      @Override
621      public void call(RegionObserver observer) throws IOException {
622        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
623      }
624    });
625    return builder.build();
626  }
627
628  /**
629   * Called prior to rewriting the store files selected for compaction
630   * @param store the store being compacted
631   * @param scanner the scanner used to read store data during compaction
632   * @param scanType type of Scan
633   * @param tracker used to track the life cycle of a compaction
634   * @param request the compaction request
635   * @param user the user
636   * @return Scanner to use (cannot be null!)
637   * @throws IOException
638   */
639  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
640      final ScanType scanType, final CompactionLifeCycleTracker tracker,
641      final CompactionRequest request, final User user) throws IOException {
642    InternalScanner defaultResult = scanner;
643    if (coprocEnvironments.isEmpty()) {
644      return defaultResult;
645    }
646    return execOperationWithResult(
647        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter,
648            defaultResult, user) {
649          @Override
650          public InternalScanner call(RegionObserver observer) throws IOException {
651            InternalScanner scanner =
652                observer.preCompact(this, store, getResult(), scanType, tracker, request);
653            if (scanner == null) {
654              throw new CoprocessorException("Null Scanner return disallowed!");
655            }
656            return scanner;
657          }
658        });
659  }
660
661  /**
662   * Called after the store compaction has completed.
663   * @param store the store being compacted
664   * @param resultFile the new store file written during compaction
665   * @param tracker used to track the life cycle of a compaction
666   * @param request the compaction request
667   * @param user the user
668   * @throws IOException
669   */
670  public void postCompact(final HStore store, final HStoreFile resultFile,
671      final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
672      throws IOException {
673    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) {
674      @Override
675      public void call(RegionObserver observer) throws IOException {
676        observer.postCompact(this, store, resultFile, tracker, request);
677      }
678    });
679  }
680
681  /**
682   * Invoked before create StoreScanner for flush.
683   */
684  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
685      throws IOException {
686    if (coprocEnvironments.isEmpty()) {
687      return store.getScanInfo();
688    }
689    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
690    execOperation(new RegionObserverOperationWithoutResult() {
691      @Override
692      public void call(RegionObserver observer) throws IOException {
693        observer.preFlushScannerOpen(this, store, builder, tracker);
694      }
695    });
696    return builder.build();
697  }
698
699  /**
700   * Invoked before a memstore flush
701   * @return Scanner to use (cannot be null!)
702   * @throws IOException
703   */
704  public InternalScanner preFlush(HStore store, InternalScanner scanner,
705      FlushLifeCycleTracker tracker) throws IOException {
706    if (coprocEnvironments.isEmpty()) {
707      return scanner;
708    }
709    return execOperationWithResult(
710        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
711          @Override
712          public InternalScanner call(RegionObserver observer) throws IOException {
713            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
714            if (scanner == null) {
715              throw new CoprocessorException("Null Scanner return disallowed!");
716            }
717            return scanner;
718          }
719        });
720  }
721
722  /**
723   * Invoked before a memstore flush
724   * @throws IOException
725   */
726  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
727    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
728      @Override
729      public void call(RegionObserver observer) throws IOException {
730        observer.preFlush(this, tracker);
731      }
732    });
733  }
734
735  /**
736   * Invoked after a memstore flush
737   * @throws IOException
738   */
739  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
740    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
741      @Override
742      public void call(RegionObserver observer) throws IOException {
743        observer.postFlush(this, tracker);
744      }
745    });
746  }
747
748  /**
749   * Invoked before in memory compaction.
750   */
751  public void preMemStoreCompaction(HStore store) throws IOException {
752    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
753      @Override
754      public void call(RegionObserver observer) throws IOException {
755        observer.preMemStoreCompaction(this, store);
756      }
757    });
758  }
759
760  /**
761   * Invoked before create StoreScanner for in memory compaction.
762   */
763  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
764    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
765    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
766      @Override
767      public void call(RegionObserver observer) throws IOException {
768        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
769      }
770    });
771    return builder.build();
772  }
773
774  /**
775   * Invoked before compacting memstore.
776   */
777  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
778      throws IOException {
779    if (coprocEnvironments.isEmpty()) {
780      return scanner;
781    }
782    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
783        regionObserverGetter, scanner) {
784      @Override
785      public InternalScanner call(RegionObserver observer) throws IOException {
786        return observer.preMemStoreCompactionCompact(this, store, getResult());
787      }
788    });
789  }
790
791  /**
792   * Invoked after in memory compaction.
793   */
794  public void postMemStoreCompaction(HStore store) throws IOException {
795    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
796      @Override
797      public void call(RegionObserver observer) throws IOException {
798        observer.postMemStoreCompaction(this, store);
799      }
800    });
801  }
802
803  /**
804   * Invoked after a memstore flush
805   * @throws IOException
806   */
807  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
808      throws IOException {
809    if (coprocEnvironments.isEmpty()) {
810      return;
811    }
812    execOperation(new RegionObserverOperationWithoutResult() {
813      @Override
814      public void call(RegionObserver observer) throws IOException {
815        observer.postFlush(this, store, storeFile, tracker);
816      }
817    });
818  }
819
820  // RegionObserver support
821  /**
822   * Supports Coprocessor 'bypass'.
823   * @param get the Get request
824   * @param results What to return if return is true/'bypass'.
825   * @return true if default processing should be bypassed.
826   * @exception IOException Exception
827   */
828  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
829    if (coprocEnvironments.isEmpty()) {
830      return false;
831    }
832    boolean bypassable = true;
833    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
834      @Override
835      public void call(RegionObserver observer) throws IOException {
836        observer.preGetOp(this, get, results);
837      }
838    });
839  }
840
841  /**
842   * @param get the Get request
843   * @param results the result set
844   * @exception IOException Exception
845   */
846  public void postGet(final Get get, final List<Cell> results)
847      throws IOException {
848    if (coprocEnvironments.isEmpty()) {
849      return;
850    }
851    execOperation(new RegionObserverOperationWithoutResult() {
852      @Override
853      public void call(RegionObserver observer) throws IOException {
854        observer.postGetOp(this, get, results);
855      }
856    });
857  }
858
859  /**
860   * Supports Coprocessor 'bypass'.
861   * @param get the Get request
862   * @return true or false to return to client if bypassing normal operation, or null otherwise
863   * @exception IOException Exception
864   */
865  public Boolean preExists(final Get get) throws IOException {
866    boolean bypassable = true;
867    boolean defaultResult = false;
868    if (coprocEnvironments.isEmpty()) {
869      return null;
870    }
871    return execOperationWithResult(
872        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
873            defaultResult, bypassable) {
874          @Override
875          public Boolean call(RegionObserver observer) throws IOException {
876            return observer.preExists(this, get, getResult());
877          }
878        });
879  }
880
881  /**
882   * @param get the Get request
883   * @param result the result returned by the region server
884   * @return the result to return to the client
885   * @exception IOException Exception
886   */
887  public boolean postExists(final Get get, boolean result)
888      throws IOException {
889    if (this.coprocEnvironments.isEmpty()) {
890      return result;
891    }
892    return execOperationWithResult(
893        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
894          @Override
895          public Boolean call(RegionObserver observer) throws IOException {
896            return observer.postExists(this, get, getResult());
897          }
898        });
899  }
900
901  /**
902   * Supports Coprocessor 'bypass'.
903   * @param put The Put object
904   * @param edit The WALEdit object.
905   * @param durability The durability used
906   * @return true if default processing should be bypassed
907   * @exception IOException Exception
908   */
909  public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
910      throws IOException {
911    if (coprocEnvironments.isEmpty()) {
912      return false;
913    }
914    boolean bypassable = true;
915    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
916      @Override
917      public void call(RegionObserver observer) throws IOException {
918        observer.prePut(this, put, edit, durability);
919      }
920    });
921  }
922
923  /**
924   * Supports Coprocessor 'bypass'.
925   * @param mutation - the current mutation
926   * @param kv - the current cell
927   * @param byteNow - current timestamp in bytes
928   * @param get - the get that could be used
929   * Note that the get only does not specify the family and qualifier that should be used
930   * @return true if default processing should be bypassed
931   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
932   * Coprocessor for its needs only. Will be removed.
933   */
934  @Deprecated
935  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
936      final Cell kv, final byte[] byteNow, final Get get) throws IOException {
937    if (coprocEnvironments.isEmpty()) {
938      return false;
939    }
940    boolean bypassable = true;
941    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
942      @Override
943      public void call(RegionObserver observer) throws IOException {
944          observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
945      }
946    });
947  }
948
949  /**
950   * @param put The Put object
951   * @param edit The WALEdit object.
952   * @param durability The durability used
953   * @exception IOException Exception
954   */
955  public void postPut(final Put put, final WALEdit edit, final Durability durability)
956      throws IOException {
957    if (coprocEnvironments.isEmpty()) {
958      return;
959    }
960    execOperation(new RegionObserverOperationWithoutResult() {
961      @Override
962      public void call(RegionObserver observer) throws IOException {
963        observer.postPut(this, put, edit, durability);
964      }
965    });
966  }
967
968  /**
969   * Supports Coprocessor 'bypass'.
970   * @param delete The Delete object
971   * @param edit The WALEdit object.
972   * @param durability The durability used
973   * @return true if default processing should be bypassed
974   * @exception IOException Exception
975   */
976  public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
977      throws IOException {
978    if (this.coprocEnvironments.isEmpty()) {
979      return false;
980    }
981    boolean bypassable = true;
982    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
983      @Override
984      public void call(RegionObserver observer) throws IOException {
985         observer.preDelete(this, delete, edit, durability);
986      }
987    });
988  }
989
990  /**
991   * @param delete The Delete object
992   * @param edit The WALEdit object.
993   * @param durability The durability used
994   * @exception IOException Exception
995   */
996  public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
997      throws IOException {
998    execOperation(coprocEnvironments.isEmpty()? null:
999        new RegionObserverOperationWithoutResult() {
1000      @Override
1001      public void call(RegionObserver observer) throws IOException {
1002        observer.postDelete(this, delete, edit, durability);
1003      }
1004    });
1005  }
1006
1007  public void preBatchMutate(
1008      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1009    if(this.coprocEnvironments.isEmpty()) {
1010      return;
1011    }
1012    execOperation(new RegionObserverOperationWithoutResult() {
1013      @Override
1014      public void call(RegionObserver observer) throws IOException {
1015        observer.preBatchMutate(this, miniBatchOp);
1016      }
1017    });
1018  }
1019
1020  public void postBatchMutate(
1021      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1022    if (this.coprocEnvironments.isEmpty()) {
1023      return;
1024    }
1025    execOperation(new RegionObserverOperationWithoutResult() {
1026      @Override
1027      public void call(RegionObserver observer) throws IOException {
1028        observer.postBatchMutate(this, miniBatchOp);
1029      }
1030    });
1031  }
1032
1033  public void postBatchMutateIndispensably(
1034      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1035      throws IOException {
1036    if (this.coprocEnvironments.isEmpty()) {
1037      return;
1038    }
1039    execOperation(new RegionObserverOperationWithoutResult() {
1040      @Override
1041      public void call(RegionObserver observer) throws IOException {
1042        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1043      }
1044    });
1045  }
1046
1047  /**
1048   * Supports Coprocessor 'bypass'.
1049   * @param checkAndMutate the CheckAndMutate object
   * @return the CheckAndMutateResult to return to client if default processing should be
   *   bypassed, or null otherwise
1052   * @throws IOException if an error occurred on the coprocessor
1053   */
1054  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
1055    throws IOException {
1056    boolean bypassable = true;
1057    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1058    if (coprocEnvironments.isEmpty()) {
1059      return null;
1060    }
1061    return execOperationWithResult(
1062      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1063        regionObserverGetter, defaultResult, bypassable) {
1064        @Override
1065        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1066          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1067        }
1068      });
1069  }
1070
1071  /**
1072   * Supports Coprocessor 'bypass'.
1073   * @param checkAndMutate the CheckAndMutate object
   * @return the CheckAndMutateResult to return to client if default processing should be
   *   bypassed, or null otherwise
1076   * @throws IOException if an error occurred on the coprocessor
1077   */
1078  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1079    throws IOException {
1080    boolean bypassable = true;
1081    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1082    if (coprocEnvironments.isEmpty()) {
1083      return null;
1084    }
1085    return execOperationWithResult(
1086      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1087        regionObserverGetter, defaultResult, bypassable) {
1088        @Override
1089        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1090          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1091        }
1092      });
1093  }
1094
1095  /**
1096   * @param checkAndMutate the CheckAndMutate object
1097   * @param result the result returned by the checkAndMutate
   * @return the result to return to the client
1100   * @throws IOException if an error occurred on the coprocessor
1101   */
1102  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1103    CheckAndMutateResult result) throws IOException {
1104    if (this.coprocEnvironments.isEmpty()) {
1105      return result;
1106    }
1107    return execOperationWithResult(
1108      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1109        regionObserverGetter, result) {
1110        @Override
1111        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1112          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1113        }
1114      });
1115  }
1116
1117  /**
1118   * Supports Coprocessor 'bypass'.
1119   * @param append append object
1120   * @return result to return to client if default operation should be bypassed, null otherwise
1121   * @throws IOException if an error occurred on the coprocessor
1122   */
1123  public Result preAppend(final Append append) throws IOException {
1124    boolean bypassable = true;
1125    Result defaultResult = null;
1126    if (this.coprocEnvironments.isEmpty()) {
1127      return defaultResult;
1128    }
1129    return execOperationWithResult(
1130      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1131            bypassable) {
1132          @Override
1133          public Result call(RegionObserver observer) throws IOException {
1134            return observer.preAppend(this, append);
1135          }
1136        });
1137  }
1138
1139  /**
1140   * Supports Coprocessor 'bypass'.
1141   * @param append append object
1142   * @return result to return to client if default operation should be bypassed, null otherwise
1143   * @throws IOException if an error occurred on the coprocessor
1144   */
1145  public Result preAppendAfterRowLock(final Append append) throws IOException {
1146    boolean bypassable = true;
1147    Result defaultResult = null;
1148    if (this.coprocEnvironments.isEmpty()) {
1149      return defaultResult;
1150    }
1151    return execOperationWithResult(
1152        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter,
1153            defaultResult, bypassable) {
1154          @Override
1155          public Result call(RegionObserver observer) throws IOException {
1156            return observer.preAppendAfterRowLock(this, append);
1157          }
1158        });
1159  }
1160
1161  /**
1162   * Supports Coprocessor 'bypass'.
1163   * @param increment increment object
1164   * @return result to return to client if default operation should be bypassed, null otherwise
1165   * @throws IOException if an error occurred on the coprocessor
1166   */
1167  public Result preIncrement(final Increment increment) throws IOException {
1168    boolean bypassable = true;
1169    Result defaultResult = null;
1170    if (coprocEnvironments.isEmpty()) {
1171      return defaultResult;
1172    }
1173    return execOperationWithResult(
1174        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1175            bypassable) {
1176          @Override
1177          public Result call(RegionObserver observer) throws IOException {
1178            return observer.preIncrement(this, increment);
1179          }
1180        });
1181  }
1182
1183  /**
1184   * Supports Coprocessor 'bypass'.
1185   * @param increment increment object
1186   * @return result to return to client if default operation should be bypassed, null otherwise
1187   * @throws IOException if an error occurred on the coprocessor
1188   */
1189  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1190    boolean bypassable = true;
1191    Result defaultResult = null;
1192    if (coprocEnvironments.isEmpty()) {
1193      return defaultResult;
1194    }
1195    return execOperationWithResult(
1196        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1197            bypassable) {
1198          @Override
1199          public Result call(RegionObserver observer) throws IOException {
1200            return observer.preIncrementAfterRowLock(this, increment);
1201          }
1202        });
1203  }
1204
1205  /**
1206   * @param append Append object
1207   * @param result the result returned by the append
1208   * @throws IOException if an error occurred on the coprocessor
1209   */
1210  public Result postAppend(final Append append, final Result result) throws IOException {
1211    if (this.coprocEnvironments.isEmpty()) {
1212      return result;
1213    }
1214    return execOperationWithResult(
1215        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1216          @Override
1217          public Result call(RegionObserver observer) throws IOException {
1218            return observer.postAppend(this, append, result);
1219          }
1220        });
1221  }
1222
1223  /**
1224   * @param increment increment object
   * @param result the result returned by the increment
1226   * @throws IOException if an error occurred on the coprocessor
1227   */
1228  public Result postIncrement(final Increment increment, Result result) throws IOException {
1229    if (this.coprocEnvironments.isEmpty()) {
1230      return result;
1231    }
1232    return execOperationWithResult(
1233        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1234          @Override
1235          public Result call(RegionObserver observer) throws IOException {
1236            return observer.postIncrement(this, increment, getResult());
1237          }
1238        });
1239  }
1240
1241  /**
1242   * @param scan the Scan specification
1243   * @exception IOException Exception
1244   */
1245  public void preScannerOpen(final Scan scan) throws IOException {
1246    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1247      @Override
1248      public void call(RegionObserver observer) throws IOException {
1249        observer.preScannerOpen(this, scan);
1250      }
1251    });
1252  }
1253
1254  /**
1255   * @param scan the Scan specification
1256   * @param s the scanner
1257   * @return the scanner instance to use
1258   * @exception IOException Exception
1259   */
1260  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1261    if (this.coprocEnvironments.isEmpty()) {
1262      return s;
1263    }
1264    return execOperationWithResult(
1265        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1266          @Override
1267          public RegionScanner call(RegionObserver observer) throws IOException {
1268            return observer.postScannerOpen(this, scan, getResult());
1269          }
1270        });
1271  }
1272
1273  /**
1274   * @param s the scanner
1275   * @param results the result set returned by the region server
1276   * @param limit the maximum number of results to return
1277   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1278   * @exception IOException Exception
1279   */
1280  public Boolean preScannerNext(final InternalScanner s,
1281      final List<Result> results, final int limit) throws IOException {
1282    boolean bypassable = true;
1283    boolean defaultResult = false;
1284    if (coprocEnvironments.isEmpty()) {
1285      return null;
1286    }
1287    return execOperationWithResult(
1288        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1289            defaultResult, bypassable) {
1290          @Override
1291          public Boolean call(RegionObserver observer) throws IOException {
1292            return observer.preScannerNext(this, s, results, limit, getResult());
1293          }
1294        });
1295  }
1296
1297  /**
1298   * @param s the scanner
1299   * @param results the result set returned by the region server
1300   * @param limit the maximum number of results to return
1301   * @param hasMore
1302   * @return 'has more' indication to give to client
1303   * @exception IOException Exception
1304   */
1305  public boolean postScannerNext(final InternalScanner s,
1306      final List<Result> results, final int limit, boolean hasMore)
1307      throws IOException {
1308    if (this.coprocEnvironments.isEmpty()) {
1309      return hasMore;
1310    }
1311    return execOperationWithResult(
1312        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1313          @Override
1314          public Boolean call(RegionObserver observer) throws IOException {
1315            return observer.postScannerNext(this, s, results, limit, getResult());
1316          }
1317        });
1318  }
1319
1320  /**
1321   * This will be called by the scan flow when the current scanned row is being filtered out by the
1322   * filter.
1323   * @param s the scanner
1324   * @param curRowCell The cell in the current row which got filtered out
1325   * @return whether more rows are available for the scanner or not
1326   * @throws IOException
1327   */
1328  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1329      throws IOException {
1330    // short circuit for performance
1331    boolean defaultResult = true;
1332    if (!hasCustomPostScannerFilterRow) {
1333      return defaultResult;
1334    }
1335    if (this.coprocEnvironments.isEmpty()) {
1336      return defaultResult;
1337    }
1338    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1339        regionObserverGetter, defaultResult) {
1340      @Override
1341      public Boolean call(RegionObserver observer) throws IOException {
1342        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1343      }
1344    });
1345  }
1346
1347  /**
1348   * Supports Coprocessor 'bypass'.
1349   * @param s the scanner
1350   * @return true if default behavior should be bypassed, false otherwise
1351   * @exception IOException Exception
1352   */
1353  // Should this be bypassable?
1354  public boolean preScannerClose(final InternalScanner s) throws IOException {
1355    return execOperation(coprocEnvironments.isEmpty()? null:
1356        new RegionObserverOperationWithoutResult(true) {
1357      @Override
1358      public void call(RegionObserver observer) throws IOException {
1359        observer.preScannerClose(this, s);
1360      }
1361    });
1362  }
1363
1364  /**
1365   * @exception IOException Exception
1366   */
1367  public void postScannerClose(final InternalScanner s) throws IOException {
1368    execOperation(coprocEnvironments.isEmpty()? null:
1369        new RegionObserverOperationWithoutResult() {
1370      @Override
1371      public void call(RegionObserver observer) throws IOException {
1372        observer.postScannerClose(this, s);
1373      }
1374    });
1375  }
1376
1377  /**
1378   * Called before open store scanner for user scan.
1379   */
1380  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1381    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1382    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1383    execOperation(new RegionObserverOperationWithoutResult() {
1384      @Override
1385      public void call(RegionObserver observer) throws IOException {
1386        observer.preStoreScannerOpen(this, store, builder);
1387      }
1388    });
1389    return builder.build();
1390  }
1391
1392  /**
1393   * @param info the RegionInfo for this region
1394   * @param edits the file of recovered edits
1395   */
1396  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1397    execOperation(coprocEnvironments.isEmpty()? null:
1398        new RegionObserverOperationWithoutResult(true) {
1399      @Override
1400      public void call(RegionObserver observer) throws IOException {
1401        observer.preReplayWALs(this, info, edits);
1402      }
1403    });
1404  }
1405
1406  /**
1407   * @param info the RegionInfo for this region
1408   * @param edits the file of recovered edits
1409   * @throws IOException Exception
1410   */
1411  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1412    execOperation(coprocEnvironments.isEmpty()? null:
1413        new RegionObserverOperationWithoutResult() {
1414      @Override
1415      public void call(RegionObserver observer) throws IOException {
1416        observer.postReplayWALs(this, info, edits);
1417      }
1418    });
1419  }
1420
1421  /**
1422   * Supports Coprocessor 'bypass'.
1423   * @return true if default behavior should be bypassed, false otherwise
   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
   * with something that doesn't expose InterfaceAudience.Private classes.
1426   */
1427  @Deprecated
1428  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1429      throws IOException {
1430    return execOperation(coprocEnvironments.isEmpty()? null:
1431        new RegionObserverOperationWithoutResult(true) {
1432      @Override
1433      public void call(RegionObserver observer) throws IOException {
1434        observer.preWALRestore(this, info, logKey, logEdit);
1435      }
1436    });
1437  }
1438
1439  /**
   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
   * with something that doesn't expose InterfaceAudience.Private classes.
1442   */
1443  @Deprecated
1444  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1445      throws IOException {
1446    execOperation(coprocEnvironments.isEmpty()? null:
1447        new RegionObserverOperationWithoutResult() {
1448      @Override
1449      public void call(RegionObserver observer) throws IOException {
1450        observer.postWALRestore(this, info, logKey, logEdit);
1451      }
1452    });
1453  }
1454
1455  /**
1456   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1457   */
1458  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1459    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1460      @Override
1461      public void call(RegionObserver observer) throws IOException {
1462        observer.preBulkLoadHFile(this, familyPaths);
1463      }
1464    });
1465  }
1466
1467  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1468      throws IOException {
1469    return execOperation(coprocEnvironments.isEmpty()? null:
1470        new RegionObserverOperationWithoutResult() {
1471      @Override
1472      public void call(RegionObserver observer) throws IOException {
1473        observer.preCommitStoreFile(this, family, pairs);
1474      }
1475    });
1476  }
1477
1478  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
1479    execOperation(coprocEnvironments.isEmpty()? null:
1480        new RegionObserverOperationWithoutResult() {
1481      @Override
1482      public void call(RegionObserver observer) throws IOException {
1483        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1484      }
1485    });
1486  }
1487
1488  /**
1489   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1490   * @param map Map of CF to List of file paths for the final loaded files
1491   * @throws IOException
1492   */
1493  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1494      Map<byte[], List<Path>> map) throws IOException {
1495    if (this.coprocEnvironments.isEmpty()) {
1496      return;
1497    }
1498    execOperation(coprocEnvironments.isEmpty()? null:
1499        new RegionObserverOperationWithoutResult() {
1500          @Override
1501          public void call(RegionObserver observer) throws IOException {
1502            observer.postBulkLoadHFile(this, familyPaths, map);
1503          }
1504        });
1505  }
1506
1507  public void postStartRegionOperation(final Operation op) throws IOException {
1508    execOperation(coprocEnvironments.isEmpty()? null:
1509        new RegionObserverOperationWithoutResult() {
1510      @Override
1511      public void call(RegionObserver observer) throws IOException {
1512        observer.postStartRegionOperation(this, op);
1513      }
1514    });
1515  }
1516
1517  public void postCloseRegionOperation(final Operation op) throws IOException {
1518    execOperation(coprocEnvironments.isEmpty()? null:
1519        new RegionObserverOperationWithoutResult() {
1520      @Override
1521      public void call(RegionObserver observer) throws IOException {
1522        observer.postCloseRegionOperation(this, op);
1523      }
1524    });
1525  }
1526
1527  /**
   * @param fs filesystem to read from
1529   * @param p path to the file
1530   * @param in {@link FSDataInputStreamWrapper}
1531   * @param size Full size of the file
1532   * @param cacheConf
1533   * @param r original reference file. This will be not null only when reading a split file.
1534   * @return a Reader instance to use instead of the base reader if overriding
1535   * default behavior, null otherwise
1536   * @throws IOException
1537   */
1538  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1539      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1540      final Reference r) throws IOException {
1541    if (coprocEnvironments.isEmpty()) {
1542      return null;
1543    }
1544    return execOperationWithResult(
1545        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1546          @Override
1547          public StoreFileReader call(RegionObserver observer) throws IOException {
1548            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1549                getResult());
1550          }
1551        });
1552  }
1553
1554  /**
   * @param fs filesystem to read from
1556   * @param p path to the file
1557   * @param in {@link FSDataInputStreamWrapper}
1558   * @param size Full size of the file
1559   * @param cacheConf
1560   * @param r original reference file. This will be not null only when reading a split file.
1561   * @param reader the base reader instance
1562   * @return The reader to use
1563   * @throws IOException
1564   */
1565  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1566      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1567      final Reference r, final StoreFileReader reader) throws IOException {
1568    if (this.coprocEnvironments.isEmpty()) {
1569      return reader;
1570    }
1571    return execOperationWithResult(
1572        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
1573          @Override
1574          public StoreFileReader call(RegionObserver observer) throws IOException {
1575            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1576                getResult());
1577          }
1578        });
1579  }
1580
1581  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1582      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1583    if (this.coprocEnvironments.isEmpty()) {
1584      return cellPairs;
1585    }
1586    return execOperationWithResult(
1587        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1588            regionObserverGetter, cellPairs) {
1589          @Override
1590          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1591            return observer.postIncrementBeforeWAL(this, mutation, getResult());
1592          }
1593        });
1594  }
1595
1596  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1597      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1598    if (this.coprocEnvironments.isEmpty()) {
1599      return cellPairs;
1600    }
1601    return execOperationWithResult(
1602        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1603            regionObserverGetter, cellPairs) {
1604          @Override
1605          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1606            return observer.postAppendBeforeWAL(this, mutation, getResult());
1607          }
1608        });
1609  }
1610
1611  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1612    if (this.coprocEnvironments.isEmpty()){
1613      return;
1614    }
1615    execOperation(new RegionObserverOperationWithoutResult() {
1616      @Override
1617      public void call(RegionObserver observer) throws IOException {
1618        observer.preWALAppend(this, key, edit);
1619      }
1620    });
1621  }
1622
1623  public Message preEndpointInvocation(final Service service, final String methodName,
1624      Message request) throws IOException {
1625    if (coprocEnvironments.isEmpty()) {
1626      return request;
1627    }
1628    return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver,
1629        Message>(endpointObserverGetter, request) {
1630      @Override
1631      public Message call(EndpointObserver observer) throws IOException {
1632        return observer.preEndpointInvocation(this, service, methodName, getResult());
1633      }
1634    });
1635  }
1636
1637  public void postEndpointInvocation(final Service service, final String methodName,
1638      final Message request, final Message.Builder responseBuilder) throws IOException {
1639    execOperation(coprocEnvironments.isEmpty() ? null :
1640        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1641          @Override
1642          public void call(EndpointObserver observer) throws IOException {
1643            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1644          }
1645        });
1646  }
1647
1648  /**
1649   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1650   */
1651  @Deprecated
1652  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1653    if (this.coprocEnvironments.isEmpty()) {
1654      return result;
1655    }
1656    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>(
1657        regionObserverGetter, result) {
1658      @Override
1659      public DeleteTracker call(RegionObserver observer) throws IOException {
1660        return observer.postInstantiateDeleteTracker(this, getResult());
1661      }
1662    });
1663  }
1664
1665  /////////////////////////////////////////////////////////////////////////////////////////////////
1666  // BulkLoadObserver hooks
1667  /////////////////////////////////////////////////////////////////////////////////////////////////
1668  public void prePrepareBulkLoad(User user) throws IOException {
1669    execOperation(coprocEnvironments.isEmpty() ? null :
1670        new BulkLoadObserverOperation(user) {
1671          @Override protected void call(BulkLoadObserver observer) throws IOException {
1672            observer.prePrepareBulkLoad(this);
1673          }
1674        });
1675  }
1676
1677  public void preCleanupBulkLoad(User user) throws IOException {
1678    execOperation(coprocEnvironments.isEmpty() ? null :
1679        new BulkLoadObserverOperation(user) {
1680          @Override protected void call(BulkLoadObserver observer) throws IOException {
1681            observer.preCleanupBulkLoad(this);
1682          }
1683        });
1684  }
1685}