001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RawCellBuilder;
036import org.apache.hadoop.hbase.RawCellBuilderFactory;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.CheckAndMutate;
040import org.apache.hadoop.hbase.client.CheckAndMutateResult;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.SharedConnection;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
057import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
064import org.apache.hadoop.hbase.coprocessor.RegionObserver;
065import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
066import org.apache.hadoop.hbase.io.Reference;
067import org.apache.hadoop.hbase.io.hfile.CacheConfig;
068import org.apache.hadoop.hbase.metrics.MetricRegistry;
069import org.apache.hadoop.hbase.quotas.OperationQuota;
070import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
071import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
072import org.apache.hadoop.hbase.regionserver.Region.Operation;
073import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
074import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
075import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
076import org.apache.hadoop.hbase.security.User;
077import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.wal.WALEdit;
080import org.apache.hadoop.hbase.wal.WALKey;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.com.google.protobuf.Message;
086import org.apache.hbase.thirdparty.com.google.protobuf.Service;
087import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
089
090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
092
093/**
094 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
095 * {@link Region}.
096 */
097@InterfaceAudience.Private
098public class RegionCoprocessorHost
099  extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
100
101  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
102  // The shared data map
103  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
104    new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
105      AbstractReferenceMap.ReferenceStrength.WEAK);
106
107  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
108  private final boolean hasCustomPostScannerFilterRow;
109
110  /*
111   * Whether any configured CPs override postScannerFilterRow hook
112   */
113  public boolean hasCustomPostScannerFilterRow() {
114    return hasCustomPostScannerFilterRow;
115  }
116
117  /**
118   * Encapsulation of the environment of each coprocessor
119   */
120  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
121    implements RegionCoprocessorEnvironment {
122    private Region region;
123    ConcurrentMap<String, Object> sharedData;
124    private final MetricRegistry metricRegistry;
125    private final RegionServerServices services;
126    private final RpcQuotaManager rpcQuotaManager;
127
128    /**
129     * Constructor
130     * @param impl     the coprocessor instance
131     * @param priority chaining priority
132     */
133    public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
134      final Configuration conf, final Region region, final RegionServerServices services,
135      final ConcurrentMap<String, Object> sharedData) {
136      super(impl, priority, seq, conf);
137      this.region = region;
138      this.sharedData = sharedData;
139      this.services = services;
140      this.metricRegistry =
141        MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
142      // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
143      // being null. Let these unit tests succeed. This should not happen in real usage.
144      if (services != null) {
145        this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
146      } else {
147        this.rpcQuotaManager = null;
148      }
149    }
150
151    /** Returns the region */
152    @Override
153    public Region getRegion() {
154      return region;
155    }
156
157    @Override
158    public OnlineRegions getOnlineRegions() {
159      return this.services;
160    }
161
162    @Override
163    public Connection getConnection() {
164      // Mocks may have services as null at test time.
165      return services != null ? new SharedConnection(services.getConnection()) : null;
166    }
167
168    @Override
169    public Connection createConnection(Configuration conf) throws IOException {
170      return services != null ? this.services.createConnection(conf) : null;
171    }
172
173    @Override
174    public ServerName getServerName() {
175      return services != null ? services.getServerName() : null;
176    }
177
178    @Override
179    public void shutdown() {
180      super.shutdown();
181      MetricsCoprocessor.removeRegistry(this.metricRegistry);
182    }
183
184    @Override
185    public ConcurrentMap<String, Object> getSharedData() {
186      return sharedData;
187    }
188
189    @Override
190    public RegionInfo getRegionInfo() {
191      return region.getRegionInfo();
192    }
193
194    @Override
195    public MetricRegistry getMetricRegistryForRegionServer() {
196      return metricRegistry;
197    }
198
199    @Override
200    public RawCellBuilder getCellBuilder() {
201      // We always do a DEEP_COPY only
202      return RawCellBuilderFactory.create();
203    }
204
205    @Override
206    public RpcQuotaManager getRpcQuotaManager() {
207      return rpcQuotaManager;
208    }
209
210    @Override
211    public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
212      long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
213      ClientProtos.ScanRequest scanRequest = RequestConverter
214        .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
215      long maxScannerResultSize =
216        services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
217          HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
218      return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
219        maxBlockBytesScanned, prevBlockBytesScannedDifference);
220    }
221
222    @Override
223    public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
224      throws IOException, RpcThrottlingException {
225      return rpcQuotaManager.checkBatchQuota(region, type);
226    }
227
228    @Override
229    public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
230      throws IOException, RpcThrottlingException {
231      return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads, false);
232    }
233  }
234
235  /**
236   * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
237   * only. Temporary hack until Core Coprocessors are integrated into Core.
238   */
239  private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
240    implements HasRegionServerServices {
241    private final RegionServerServices rsServices;
242
243    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
244      final int seq, final Configuration conf, final Region region,
245      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
246      super(impl, priority, seq, conf, region, services, sharedData);
247      this.rsServices = services;
248    }
249
250    /**
251     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
252     *         consumption.
253     */
254    @Override
255    public RegionServerServices getRegionServerServices() {
256      return this.rsServices;
257    }
258  }
259
260  static class TableCoprocessorAttribute {
261    private Path path;
262    private String className;
263    private int priority;
264    private Configuration conf;
265
266    public TableCoprocessorAttribute(Path path, String className, int priority,
267      Configuration conf) {
268      this.path = path;
269      this.className = className;
270      this.priority = priority;
271      this.conf = conf;
272    }
273
274    public Path getPath() {
275      return path;
276    }
277
278    public String getClassName() {
279      return className;
280    }
281
282    public int getPriority() {
283      return priority;
284    }
285
286    public Configuration getConf() {
287      return conf;
288    }
289  }
290
291  /** The region server services */
292  RegionServerServices rsServices;
293  /** The region */
294  HRegion region;
295
296  /**
297   * Constructor
298   * @param region     the region
299   * @param rsServices interface to available region server functionality
300   * @param conf       the configuration
301   */
302  @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
303  public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
304    final Configuration conf) {
305    super(rsServices);
306    this.conf = conf;
307    this.rsServices = rsServices;
308    this.region = region;
309    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
310
311    // load system default cp's from configuration.
312    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
313
314    // load system default cp's for user tables from configuration.
315    if (!region.getRegionInfo().getTable().isSystemTable()) {
316      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
317    }
318
319    // load Coprocessor From HDFS
320    loadTableCoprocessors(conf);
321
322    // now check whether any coprocessor implements postScannerFilterRow
323    boolean hasCustomPostScannerFilterRow = false;
324    out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
325      if (env.getInstance() instanceof RegionObserver) {
326        Class<?> clazz = env.getInstance().getClass();
327        for (;;) {
328          if (clazz == Object.class) {
329            // we dont need to look postScannerFilterRow into Object class
330            break; // break the inner loop
331          }
332          try {
333            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
334              InternalScanner.class, Cell.class, boolean.class);
335            // this coprocessor has a custom version of postScannerFilterRow
336            hasCustomPostScannerFilterRow = true;
337            break out;
338          } catch (NoSuchMethodException ignore) {
339          }
340          // the deprecated signature still exists
341          try {
342            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
343              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
344            // this coprocessor has a custom version of postScannerFilterRow
345            hasCustomPostScannerFilterRow = true;
346            break out;
347          } catch (NoSuchMethodException ignore) {
348          }
349          clazz = clazz.getSuperclass();
350        }
351      }
352    }
353    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
354  }
355
356  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
357    TableDescriptor htd) {
358    return htd.getCoprocessorDescriptors().stream().map(cp -> {
359      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
360      Configuration ourConf;
361      if (!cp.getProperties().isEmpty()) {
362        // do an explicit deep copy of the passed configuration
363        ourConf = new Configuration(false);
364        HBaseConfiguration.merge(ourConf, conf);
365        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
366      } else {
367        ourConf = conf;
368      }
369      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
370    }).collect(Collectors.toList());
371  }
372
373  /**
374   * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
375   * if there is a problem.
376   */
377  public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
378    throws IOException {
379    String pathPrefix = UUID.randomUUID().toString();
380    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
381      if (attr.getPriority() < 0) {
382        throw new IOException(
383          "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
384      }
385      ClassLoader old = Thread.currentThread().getContextClassLoader();
386      try {
387        ClassLoader cl;
388        if (attr.getPath() != null) {
389          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
390            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
391        } else {
392          cl = CoprocessorHost.class.getClassLoader();
393        }
394        Thread.currentThread().setContextClassLoader(cl);
395        if (cl instanceof CoprocessorClassLoader) {
396          String[] includedClassPrefixes = null;
397          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
398            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
399            includedClassPrefixes = prefixes.split(";");
400          }
401          ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
402        } else {
403          cl.loadClass(attr.getClassName());
404        }
405      } catch (ClassNotFoundException e) {
406        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
407      } finally {
408        Thread.currentThread().setContextClassLoader(old);
409      }
410    }
411  }
412
413  void loadTableCoprocessors(final Configuration conf) {
414    boolean coprocessorsEnabled =
415      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
416    boolean tableCoprocessorsEnabled =
417      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
418    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
419      return;
420    }
421
422    // scan the table attributes for coprocessor load specifications
423    // initialize the coprocessors
424    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
425    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
426      region.getTableDescriptor())) {
427      // Load encompasses classloading and coprocessor initialization
428      try {
429        RegionCoprocessorEnvironment env =
430          load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
431        if (env == null) {
432          continue;
433        }
434        configured.add(env);
435        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
436          + region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
437      } catch (Throwable t) {
438        // Coprocessor failed to load, do we abort on error?
439        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
440          abortServer(attr.getClassName(), t);
441        } else {
442          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
443        }
444      }
445    }
446    // add together to coprocessor set for COW efficiency
447    coprocEnvironments.addAll(configured);
448  }
449
450  @Override
451  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
452    Configuration conf) {
453    // If coprocessor exposes any services, register them.
454    for (Service service : instance.getServices()) {
455      region.registerService(service);
456    }
457    ConcurrentMap<String, Object> classData;
458    // make sure only one thread can add maps
459    synchronized (SHARED_DATA_MAP) {
460      // as long as at least one RegionEnvironment holds on to its classData it will
461      // remain in this map
462      classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
463        k -> new ConcurrentHashMap<>());
464    }
465    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
466    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
467      ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
468        classData)
469      : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
470  }
471
472  @Override
473  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
474    throws InstantiationException, IllegalAccessException {
475    try {
476      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
477        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
478      } else {
479        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
480          implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
481        return null;
482      }
483    } catch (NoSuchMethodException | InvocationTargetException e) {
484      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
485    }
486  }
487
488  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
489    RegionCoprocessor::getRegionObserver;
490
491  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
492    RegionCoprocessor::getEndpointObserver;
493
494  abstract class RegionObserverOperationWithoutResult
495    extends ObserverOperationWithoutResult<RegionObserver> {
496    public RegionObserverOperationWithoutResult() {
497      super(regionObserverGetter);
498    }
499
500    public RegionObserverOperationWithoutResult(User user) {
501      super(regionObserverGetter, user);
502    }
503
504    public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext) {
505      super(regionObserverGetter, rpcCallContext);
506    }
507
508    public RegionObserverOperationWithoutResult(boolean bypassable) {
509      super(regionObserverGetter, (ObserverRpcCallContext) null, bypassable);
510    }
511
512    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
513      super(regionObserverGetter, user, bypassable);
514    }
515
516    public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext,
517      boolean bypassable) {
518      super(regionObserverGetter, rpcCallContext, bypassable);
519    }
520  }
521
522  abstract class BulkLoadObserverOperation
523    extends ObserverOperationWithoutResult<BulkLoadObserver> {
524    public BulkLoadObserverOperation(User user) {
525      super(RegionCoprocessor::getBulkLoadObserver, user);
526    }
527  }
528
529  //////////////////////////////////////////////////////////////////////////////////////////////////
530  // Observer operations
531  //////////////////////////////////////////////////////////////////////////////////////////////////
532
533  //////////////////////////////////////////////////////////////////////////////////////////////////
534  // Observer operations
535  //////////////////////////////////////////////////////////////////////////////////////////////////
536
537  /**
538   * Invoked before a region open.
539   * @throws IOException Signals that an I/O exception has occurred.
540   */
541  public void preOpen() throws IOException {
542    if (coprocEnvironments.isEmpty()) {
543      return;
544    }
545    execOperation(new RegionObserverOperationWithoutResult() {
546      @Override
547      public void call(RegionObserver observer) throws IOException {
548        observer.preOpen(this);
549      }
550    });
551  }
552
553  /**
554   * Invoked after a region open
555   */
556  public void postOpen() {
557    if (coprocEnvironments.isEmpty()) {
558      return;
559    }
560    try {
561      execOperation(new RegionObserverOperationWithoutResult() {
562        @Override
563        public void call(RegionObserver observer) throws IOException {
564          observer.postOpen(this);
565        }
566      });
567    } catch (IOException e) {
568      LOG.warn(e.toString(), e);
569    }
570  }
571
572  /**
573   * Invoked before a region is closed
574   * @param abortRequested true if the server is aborting
575   */
576  public void preClose(final boolean abortRequested) throws IOException {
577    execOperation(new RegionObserverOperationWithoutResult() {
578      @Override
579      public void call(RegionObserver observer) throws IOException {
580        observer.preClose(this, abortRequested);
581      }
582    });
583  }
584
585  /**
586   * Invoked after a region is closed
587   * @param abortRequested true if the server is aborting
588   */
589  public void postClose(final boolean abortRequested) {
590    try {
591      execOperation(new RegionObserverOperationWithoutResult() {
592        @Override
593        public void call(RegionObserver observer) throws IOException {
594          observer.postClose(this, abortRequested);
595        }
596
597        @Override
598        public void postEnvCall() {
599          shutdown(this.getEnvironment());
600        }
601      });
602    } catch (IOException e) {
603      LOG.warn(e.toString(), e);
604    }
605  }
606
607  /**
608   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
609   * available candidates.
610   * <p>
611   * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
612   * passed in <code>candidates</code>.
613   * @param store      The store where compaction is being requested
614   * @param candidates The currently available store files
615   * @param tracker    used to track the life cycle of a compaction
616   * @param user       the user
617   */
618  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
619    final CompactionLifeCycleTracker tracker, final User user) throws IOException {
620    if (coprocEnvironments.isEmpty()) {
621      return false;
622    }
623    boolean bypassable = true;
624    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
625      @Override
626      public void call(RegionObserver observer) throws IOException {
627        observer.preCompactSelection(this, store, candidates, tracker);
628      }
629    });
630  }
631
632  /**
633   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
634   * candidates.
635   * @param store    The store where compaction is being requested
636   * @param selected The store files selected to compact
637   * @param tracker  used to track the life cycle of a compaction
638   * @param request  the compaction request
639   * @param user     the user
640   */
641  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
642    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
643    throws IOException {
644    if (coprocEnvironments.isEmpty()) {
645      return;
646    }
647    execOperation(new RegionObserverOperationWithoutResult(user) {
648      @Override
649      public void call(RegionObserver observer) throws IOException {
650        observer.postCompactSelection(this, store, selected, tracker, request);
651      }
652    });
653  }
654
655  /**
656   * Called prior to opening store scanner for compaction.
657   */
658  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
659    CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
660    if (coprocEnvironments.isEmpty()) {
661      return store.getScanInfo();
662    }
663    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
664    execOperation(new RegionObserverOperationWithoutResult(user) {
665      @Override
666      public void call(RegionObserver observer) throws IOException {
667        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
668      }
669    });
670    return builder.build();
671  }
672
673  /**
674   * Called prior to rewriting the store files selected for compaction
675   * @param store    the store being compacted
676   * @param scanner  the scanner used to read store data during compaction
677   * @param scanType type of Scan
678   * @param tracker  used to track the life cycle of a compaction
679   * @param request  the compaction request
680   * @param user     the user
681   * @return Scanner to use (cannot be null!)
682   */
683  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
684    final ScanType scanType, final CompactionLifeCycleTracker tracker,
685    final CompactionRequest request, final User user) throws IOException {
686    InternalScanner defaultResult = scanner;
687    if (coprocEnvironments.isEmpty()) {
688      return defaultResult;
689    }
690    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
691      regionObserverGetter, defaultResult, user) {
692      @Override
693      public InternalScanner call(RegionObserver observer) throws IOException {
694        InternalScanner scanner =
695          observer.preCompact(this, store, getResult(), scanType, tracker, request);
696        if (scanner == null) {
697          throw new CoprocessorException("Null Scanner return disallowed!");
698        }
699        return scanner;
700      }
701    });
702  }
703
704  /**
705   * Called after the store compaction has completed.
706   * @param store      the store being compacted
707   * @param resultFile the new store file written during compaction
708   * @param tracker    used to track the life cycle of a compaction
709   * @param request    the compaction request
710   * @param user       the user
711   */
712  public void postCompact(final HStore store, final HStoreFile resultFile,
713    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
714    throws IOException {
715    execOperation(
716      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
717        @Override
718        public void call(RegionObserver observer) throws IOException {
719          observer.postCompact(this, store, resultFile, tracker, request);
720        }
721      });
722  }
723
724  /**
725   * Invoked before create StoreScanner for flush.
726   */
727  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
728    throws IOException {
729    if (coprocEnvironments.isEmpty()) {
730      return store.getScanInfo();
731    }
732    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
733    execOperation(new RegionObserverOperationWithoutResult() {
734      @Override
735      public void call(RegionObserver observer) throws IOException {
736        observer.preFlushScannerOpen(this, store, builder, tracker);
737      }
738    });
739    return builder.build();
740  }
741
742  /**
743   * Invoked before a memstore flush
744   * @return Scanner to use (cannot be null!)
745   */
746  public InternalScanner preFlush(HStore store, InternalScanner scanner,
747    FlushLifeCycleTracker tracker) throws IOException {
748    if (coprocEnvironments.isEmpty()) {
749      return scanner;
750    }
751    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
752      regionObserverGetter, scanner) {
753      @Override
754      public InternalScanner call(RegionObserver observer) throws IOException {
755        InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
756        if (scanner == null) {
757          throw new CoprocessorException("Null Scanner return disallowed!");
758        }
759        return scanner;
760      }
761    });
762  }
763
764  /**
765   * Invoked before a memstore flush
766   */
767  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
768    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
769      @Override
770      public void call(RegionObserver observer) throws IOException {
771        observer.preFlush(this, tracker);
772      }
773    });
774  }
775
776  /**
777   * Invoked after a memstore flush
778   */
779  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
780    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
781      @Override
782      public void call(RegionObserver observer) throws IOException {
783        observer.postFlush(this, tracker);
784      }
785    });
786  }
787
788  /**
789   * Invoked before in memory compaction.
790   */
791  public void preMemStoreCompaction(HStore store) throws IOException {
792    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
793      @Override
794      public void call(RegionObserver observer) throws IOException {
795        observer.preMemStoreCompaction(this, store);
796      }
797    });
798  }
799
800  /**
801   * Invoked before create StoreScanner for in memory compaction.
802   */
803  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
804    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
805    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
806      @Override
807      public void call(RegionObserver observer) throws IOException {
808        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
809      }
810    });
811    return builder.build();
812  }
813
814  /**
815   * Invoked before compacting memstore.
816   */
817  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
818    throws IOException {
819    if (coprocEnvironments.isEmpty()) {
820      return scanner;
821    }
822    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
823      regionObserverGetter, scanner) {
824      @Override
825      public InternalScanner call(RegionObserver observer) throws IOException {
826        return observer.preMemStoreCompactionCompact(this, store, getResult());
827      }
828    });
829  }
830
831  /**
832   * Invoked after in memory compaction.
833   */
834  public void postMemStoreCompaction(HStore store) throws IOException {
835    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
836      @Override
837      public void call(RegionObserver observer) throws IOException {
838        observer.postMemStoreCompaction(this, store);
839      }
840    });
841  }
842
843  /**
844   * Invoked after a memstore flush
845   */
846  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
847    throws IOException {
848    if (coprocEnvironments.isEmpty()) {
849      return;
850    }
851    execOperation(new RegionObserverOperationWithoutResult() {
852      @Override
853      public void call(RegionObserver observer) throws IOException {
854        observer.postFlush(this, store, storeFile, tracker);
855      }
856    });
857  }
858
859  // RegionObserver support
860  /**
861   * Supports Coprocessor 'bypass'.
862   * @param get     the Get request
863   * @param results What to return if return is true/'bypass'.
864   * @return true if default processing should be bypassed.
865   * @exception IOException Exception
866   */
867  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
868    if (coprocEnvironments.isEmpty()) {
869      return false;
870    }
871    boolean bypassable = true;
872    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
873      @Override
874      public void call(RegionObserver observer) throws IOException {
875        observer.preGetOp(this, get, results);
876      }
877    });
878  }
879
880  /**
881   * @param get     the Get request
882   * @param results the result set
883   * @exception IOException Exception
884   */
885  public void postGet(final Get get, final List<Cell> results) throws IOException {
886    if (coprocEnvironments.isEmpty()) {
887      return;
888    }
889    execOperation(new RegionObserverOperationWithoutResult() {
890      @Override
891      public void call(RegionObserver observer) throws IOException {
892        observer.postGetOp(this, get, results);
893      }
894    });
895  }
896
897  /**
898   * Supports Coprocessor 'bypass'.
899   * @param get the Get request
900   * @return true or false to return to client if bypassing normal operation, or null otherwise
901   * @exception IOException Exception
902   */
903  public Boolean preExists(final Get get) throws IOException {
904    boolean bypassable = true;
905    boolean defaultResult = false;
906    if (coprocEnvironments.isEmpty()) {
907      return null;
908    }
909    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
910      regionObserverGetter, defaultResult, bypassable) {
911      @Override
912      public Boolean call(RegionObserver observer) throws IOException {
913        return observer.preExists(this, get, getResult());
914      }
915    });
916  }
917
918  /**
919   * @param get    the Get request
920   * @param result the result returned by the region server
921   * @return the result to return to the client
922   * @exception IOException Exception
923   */
924  public boolean postExists(final Get get, boolean result) throws IOException {
925    if (this.coprocEnvironments.isEmpty()) {
926      return result;
927    }
928    return execOperationWithResult(
929      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
930        @Override
931        public Boolean call(RegionObserver observer) throws IOException {
932          return observer.postExists(this, get, getResult());
933        }
934      });
935  }
936
937  /**
938   * Supports Coprocessor 'bypass'.
939   * @param put  The Put object
940   * @param edit The WALEdit object.
941   * @return true if default processing should be bypassed
942   * @exception IOException Exception
943   */
944  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
945    if (coprocEnvironments.isEmpty()) {
946      return false;
947    }
948    boolean bypassable = true;
949    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
950      @Override
951      public void call(RegionObserver observer) throws IOException {
952        observer.prePut(this, put, edit);
953      }
954    });
955  }
956
957  /**
958   * Supports Coprocessor 'bypass'.
959   * @param mutation - the current mutation
960   * @param kv       - the current cell
961   * @param byteNow  - current timestamp in bytes
962   * @param get      - the get that could be used Note that the get only does not specify the family
963   *                 and qualifier that should be used
964   * @return true if default processing should be bypassed
965   */
966  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
967    final byte[] byteNow, final Get get) throws IOException {
968    if (coprocEnvironments.isEmpty()) {
969      return false;
970    }
971    boolean bypassable = true;
972    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
973      @Override
974      public void call(RegionObserver observer) throws IOException {
975        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
976      }
977    });
978  }
979
980  /**
981   * @param put  The Put object
982   * @param edit The WALEdit object.
983   * @exception IOException Exception
984   */
985  public void postPut(final Put put, final WALEdit edit) throws IOException {
986    if (coprocEnvironments.isEmpty()) {
987      return;
988    }
989    execOperation(new RegionObserverOperationWithoutResult() {
990      @Override
991      public void call(RegionObserver observer) throws IOException {
992        observer.postPut(this, put, edit);
993      }
994    });
995  }
996
997  /**
998   * Supports Coprocessor 'bypass'.
999   * @param delete The Delete object
1000   * @param edit   The WALEdit object.
1001   * @return true if default processing should be bypassed
1002   * @exception IOException Exception
1003   */
1004  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
1005    if (this.coprocEnvironments.isEmpty()) {
1006      return false;
1007    }
1008    boolean bypassable = true;
1009    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
1010      @Override
1011      public void call(RegionObserver observer) throws IOException {
1012        observer.preDelete(this, delete, edit);
1013      }
1014    });
1015  }
1016
1017  /**
1018   * @param delete The Delete object
1019   * @param edit   The WALEdit object.
1020   * @exception IOException Exception
1021   */
1022  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
1023    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1024      @Override
1025      public void call(RegionObserver observer) throws IOException {
1026        observer.postDelete(this, delete, edit);
1027      }
1028    });
1029  }
1030
1031  public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
1032    throws IOException {
1033    if (this.coprocEnvironments.isEmpty()) {
1034      return;
1035    }
1036    execOperation(new RegionObserverOperationWithoutResult() {
1037      @Override
1038      public void call(RegionObserver observer) throws IOException {
1039        observer.preBatchMutate(this, miniBatchOp);
1040      }
1041    });
1042  }
1043
1044  public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
1045    throws IOException {
1046    if (this.coprocEnvironments.isEmpty()) {
1047      return;
1048    }
1049    execOperation(new RegionObserverOperationWithoutResult() {
1050      @Override
1051      public void call(RegionObserver observer) throws IOException {
1052        observer.postBatchMutate(this, miniBatchOp);
1053      }
1054    });
1055  }
1056
1057  public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
1058    final boolean success) throws IOException {
1059    if (this.coprocEnvironments.isEmpty()) {
1060      return;
1061    }
1062    execOperation(new RegionObserverOperationWithoutResult() {
1063      @Override
1064      public void call(RegionObserver observer) throws IOException {
1065        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1066      }
1067    });
1068  }
1069
1070  /**
1071   * Supports Coprocessor 'bypass'.
1072   * @param checkAndMutate the CheckAndMutate object
1073   * @return true or false to return to client if default processing should be bypassed, or null
1074   *         otherwise
1075   * @throws IOException if an error occurred on the coprocessor
1076   */
1077  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
1078    boolean bypassable = true;
1079    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1080    if (coprocEnvironments.isEmpty()) {
1081      return null;
1082    }
1083    return execOperationWithResult(
1084      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1085        defaultResult, bypassable) {
1086        @Override
1087        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1088          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1089        }
1090      });
1091  }
1092
1093  /**
1094   * Supports Coprocessor 'bypass'.
1095   * @param checkAndMutate the CheckAndMutate object
1096   * @return true or false to return to client if default processing should be bypassed, or null
1097   *         otherwise
1098   * @throws IOException if an error occurred on the coprocessor
1099   */
1100  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1101    throws IOException {
1102    boolean bypassable = true;
1103    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1104    if (coprocEnvironments.isEmpty()) {
1105      return null;
1106    }
1107    return execOperationWithResult(
1108      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1109        defaultResult, bypassable) {
1110        @Override
1111        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1112          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1113        }
1114      });
1115  }
1116
1117  /**
1118   * @param checkAndMutate the CheckAndMutate object
1119   * @param result         the result returned by the checkAndMutate
1120   * @return true or false to return to client if default processing should be bypassed, or null
1121   *         otherwise
1122   * @throws IOException if an error occurred on the coprocessor
1123   */
1124  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1125    CheckAndMutateResult result) throws IOException {
1126    if (this.coprocEnvironments.isEmpty()) {
1127      return result;
1128    }
1129    return execOperationWithResult(
1130      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1131        result) {
1132        @Override
1133        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1134          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1135        }
1136      });
1137  }
1138
1139  /**
1140   * Supports Coprocessor 'bypass'.
1141   * @param append append object
1142   * @param edit   The WALEdit object.
1143   * @return result to return to client if default operation should be bypassed, null otherwise
1144   * @throws IOException if an error occurred on the coprocessor
1145   */
1146  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
1147    boolean bypassable = true;
1148    Result defaultResult = null;
1149    if (this.coprocEnvironments.isEmpty()) {
1150      return defaultResult;
1151    }
1152    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1153      regionObserverGetter, defaultResult, bypassable) {
1154      @Override
1155      public Result call(RegionObserver observer) throws IOException {
1156        return observer.preAppend(this, append, edit);
1157      }
1158    });
1159  }
1160
1161  /**
1162   * Supports Coprocessor 'bypass'.
1163   * @param append append object
1164   * @return result to return to client if default operation should be bypassed, null otherwise
1165   * @throws IOException if an error occurred on the coprocessor
1166   */
1167  public Result preAppendAfterRowLock(final Append append) throws IOException {
1168    boolean bypassable = true;
1169    Result defaultResult = null;
1170    if (this.coprocEnvironments.isEmpty()) {
1171      return defaultResult;
1172    }
1173    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1174      regionObserverGetter, defaultResult, bypassable) {
1175      @Override
1176      public Result call(RegionObserver observer) throws IOException {
1177        return observer.preAppendAfterRowLock(this, append);
1178      }
1179    });
1180  }
1181
1182  /**
1183   * Supports Coprocessor 'bypass'.
1184   * @param increment increment object
1185   * @param edit      The WALEdit object.
1186   * @return result to return to client if default operation should be bypassed, null otherwise
1187   * @throws IOException if an error occurred on the coprocessor
1188   */
1189  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
1190    boolean bypassable = true;
1191    Result defaultResult = null;
1192    if (coprocEnvironments.isEmpty()) {
1193      return defaultResult;
1194    }
1195    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1196      regionObserverGetter, defaultResult, bypassable) {
1197      @Override
1198      public Result call(RegionObserver observer) throws IOException {
1199        return observer.preIncrement(this, increment, edit);
1200      }
1201    });
1202  }
1203
1204  /**
1205   * Supports Coprocessor 'bypass'.
1206   * @param increment increment object
1207   * @return result to return to client if default operation should be bypassed, null otherwise
1208   * @throws IOException if an error occurred on the coprocessor
1209   */
1210  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1211    boolean bypassable = true;
1212    Result defaultResult = null;
1213    if (coprocEnvironments.isEmpty()) {
1214      return defaultResult;
1215    }
1216    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1217      regionObserverGetter, defaultResult, bypassable) {
1218      @Override
1219      public Result call(RegionObserver observer) throws IOException {
1220        return observer.preIncrementAfterRowLock(this, increment);
1221      }
1222    });
1223  }
1224
1225  /**
1226   * @param append Append object
1227   * @param result the result returned by the append
1228   * @param edit   The WALEdit object.
1229   * @throws IOException if an error occurred on the coprocessor
1230   */
1231  public Result postAppend(final Append append, final Result result, final WALEdit edit)
1232    throws IOException {
1233    if (this.coprocEnvironments.isEmpty()) {
1234      return result;
1235    }
1236    return execOperationWithResult(
1237      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1238        @Override
1239        public Result call(RegionObserver observer) throws IOException {
1240          return observer.postAppend(this, append, result, edit);
1241        }
1242      });
1243  }
1244
1245  /**
1246   * @param increment increment object
1247   * @param result    the result returned by postIncrement
1248   * @param edit      The WALEdit object.
1249   * @throws IOException if an error occurred on the coprocessor
1250   */
1251  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
1252    throws IOException {
1253    if (this.coprocEnvironments.isEmpty()) {
1254      return result;
1255    }
1256    return execOperationWithResult(
1257      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1258        @Override
1259        public Result call(RegionObserver observer) throws IOException {
1260          return observer.postIncrement(this, increment, getResult(), edit);
1261        }
1262      });
1263  }
1264
1265  /**
1266   * @param scan the Scan specification
1267   * @exception IOException Exception
1268   */
1269  public void preScannerOpen(final Scan scan) throws IOException {
1270    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1271      @Override
1272      public void call(RegionObserver observer) throws IOException {
1273        observer.preScannerOpen(this, scan);
1274      }
1275    });
1276  }
1277
1278  /**
1279   * @param scan the Scan specification
1280   * @param s    the scanner
1281   * @return the scanner instance to use
1282   * @exception IOException Exception
1283   */
1284  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1285    if (this.coprocEnvironments.isEmpty()) {
1286      return s;
1287    }
1288    return execOperationWithResult(
1289      new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1290        @Override
1291        public RegionScanner call(RegionObserver observer) throws IOException {
1292          return observer.postScannerOpen(this, scan, getResult());
1293        }
1294      });
1295  }
1296
1297  /**
1298   * @param s       the scanner
1299   * @param results the result set returned by the region server
1300   * @param limit   the maximum number of results to return
1301   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1302   * @exception IOException Exception
1303   */
1304  public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
1305    final int limit) throws IOException {
1306    boolean bypassable = true;
1307    boolean defaultResult = false;
1308    if (coprocEnvironments.isEmpty()) {
1309      return null;
1310    }
1311    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1312      regionObserverGetter, defaultResult, bypassable) {
1313      @Override
1314      public Boolean call(RegionObserver observer) throws IOException {
1315        return observer.preScannerNext(this, s, results, limit, getResult());
1316      }
1317    });
1318  }
1319
1320  /**
1321   * @param s       the scanner
1322   * @param results the result set returned by the region server
1323   * @param limit   the maximum number of results to return
1324   * @return 'has more' indication to give to client
1325   * @exception IOException Exception
1326   */
1327  public boolean postScannerNext(final InternalScanner s, final List<Result> results,
1328    final int limit, boolean hasMore) throws IOException {
1329    if (this.coprocEnvironments.isEmpty()) {
1330      return hasMore;
1331    }
1332    return execOperationWithResult(
1333      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1334        @Override
1335        public Boolean call(RegionObserver observer) throws IOException {
1336          return observer.postScannerNext(this, s, results, limit, getResult());
1337        }
1338      });
1339  }
1340
1341  /**
1342   * This will be called by the scan flow when the current scanned row is being filtered out by the
1343   * filter.
1344   * @param s          the scanner
1345   * @param curRowCell The cell in the current row which got filtered out
1346   * @return whether more rows are available for the scanner or not
1347   */
1348  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1349    throws IOException {
1350    // short circuit for performance
1351    boolean defaultResult = true;
1352    if (!hasCustomPostScannerFilterRow) {
1353      return defaultResult;
1354    }
1355    if (this.coprocEnvironments.isEmpty()) {
1356      return defaultResult;
1357    }
1358    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1359      regionObserverGetter, defaultResult) {
1360      @Override
1361      public Boolean call(RegionObserver observer) throws IOException {
1362        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1363      }
1364    });
1365  }
1366
1367  /**
1368   * Supports Coprocessor 'bypass'.
1369   * @param s the scanner
1370   * @return true if default behavior should be bypassed, false otherwise
1371   * @exception IOException Exception
1372   */
1373  // Should this be bypassable?
1374  public boolean preScannerClose(final InternalScanner s) throws IOException {
1375    return execOperation(
1376      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1377        @Override
1378        public void call(RegionObserver observer) throws IOException {
1379          observer.preScannerClose(this, s);
1380        }
1381      });
1382  }
1383
1384  /**
1385   * @exception IOException Exception
1386   */
1387  public void postScannerClose(final InternalScanner s) throws IOException {
1388    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1389      @Override
1390      public void call(RegionObserver observer) throws IOException {
1391        observer.postScannerClose(this, s);
1392      }
1393    });
1394  }
1395
1396  /**
1397   * Called before open store scanner for user scan.
1398   */
1399  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1400    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1401    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1402    execOperation(new RegionObserverOperationWithoutResult() {
1403      @Override
1404      public void call(RegionObserver observer) throws IOException {
1405        observer.preStoreScannerOpen(this, store, builder);
1406      }
1407    });
1408    return builder.build();
1409  }
1410
1411  /**
1412   * @param info  the RegionInfo for this region
1413   * @param edits the file of recovered edits
1414   */
1415  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1416    execOperation(
1417      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1418        @Override
1419        public void call(RegionObserver observer) throws IOException {
1420          observer.preReplayWALs(this, info, edits);
1421        }
1422      });
1423  }
1424
1425  /**
1426   * @param info  the RegionInfo for this region
1427   * @param edits the file of recovered edits
1428   * @throws IOException Exception
1429   */
1430  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1431    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1432      @Override
1433      public void call(RegionObserver observer) throws IOException {
1434        observer.postReplayWALs(this, info, edits);
1435      }
1436    });
1437  }
1438
1439  /**
1440   * Supports Coprocessor 'bypass'.
1441   * @return true if default behavior should be bypassed, false otherwise
1442   */
1443  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1444    throws IOException {
1445    return execOperation(
1446      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1447        @Override
1448        public void call(RegionObserver observer) throws IOException {
1449          observer.preWALRestore(this, info, logKey, logEdit);
1450        }
1451      });
1452  }
1453
1454  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1455    throws IOException {
1456    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1457      @Override
1458      public void call(RegionObserver observer) throws IOException {
1459        observer.postWALRestore(this, info, logKey, logEdit);
1460      }
1461    });
1462  }
1463
1464  /**
1465   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1466   */
1467  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1468    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1469      @Override
1470      public void call(RegionObserver observer) throws IOException {
1471        observer.preBulkLoadHFile(this, familyPaths);
1472      }
1473    });
1474  }
1475
1476  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1477    throws IOException {
1478    return execOperation(
1479      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1480        @Override
1481        public void call(RegionObserver observer) throws IOException {
1482          observer.preCommitStoreFile(this, family, pairs);
1483        }
1484      });
1485  }
1486
1487  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
1488    throws IOException {
1489    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1490      @Override
1491      public void call(RegionObserver observer) throws IOException {
1492        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1493      }
1494    });
1495  }
1496
1497  /**
1498   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1499   * @param map         Map of CF to List of file paths for the final loaded files
1500   */
1501  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1502    Map<byte[], List<Path>> map) throws IOException {
1503    if (this.coprocEnvironments.isEmpty()) {
1504      return;
1505    }
1506    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1507      @Override
1508      public void call(RegionObserver observer) throws IOException {
1509        observer.postBulkLoadHFile(this, familyPaths, map);
1510      }
1511    });
1512  }
1513
1514  public void postStartRegionOperation(final Operation op) throws IOException {
1515    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1516      @Override
1517      public void call(RegionObserver observer) throws IOException {
1518        observer.postStartRegionOperation(this, op);
1519      }
1520    });
1521  }
1522
1523  public void postCloseRegionOperation(final Operation op) throws IOException {
1524    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1525      @Override
1526      public void call(RegionObserver observer) throws IOException {
1527        observer.postCloseRegionOperation(this, op);
1528      }
1529    });
1530  }
1531
1532  /**
1533   * @param fs   fileystem to read from
1534   * @param p    path to the file
1535   * @param in   {@link FSDataInputStreamWrapper}
1536   * @param size Full size of the file
1537   * @param r    original reference file. This will be not null only when reading a split file.
1538   * @return a Reader instance to use instead of the base reader if overriding default behavior,
1539   *         null otherwise
1540   */
1541  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1542    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1543    final Reference r) throws IOException {
1544    if (coprocEnvironments.isEmpty()) {
1545      return null;
1546    }
1547    return execOperationWithResult(
1548      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1549        @Override
1550        public StoreFileReader call(RegionObserver observer) throws IOException {
1551          return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1552        }
1553      });
1554  }
1555
1556  /**
1557   * @param fs     fileystem to read from
1558   * @param p      path to the file
1559   * @param in     {@link FSDataInputStreamWrapper}
1560   * @param size   Full size of the file
1561   * @param r      original reference file. This will be not null only when reading a split file.
1562   * @param reader the base reader instance
1563   * @return The reader to use
1564   */
1565  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1566    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1567    final Reference r, final StoreFileReader reader) throws IOException {
1568    if (this.coprocEnvironments.isEmpty()) {
1569      return reader;
1570    }
1571    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
1572      regionObserverGetter, reader) {
1573      @Override
1574      public StoreFileReader call(RegionObserver observer) throws IOException {
1575        return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1576      }
1577    });
1578  }
1579
1580  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1581    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1582    if (this.coprocEnvironments.isEmpty()) {
1583      return cellPairs;
1584    }
1585    return execOperationWithResult(
1586      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1587        cellPairs) {
1588        @Override
1589        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1590          return observer.postIncrementBeforeWAL(this, mutation, getResult());
1591        }
1592      });
1593  }
1594
1595  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1596    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1597    if (this.coprocEnvironments.isEmpty()) {
1598      return cellPairs;
1599    }
1600    return execOperationWithResult(
1601      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1602        cellPairs) {
1603        @Override
1604        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1605          return observer.postAppendBeforeWAL(this, mutation, getResult());
1606        }
1607      });
1608  }
1609
1610  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1611    if (this.coprocEnvironments.isEmpty()) {
1612      return;
1613    }
1614    execOperation(new RegionObserverOperationWithoutResult() {
1615      @Override
1616      public void call(RegionObserver observer) throws IOException {
1617        observer.preWALAppend(this, key, edit);
1618      }
1619    });
1620  }
1621
1622  public Message preEndpointInvocation(final Service service, final String methodName,
1623    Message request) throws IOException {
1624    if (coprocEnvironments.isEmpty()) {
1625      return request;
1626    }
1627    return execOperationWithResult(
1628      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
1629        @Override
1630        public Message call(EndpointObserver observer) throws IOException {
1631          return observer.preEndpointInvocation(this, service, methodName, getResult());
1632        }
1633      });
1634  }
1635
1636  public void postEndpointInvocation(final Service service, final String methodName,
1637    final Message request, final Message.Builder responseBuilder) throws IOException {
1638    execOperation(coprocEnvironments.isEmpty()
1639      ? null
1640      : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1641        @Override
1642        public void call(EndpointObserver observer) throws IOException {
1643          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1644        }
1645      });
1646  }
1647
1648  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1649    if (this.coprocEnvironments.isEmpty()) {
1650      return result;
1651    }
1652    return execOperationWithResult(
1653      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
1654        @Override
1655        public DeleteTracker call(RegionObserver observer) throws IOException {
1656          return observer.postInstantiateDeleteTracker(this, getResult());
1657        }
1658      });
1659  }
1660
1661  /////////////////////////////////////////////////////////////////////////////////////////////////
1662  // BulkLoadObserver hooks
1663  /////////////////////////////////////////////////////////////////////////////////////////////////
1664  public void prePrepareBulkLoad(User user) throws IOException {
1665    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1666      @Override
1667      protected void call(BulkLoadObserver observer) throws IOException {
1668        observer.prePrepareBulkLoad(this);
1669      }
1670    });
1671  }
1672
1673  public void preCleanupBulkLoad(User user) throws IOException {
1674    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1675      @Override
1676      protected void call(BulkLoadObserver observer) throws IOException {
1677        observer.preCleanupBulkLoad(this);
1678      }
1679    });
1680  }
1681}