001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RawCellBuilder;
036import org.apache.hadoop.hbase.RawCellBuilderFactory;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.CheckAndMutate;
040import org.apache.hadoop.hbase.client.CheckAndMutateResult;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.SharedConnection;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
057import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
063import org.apache.hadoop.hbase.coprocessor.RegionObserver;
064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
065import org.apache.hadoop.hbase.io.Reference;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.metrics.MetricRegistry;
068import org.apache.hadoop.hbase.regionserver.Region.Operation;
069import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
071import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
072import org.apache.hadoop.hbase.security.User;
073import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
074import org.apache.hadoop.hbase.util.Pair;
075import org.apache.hadoop.hbase.wal.WALEdit;
076import org.apache.hadoop.hbase.wal.WALKey;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.protobuf.Message;
082import org.apache.hbase.thirdparty.com.google.protobuf.Service;
083import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
084import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
085
086/**
087 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
088 * {@link Region}.
089 */
090@InterfaceAudience.Private
091public class RegionCoprocessorHost
092  extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
093
  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
  // The shared data map. createEnvironment keys it by coprocessor class name; it is constructed
  // with HARD key references and WEAK value references, and createEnvironment synchronizes on it
  // when adding entries.
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
    new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
      AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it.
  // Computed once in the constructor after all coprocessors have been loaded.
  private final boolean hasCustomPostScannerFilterRow;
102
103  /*
104   * Whether any configured CPs override postScannerFilterRow hook
105   */
106  public boolean hasCustomPostScannerFilterRow() {
107    return hasCustomPostScannerFilterRow;
108  }
109
110  /**
111   * Encapsulation of the environment of each coprocessor
112   */
113  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
114    implements RegionCoprocessorEnvironment {
115    private Region region;
116    ConcurrentMap<String, Object> sharedData;
117    private final MetricRegistry metricRegistry;
118    private final RegionServerServices services;
119
120    /**
121     * Constructor
122     * @param impl     the coprocessor instance
123     * @param priority chaining priority
124     */
125    public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
126      final Configuration conf, final Region region, final RegionServerServices services,
127      final ConcurrentMap<String, Object> sharedData) {
128      super(impl, priority, seq, conf);
129      this.region = region;
130      this.sharedData = sharedData;
131      this.services = services;
132      this.metricRegistry =
133        MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
134    }
135
136    /** Returns the region */
137    @Override
138    public Region getRegion() {
139      return region;
140    }
141
142    @Override
143    public OnlineRegions getOnlineRegions() {
144      return this.services;
145    }
146
147    @Override
148    public Connection getConnection() {
149      // Mocks may have services as null at test time.
150      return services != null ? new SharedConnection(services.getConnection()) : null;
151    }
152
153    @Override
154    public Connection createConnection(Configuration conf) throws IOException {
155      return services != null ? this.services.createConnection(conf) : null;
156    }
157
158    @Override
159    public ServerName getServerName() {
160      return services != null ? services.getServerName() : null;
161    }
162
163    @Override
164    public void shutdown() {
165      super.shutdown();
166      MetricsCoprocessor.removeRegistry(this.metricRegistry);
167    }
168
169    @Override
170    public ConcurrentMap<String, Object> getSharedData() {
171      return sharedData;
172    }
173
174    @Override
175    public RegionInfo getRegionInfo() {
176      return region.getRegionInfo();
177    }
178
179    @Override
180    public MetricRegistry getMetricRegistryForRegionServer() {
181      return metricRegistry;
182    }
183
184    @Override
185    public RawCellBuilder getCellBuilder() {
186      // We always do a DEEP_COPY only
187      return RawCellBuilderFactory.create();
188    }
189  }
190
191  /**
192   * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
193   * only. Temporary hack until Core Coprocessors are integrated into Core.
194   */
195  private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
196    implements HasRegionServerServices {
197    private final RegionServerServices rsServices;
198
199    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
200      final int seq, final Configuration conf, final Region region,
201      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
202      super(impl, priority, seq, conf, region, services, sharedData);
203      this.rsServices = services;
204    }
205
206    /**
207     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
208     *         consumption.
209     */
210    @Override
211    public RegionServerServices getRegionServerServices() {
212      return this.rsServices;
213    }
214  }
215
216  static class TableCoprocessorAttribute {
217    private Path path;
218    private String className;
219    private int priority;
220    private Configuration conf;
221
222    public TableCoprocessorAttribute(Path path, String className, int priority,
223      Configuration conf) {
224      this.path = path;
225      this.className = className;
226      this.priority = priority;
227      this.conf = conf;
228    }
229
230    public Path getPath() {
231      return path;
232    }
233
234    public String getClassName() {
235      return className;
236    }
237
238    public int getPriority() {
239      return priority;
240    }
241
242    public Configuration getConf() {
243      return conf;
244    }
245  }
246
  /** The region server services; handed to every environment built by createEnvironment */
  RegionServerServices rsServices;
  /** The region; services exposed by coprocessors are registered on it in createEnvironment */
  HRegion region;
251
252  /**
253   * Constructor
254   * @param region     the region
255   * @param rsServices interface to available region server functionality
256   * @param conf       the configuration
257   */
258  @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
259  public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
260    final Configuration conf) {
261    super(rsServices);
262    this.conf = conf;
263    this.rsServices = rsServices;
264    this.region = region;
265    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
266
267    // load system default cp's from configuration.
268    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
269
270    // load system default cp's for user tables from configuration.
271    if (!region.getRegionInfo().getTable().isSystemTable()) {
272      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
273    }
274
275    // load Coprocessor From HDFS
276    loadTableCoprocessors(conf);
277
278    // now check whether any coprocessor implements postScannerFilterRow
279    boolean hasCustomPostScannerFilterRow = false;
280    out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
281      if (env.getInstance() instanceof RegionObserver) {
282        Class<?> clazz = env.getInstance().getClass();
283        for (;;) {
284          if (clazz == Object.class) {
285            // we dont need to look postScannerFilterRow into Object class
286            break; // break the inner loop
287          }
288          try {
289            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
290              InternalScanner.class, Cell.class, boolean.class);
291            // this coprocessor has a custom version of postScannerFilterRow
292            hasCustomPostScannerFilterRow = true;
293            break out;
294          } catch (NoSuchMethodException ignore) {
295          }
296          // the deprecated signature still exists
297          try {
298            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
299              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
300            // this coprocessor has a custom version of postScannerFilterRow
301            hasCustomPostScannerFilterRow = true;
302            break out;
303          } catch (NoSuchMethodException ignore) {
304          }
305          clazz = clazz.getSuperclass();
306        }
307      }
308    }
309    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
310  }
311
312  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
313    TableDescriptor htd) {
314    return htd.getCoprocessorDescriptors().stream().map(cp -> {
315      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
316      Configuration ourConf;
317      if (!cp.getProperties().isEmpty()) {
318        // do an explicit deep copy of the passed configuration
319        ourConf = new Configuration(false);
320        HBaseConfiguration.merge(ourConf, conf);
321        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
322      } else {
323        ourConf = conf;
324      }
325      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
326    }).collect(Collectors.toList());
327  }
328
329  /**
330   * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
331   * if there is a problem.
332   */
333  public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
334    throws IOException {
335    String pathPrefix = UUID.randomUUID().toString();
336    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
337      if (attr.getPriority() < 0) {
338        throw new IOException(
339          "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
340      }
341      ClassLoader old = Thread.currentThread().getContextClassLoader();
342      try {
343        ClassLoader cl;
344        if (attr.getPath() != null) {
345          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
346            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
347        } else {
348          cl = CoprocessorHost.class.getClassLoader();
349        }
350        Thread.currentThread().setContextClassLoader(cl);
351        if (cl instanceof CoprocessorClassLoader) {
352          String[] includedClassPrefixes = null;
353          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
354            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
355            includedClassPrefixes = prefixes.split(";");
356          }
357          ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
358        } else {
359          cl.loadClass(attr.getClassName());
360        }
361      } catch (ClassNotFoundException e) {
362        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
363      } finally {
364        Thread.currentThread().setContextClassLoader(old);
365      }
366    }
367  }
368
369  void loadTableCoprocessors(final Configuration conf) {
370    boolean coprocessorsEnabled =
371      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
372    boolean tableCoprocessorsEnabled =
373      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
374    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
375      return;
376    }
377
378    // scan the table attributes for coprocessor load specifications
379    // initialize the coprocessors
380    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
381    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
382      region.getTableDescriptor())) {
383      // Load encompasses classloading and coprocessor initialization
384      try {
385        RegionCoprocessorEnvironment env =
386          load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
387        if (env == null) {
388          continue;
389        }
390        configured.add(env);
391        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
392          + region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
393      } catch (Throwable t) {
394        // Coprocessor failed to load, do we abort on error?
395        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
396          abortServer(attr.getClassName(), t);
397        } else {
398          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
399        }
400      }
401    }
402    // add together to coprocessor set for COW efficiency
403    coprocEnvironments.addAll(configured);
404  }
405
406  @Override
407  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
408    Configuration conf) {
409    // If coprocessor exposes any services, register them.
410    for (Service service : instance.getServices()) {
411      region.registerService(service);
412    }
413    ConcurrentMap<String, Object> classData;
414    // make sure only one thread can add maps
415    synchronized (SHARED_DATA_MAP) {
416      // as long as at least one RegionEnvironment holds on to its classData it will
417      // remain in this map
418      classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
419        k -> new ConcurrentHashMap<>());
420    }
421    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
422    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
423      ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
424        classData)
425      : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
426  }
427
428  @Override
429  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
430    throws InstantiationException, IllegalAccessException {
431    try {
432      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
433        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
434      } else {
435        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
436          implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
437        return null;
438      }
439    } catch (NoSuchMethodException | InvocationTargetException e) {
440      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
441    }
442  }
443
  // Getter passed to observer operations that work on a coprocessor's RegionObserver.
  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
    RegionCoprocessor::getRegionObserver;

  // Getter that yields a coprocessor's EndpointObserver.
  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
    RegionCoprocessor::getEndpointObserver;
449
450  abstract class RegionObserverOperationWithoutResult
451    extends ObserverOperationWithoutResult<RegionObserver> {
452    public RegionObserverOperationWithoutResult() {
453      super(regionObserverGetter);
454    }
455
456    public RegionObserverOperationWithoutResult(User user) {
457      super(regionObserverGetter, user);
458    }
459
460    public RegionObserverOperationWithoutResult(boolean bypassable) {
461      super(regionObserverGetter, null, bypassable);
462    }
463
464    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
465      super(regionObserverGetter, user, bypassable);
466    }
467  }
468
  /**
   * Base class for result-less operations on a coprocessor's {@link BulkLoadObserver}, obtained
   * through {@code RegionCoprocessor::getBulkLoadObserver}.
   */
  abstract class BulkLoadObserverOperation
    extends ObserverOperationWithoutResult<BulkLoadObserver> {
    /** Creates an operation for the given user */
    public BulkLoadObserverOperation(User user) {
      super(RegionCoprocessor::getBulkLoadObserver, user);
    }
  }
475
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
483
484  /**
485   * Invoked before a region open.
486   * @throws IOException Signals that an I/O exception has occurred.
487   */
488  public void preOpen() throws IOException {
489    if (coprocEnvironments.isEmpty()) {
490      return;
491    }
492    execOperation(new RegionObserverOperationWithoutResult() {
493      @Override
494      public void call(RegionObserver observer) throws IOException {
495        observer.preOpen(this);
496      }
497    });
498  }
499
500  /**
501   * Invoked after a region open
502   */
503  public void postOpen() {
504    if (coprocEnvironments.isEmpty()) {
505      return;
506    }
507    try {
508      execOperation(new RegionObserverOperationWithoutResult() {
509        @Override
510        public void call(RegionObserver observer) throws IOException {
511          observer.postOpen(this);
512        }
513      });
514    } catch (IOException e) {
515      LOG.warn(e.toString(), e);
516    }
517  }
518
519  /**
520   * Invoked before a region is closed
521   * @param abortRequested true if the server is aborting
522   */
523  public void preClose(final boolean abortRequested) throws IOException {
524    execOperation(new RegionObserverOperationWithoutResult() {
525      @Override
526      public void call(RegionObserver observer) throws IOException {
527        observer.preClose(this, abortRequested);
528      }
529    });
530  }
531
532  /**
533   * Invoked after a region is closed
534   * @param abortRequested true if the server is aborting
535   */
536  public void postClose(final boolean abortRequested) {
537    try {
538      execOperation(new RegionObserverOperationWithoutResult() {
539        @Override
540        public void call(RegionObserver observer) throws IOException {
541          observer.postClose(this, abortRequested);
542        }
543
544        @Override
545        public void postEnvCall() {
546          shutdown(this.getEnvironment());
547        }
548      });
549    } catch (IOException e) {
550      LOG.warn(e.toString(), e);
551    }
552  }
553
554  /**
555   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
556   * available candidates.
557   * <p>
558   * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
559   * passed in <code>candidates</code>.
560   * @param store      The store where compaction is being requested
561   * @param candidates The currently available store files
562   * @param tracker    used to track the life cycle of a compaction
563   * @param user       the user
564   */
565  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
566    final CompactionLifeCycleTracker tracker, final User user) throws IOException {
567    if (coprocEnvironments.isEmpty()) {
568      return false;
569    }
570    boolean bypassable = true;
571    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
572      @Override
573      public void call(RegionObserver observer) throws IOException {
574        observer.preCompactSelection(this, store, candidates, tracker);
575      }
576    });
577  }
578
579  /**
580   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
581   * candidates.
582   * @param store    The store where compaction is being requested
583   * @param selected The store files selected to compact
584   * @param tracker  used to track the life cycle of a compaction
585   * @param request  the compaction request
586   * @param user     the user
587   */
588  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
589    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
590    throws IOException {
591    if (coprocEnvironments.isEmpty()) {
592      return;
593    }
594    execOperation(new RegionObserverOperationWithoutResult(user) {
595      @Override
596      public void call(RegionObserver observer) throws IOException {
597        observer.postCompactSelection(this, store, selected, tracker, request);
598      }
599    });
600  }
601
602  /**
603   * Called prior to opening store scanner for compaction.
604   */
605  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
606    CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
607    if (coprocEnvironments.isEmpty()) {
608      return store.getScanInfo();
609    }
610    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
611    execOperation(new RegionObserverOperationWithoutResult(user) {
612      @Override
613      public void call(RegionObserver observer) throws IOException {
614        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
615      }
616    });
617    return builder.build();
618  }
619
620  /**
621   * Called prior to rewriting the store files selected for compaction
622   * @param store    the store being compacted
623   * @param scanner  the scanner used to read store data during compaction
624   * @param scanType type of Scan
625   * @param tracker  used to track the life cycle of a compaction
626   * @param request  the compaction request
627   * @param user     the user
628   * @return Scanner to use (cannot be null!)
629   */
630  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
631    final ScanType scanType, final CompactionLifeCycleTracker tracker,
632    final CompactionRequest request, final User user) throws IOException {
633    InternalScanner defaultResult = scanner;
634    if (coprocEnvironments.isEmpty()) {
635      return defaultResult;
636    }
637    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
638      regionObserverGetter, defaultResult, user) {
639      @Override
640      public InternalScanner call(RegionObserver observer) throws IOException {
641        InternalScanner scanner =
642          observer.preCompact(this, store, getResult(), scanType, tracker, request);
643        if (scanner == null) {
644          throw new CoprocessorException("Null Scanner return disallowed!");
645        }
646        return scanner;
647      }
648    });
649  }
650
651  /**
652   * Called after the store compaction has completed.
653   * @param store      the store being compacted
654   * @param resultFile the new store file written during compaction
655   * @param tracker    used to track the life cycle of a compaction
656   * @param request    the compaction request
657   * @param user       the user
658   */
659  public void postCompact(final HStore store, final HStoreFile resultFile,
660    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
661    throws IOException {
662    execOperation(
663      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
664        @Override
665        public void call(RegionObserver observer) throws IOException {
666          observer.postCompact(this, store, resultFile, tracker, request);
667        }
668      });
669  }
670
671  /**
672   * Invoked before create StoreScanner for flush.
673   */
674  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
675    throws IOException {
676    if (coprocEnvironments.isEmpty()) {
677      return store.getScanInfo();
678    }
679    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
680    execOperation(new RegionObserverOperationWithoutResult() {
681      @Override
682      public void call(RegionObserver observer) throws IOException {
683        observer.preFlushScannerOpen(this, store, builder, tracker);
684      }
685    });
686    return builder.build();
687  }
688
689  /**
690   * Invoked before a memstore flush
691   * @return Scanner to use (cannot be null!)
692   */
693  public InternalScanner preFlush(HStore store, InternalScanner scanner,
694    FlushLifeCycleTracker tracker) throws IOException {
695    if (coprocEnvironments.isEmpty()) {
696      return scanner;
697    }
698    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
699      regionObserverGetter, scanner) {
700      @Override
701      public InternalScanner call(RegionObserver observer) throws IOException {
702        InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
703        if (scanner == null) {
704          throw new CoprocessorException("Null Scanner return disallowed!");
705        }
706        return scanner;
707      }
708    });
709  }
710
711  /**
712   * Invoked before a memstore flush
713   */
714  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
715    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
716      @Override
717      public void call(RegionObserver observer) throws IOException {
718        observer.preFlush(this, tracker);
719      }
720    });
721  }
722
723  /**
724   * Invoked after a memstore flush
725   */
726  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
727    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
728      @Override
729      public void call(RegionObserver observer) throws IOException {
730        observer.postFlush(this, tracker);
731      }
732    });
733  }
734
735  /**
736   * Invoked before in memory compaction.
737   */
738  public void preMemStoreCompaction(HStore store) throws IOException {
739    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
740      @Override
741      public void call(RegionObserver observer) throws IOException {
742        observer.preMemStoreCompaction(this, store);
743      }
744    });
745  }
746
747  /**
748   * Invoked before create StoreScanner for in memory compaction.
749   */
750  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
751    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
752    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
753      @Override
754      public void call(RegionObserver observer) throws IOException {
755        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
756      }
757    });
758    return builder.build();
759  }
760
761  /**
762   * Invoked before compacting memstore.
763   */
764  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
765    throws IOException {
766    if (coprocEnvironments.isEmpty()) {
767      return scanner;
768    }
769    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
770      regionObserverGetter, scanner) {
771      @Override
772      public InternalScanner call(RegionObserver observer) throws IOException {
773        return observer.preMemStoreCompactionCompact(this, store, getResult());
774      }
775    });
776  }
777
778  /**
779   * Invoked after in memory compaction.
780   */
781  public void postMemStoreCompaction(HStore store) throws IOException {
782    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
783      @Override
784      public void call(RegionObserver observer) throws IOException {
785        observer.postMemStoreCompaction(this, store);
786      }
787    });
788  }
789
790  /**
791   * Invoked after a memstore flush
792   */
793  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
794    throws IOException {
795    if (coprocEnvironments.isEmpty()) {
796      return;
797    }
798    execOperation(new RegionObserverOperationWithoutResult() {
799      @Override
800      public void call(RegionObserver observer) throws IOException {
801        observer.postFlush(this, store, storeFile, tracker);
802      }
803    });
804  }
805
806  // RegionObserver support
807  /**
808   * Supports Coprocessor 'bypass'.
809   * @param get     the Get request
810   * @param results What to return if return is true/'bypass'.
811   * @return true if default processing should be bypassed.
812   * @exception IOException Exception
813   */
814  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
815    if (coprocEnvironments.isEmpty()) {
816      return false;
817    }
818    boolean bypassable = true;
819    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
820      @Override
821      public void call(RegionObserver observer) throws IOException {
822        observer.preGetOp(this, get, results);
823      }
824    });
825  }
826
827  /**
828   * @param get     the Get request
829   * @param results the result set
830   * @exception IOException Exception
831   */
832  public void postGet(final Get get, final List<Cell> results) throws IOException {
833    if (coprocEnvironments.isEmpty()) {
834      return;
835    }
836    execOperation(new RegionObserverOperationWithoutResult() {
837      @Override
838      public void call(RegionObserver observer) throws IOException {
839        observer.postGetOp(this, get, results);
840      }
841    });
842  }
843
844  /**
845   * Supports Coprocessor 'bypass'.
846   * @param get the Get request
847   * @return true or false to return to client if bypassing normal operation, or null otherwise
848   * @exception IOException Exception
849   */
850  public Boolean preExists(final Get get) throws IOException {
851    boolean bypassable = true;
852    boolean defaultResult = false;
853    if (coprocEnvironments.isEmpty()) {
854      return null;
855    }
856    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
857      regionObserverGetter, defaultResult, bypassable) {
858      @Override
859      public Boolean call(RegionObserver observer) throws IOException {
860        return observer.preExists(this, get, getResult());
861      }
862    });
863  }
864
865  /**
866   * @param get    the Get request
867   * @param result the result returned by the region server
868   * @return the result to return to the client
869   * @exception IOException Exception
870   */
871  public boolean postExists(final Get get, boolean result) throws IOException {
872    if (this.coprocEnvironments.isEmpty()) {
873      return result;
874    }
875    return execOperationWithResult(
876      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
877        @Override
878        public Boolean call(RegionObserver observer) throws IOException {
879          return observer.postExists(this, get, getResult());
880        }
881      });
882  }
883
884  /**
885   * Supports Coprocessor 'bypass'.
886   * @param put  The Put object
887   * @param edit The WALEdit object.
888   * @return true if default processing should be bypassed
889   * @exception IOException Exception
890   */
891  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
892    if (coprocEnvironments.isEmpty()) {
893      return false;
894    }
895    boolean bypassable = true;
896    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
897      @Override
898      public void call(RegionObserver observer) throws IOException {
899        observer.prePut(this, put, edit);
900      }
901    });
902  }
903
904  /**
905   * Supports Coprocessor 'bypass'.
906   * @param mutation - the current mutation
907   * @param kv       - the current cell
908   * @param byteNow  - current timestamp in bytes
909   * @param get      - the get that could be used Note that the get only does not specify the family
910   *                 and qualifier that should be used
911   * @return true if default processing should be bypassed
912   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
913   *             Coprocessor for its needs only. Will be removed.
914   */
915  @Deprecated
916  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
917    final byte[] byteNow, final Get get) throws IOException {
918    if (coprocEnvironments.isEmpty()) {
919      return false;
920    }
921    boolean bypassable = true;
922    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
923      @Override
924      public void call(RegionObserver observer) throws IOException {
925        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
926      }
927    });
928  }
929
930  /**
931   * @param put  The Put object
932   * @param edit The WALEdit object.
933   * @exception IOException Exception
934   */
935  public void postPut(final Put put, final WALEdit edit) throws IOException {
936    if (coprocEnvironments.isEmpty()) {
937      return;
938    }
939    execOperation(new RegionObserverOperationWithoutResult() {
940      @Override
941      public void call(RegionObserver observer) throws IOException {
942        observer.postPut(this, put, edit);
943      }
944    });
945  }
946
947  /**
948   * Supports Coprocessor 'bypass'.
949   * @param delete The Delete object
950   * @param edit   The WALEdit object.
951   * @return true if default processing should be bypassed
952   * @exception IOException Exception
953   */
954  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
955    if (this.coprocEnvironments.isEmpty()) {
956      return false;
957    }
958    boolean bypassable = true;
959    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
960      @Override
961      public void call(RegionObserver observer) throws IOException {
962        observer.preDelete(this, delete, edit);
963      }
964    });
965  }
966
967  /**
968   * @param delete The Delete object
969   * @param edit   The WALEdit object.
970   * @exception IOException Exception
971   */
972  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
973    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
974      @Override
975      public void call(RegionObserver observer) throws IOException {
976        observer.postDelete(this, delete, edit);
977      }
978    });
979  }
980
981  public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
982    throws IOException {
983    if (this.coprocEnvironments.isEmpty()) {
984      return;
985    }
986    execOperation(new RegionObserverOperationWithoutResult() {
987      @Override
988      public void call(RegionObserver observer) throws IOException {
989        observer.preBatchMutate(this, miniBatchOp);
990      }
991    });
992  }
993
994  public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
995    throws IOException {
996    if (this.coprocEnvironments.isEmpty()) {
997      return;
998    }
999    execOperation(new RegionObserverOperationWithoutResult() {
1000      @Override
1001      public void call(RegionObserver observer) throws IOException {
1002        observer.postBatchMutate(this, miniBatchOp);
1003      }
1004    });
1005  }
1006
1007  public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
1008    final boolean success) throws IOException {
1009    if (this.coprocEnvironments.isEmpty()) {
1010      return;
1011    }
1012    execOperation(new RegionObserverOperationWithoutResult() {
1013      @Override
1014      public void call(RegionObserver observer) throws IOException {
1015        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1016      }
1017    });
1018  }
1019
1020  /**
1021   * Supports Coprocessor 'bypass'.
1022   * @param checkAndMutate the CheckAndMutate object
1023   * @return true or false to return to client if default processing should be bypassed, or null
1024   *         otherwise
1025   * @throws IOException if an error occurred on the coprocessor
1026   */
1027  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
1028    boolean bypassable = true;
1029    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1030    if (coprocEnvironments.isEmpty()) {
1031      return null;
1032    }
1033    return execOperationWithResult(
1034      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1035        defaultResult, bypassable) {
1036        @Override
1037        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1038          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1039        }
1040      });
1041  }
1042
1043  /**
1044   * Supports Coprocessor 'bypass'.
1045   * @param checkAndMutate the CheckAndMutate object
1046   * @return true or false to return to client if default processing should be bypassed, or null
1047   *         otherwise
1048   * @throws IOException if an error occurred on the coprocessor
1049   */
1050  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1051    throws IOException {
1052    boolean bypassable = true;
1053    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1054    if (coprocEnvironments.isEmpty()) {
1055      return null;
1056    }
1057    return execOperationWithResult(
1058      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1059        defaultResult, bypassable) {
1060        @Override
1061        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1062          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1063        }
1064      });
1065  }
1066
1067  /**
1068   * @param checkAndMutate the CheckAndMutate object
1069   * @param result         the result returned by the checkAndMutate
1070   * @return true or false to return to client if default processing should be bypassed, or null
1071   *         otherwise
1072   * @throws IOException if an error occurred on the coprocessor
1073   */
1074  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1075    CheckAndMutateResult result) throws IOException {
1076    if (this.coprocEnvironments.isEmpty()) {
1077      return result;
1078    }
1079    return execOperationWithResult(
1080      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1081        result) {
1082        @Override
1083        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1084          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1085        }
1086      });
1087  }
1088
1089  /**
1090   * Supports Coprocessor 'bypass'.
1091   * @param append append object
1092   * @param edit   The WALEdit object.
1093   * @return result to return to client if default operation should be bypassed, null otherwise
1094   * @throws IOException if an error occurred on the coprocessor
1095   */
1096  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
1097    boolean bypassable = true;
1098    Result defaultResult = null;
1099    if (this.coprocEnvironments.isEmpty()) {
1100      return defaultResult;
1101    }
1102    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1103      regionObserverGetter, defaultResult, bypassable) {
1104      @Override
1105      public Result call(RegionObserver observer) throws IOException {
1106        return observer.preAppend(this, append, edit);
1107      }
1108    });
1109  }
1110
1111  /**
1112   * Supports Coprocessor 'bypass'.
1113   * @param append append object
1114   * @return result to return to client if default operation should be bypassed, null otherwise
1115   * @throws IOException if an error occurred on the coprocessor
1116   */
1117  public Result preAppendAfterRowLock(final Append append) throws IOException {
1118    boolean bypassable = true;
1119    Result defaultResult = null;
1120    if (this.coprocEnvironments.isEmpty()) {
1121      return defaultResult;
1122    }
1123    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1124      regionObserverGetter, defaultResult, bypassable) {
1125      @Override
1126      public Result call(RegionObserver observer) throws IOException {
1127        return observer.preAppendAfterRowLock(this, append);
1128      }
1129    });
1130  }
1131
1132  /**
1133   * Supports Coprocessor 'bypass'.
1134   * @param increment increment object
1135   * @param edit      The WALEdit object.
1136   * @return result to return to client if default operation should be bypassed, null otherwise
1137   * @throws IOException if an error occurred on the coprocessor
1138   */
1139  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
1140    boolean bypassable = true;
1141    Result defaultResult = null;
1142    if (coprocEnvironments.isEmpty()) {
1143      return defaultResult;
1144    }
1145    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1146      regionObserverGetter, defaultResult, bypassable) {
1147      @Override
1148      public Result call(RegionObserver observer) throws IOException {
1149        return observer.preIncrement(this, increment, edit);
1150      }
1151    });
1152  }
1153
1154  /**
1155   * Supports Coprocessor 'bypass'.
1156   * @param increment increment object
1157   * @return result to return to client if default operation should be bypassed, null otherwise
1158   * @throws IOException if an error occurred on the coprocessor
1159   */
1160  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1161    boolean bypassable = true;
1162    Result defaultResult = null;
1163    if (coprocEnvironments.isEmpty()) {
1164      return defaultResult;
1165    }
1166    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1167      regionObserverGetter, defaultResult, bypassable) {
1168      @Override
1169      public Result call(RegionObserver observer) throws IOException {
1170        return observer.preIncrementAfterRowLock(this, increment);
1171      }
1172    });
1173  }
1174
1175  /**
1176   * @param append Append object
1177   * @param result the result returned by the append
1178   * @param edit   The WALEdit object.
1179   * @throws IOException if an error occurred on the coprocessor
1180   */
1181  public Result postAppend(final Append append, final Result result, final WALEdit edit)
1182    throws IOException {
1183    if (this.coprocEnvironments.isEmpty()) {
1184      return result;
1185    }
1186    return execOperationWithResult(
1187      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1188        @Override
1189        public Result call(RegionObserver observer) throws IOException {
1190          return observer.postAppend(this, append, result, edit);
1191        }
1192      });
1193  }
1194
1195  /**
1196   * @param increment increment object
1197   * @param result    the result returned by postIncrement
1198   * @param edit      The WALEdit object.
1199   * @throws IOException if an error occurred on the coprocessor
1200   */
1201  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
1202    throws IOException {
1203    if (this.coprocEnvironments.isEmpty()) {
1204      return result;
1205    }
1206    return execOperationWithResult(
1207      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1208        @Override
1209        public Result call(RegionObserver observer) throws IOException {
1210          return observer.postIncrement(this, increment, getResult(), edit);
1211        }
1212      });
1213  }
1214
1215  /**
1216   * @param scan the Scan specification
1217   * @exception IOException Exception
1218   */
1219  public void preScannerOpen(final Scan scan) throws IOException {
1220    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1221      @Override
1222      public void call(RegionObserver observer) throws IOException {
1223        observer.preScannerOpen(this, scan);
1224      }
1225    });
1226  }
1227
1228  /**
1229   * @param scan the Scan specification
1230   * @param s    the scanner
1231   * @return the scanner instance to use
1232   * @exception IOException Exception
1233   */
1234  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1235    if (this.coprocEnvironments.isEmpty()) {
1236      return s;
1237    }
1238    return execOperationWithResult(
1239      new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1240        @Override
1241        public RegionScanner call(RegionObserver observer) throws IOException {
1242          return observer.postScannerOpen(this, scan, getResult());
1243        }
1244      });
1245  }
1246
1247  /**
1248   * @param s       the scanner
1249   * @param results the result set returned by the region server
1250   * @param limit   the maximum number of results to return
1251   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1252   * @exception IOException Exception
1253   */
1254  public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
1255    final int limit) throws IOException {
1256    boolean bypassable = true;
1257    boolean defaultResult = false;
1258    if (coprocEnvironments.isEmpty()) {
1259      return null;
1260    }
1261    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1262      regionObserverGetter, defaultResult, bypassable) {
1263      @Override
1264      public Boolean call(RegionObserver observer) throws IOException {
1265        return observer.preScannerNext(this, s, results, limit, getResult());
1266      }
1267    });
1268  }
1269
1270  /**
1271   * @param s       the scanner
1272   * @param results the result set returned by the region server
1273   * @param limit   the maximum number of results to return
1274   * @return 'has more' indication to give to client
1275   * @exception IOException Exception
1276   */
1277  public boolean postScannerNext(final InternalScanner s, final List<Result> results,
1278    final int limit, boolean hasMore) throws IOException {
1279    if (this.coprocEnvironments.isEmpty()) {
1280      return hasMore;
1281    }
1282    return execOperationWithResult(
1283      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1284        @Override
1285        public Boolean call(RegionObserver observer) throws IOException {
1286          return observer.postScannerNext(this, s, results, limit, getResult());
1287        }
1288      });
1289  }
1290
1291  /**
1292   * This will be called by the scan flow when the current scanned row is being filtered out by the
1293   * filter.
1294   * @param s          the scanner
1295   * @param curRowCell The cell in the current row which got filtered out
1296   * @return whether more rows are available for the scanner or not
1297   */
1298  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1299    throws IOException {
1300    // short circuit for performance
1301    boolean defaultResult = true;
1302    if (!hasCustomPostScannerFilterRow) {
1303      return defaultResult;
1304    }
1305    if (this.coprocEnvironments.isEmpty()) {
1306      return defaultResult;
1307    }
1308    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1309      regionObserverGetter, defaultResult) {
1310      @Override
1311      public Boolean call(RegionObserver observer) throws IOException {
1312        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1313      }
1314    });
1315  }
1316
1317  /**
1318   * Supports Coprocessor 'bypass'.
1319   * @param s the scanner
1320   * @return true if default behavior should be bypassed, false otherwise
1321   * @exception IOException Exception
1322   */
1323  // Should this be bypassable?
1324  public boolean preScannerClose(final InternalScanner s) throws IOException {
1325    return execOperation(
1326      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1327        @Override
1328        public void call(RegionObserver observer) throws IOException {
1329          observer.preScannerClose(this, s);
1330        }
1331      });
1332  }
1333
1334  /**
1335   * @exception IOException Exception
1336   */
1337  public void postScannerClose(final InternalScanner s) throws IOException {
1338    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1339      @Override
1340      public void call(RegionObserver observer) throws IOException {
1341        observer.postScannerClose(this, s);
1342      }
1343    });
1344  }
1345
1346  /**
1347   * Called before open store scanner for user scan.
1348   */
1349  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1350    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1351    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1352    execOperation(new RegionObserverOperationWithoutResult() {
1353      @Override
1354      public void call(RegionObserver observer) throws IOException {
1355        observer.preStoreScannerOpen(this, store, builder);
1356      }
1357    });
1358    return builder.build();
1359  }
1360
1361  /**
1362   * @param info  the RegionInfo for this region
1363   * @param edits the file of recovered edits
1364   */
1365  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1366    execOperation(
1367      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1368        @Override
1369        public void call(RegionObserver observer) throws IOException {
1370          observer.preReplayWALs(this, info, edits);
1371        }
1372      });
1373  }
1374
1375  /**
1376   * @param info  the RegionInfo for this region
1377   * @param edits the file of recovered edits
1378   * @throws IOException Exception
1379   */
1380  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1381    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1382      @Override
1383      public void call(RegionObserver observer) throws IOException {
1384        observer.postReplayWALs(this, info, edits);
1385      }
1386    });
1387  }
1388
1389  /**
1390   * Supports Coprocessor 'bypass'.
1391   * @return true if default behavior should be bypassed, false otherwise
1392   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1393   *             something that doesn't expose IntefaceAudience.Private classes.
1394   */
1395  @Deprecated
1396  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1397    throws IOException {
1398    return execOperation(
1399      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1400        @Override
1401        public void call(RegionObserver observer) throws IOException {
1402          observer.preWALRestore(this, info, logKey, logEdit);
1403        }
1404      });
1405  }
1406
1407  /**
1408   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1409   *             something that doesn't expose IntefaceAudience.Private classes.
1410   */
1411  @Deprecated
1412  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1413    throws IOException {
1414    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1415      @Override
1416      public void call(RegionObserver observer) throws IOException {
1417        observer.postWALRestore(this, info, logKey, logEdit);
1418      }
1419    });
1420  }
1421
1422  /**
1423   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1424   */
1425  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1426    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1427      @Override
1428      public void call(RegionObserver observer) throws IOException {
1429        observer.preBulkLoadHFile(this, familyPaths);
1430      }
1431    });
1432  }
1433
1434  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1435    throws IOException {
1436    return execOperation(
1437      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1438        @Override
1439        public void call(RegionObserver observer) throws IOException {
1440          observer.preCommitStoreFile(this, family, pairs);
1441        }
1442      });
1443  }
1444
1445  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
1446    throws IOException {
1447    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1448      @Override
1449      public void call(RegionObserver observer) throws IOException {
1450        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1451      }
1452    });
1453  }
1454
1455  /**
1456   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1457   * @param map         Map of CF to List of file paths for the final loaded files
1458   */
1459  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1460    Map<byte[], List<Path>> map) throws IOException {
1461    if (this.coprocEnvironments.isEmpty()) {
1462      return;
1463    }
1464    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1465      @Override
1466      public void call(RegionObserver observer) throws IOException {
1467        observer.postBulkLoadHFile(this, familyPaths, map);
1468      }
1469    });
1470  }
1471
1472  public void postStartRegionOperation(final Operation op) throws IOException {
1473    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1474      @Override
1475      public void call(RegionObserver observer) throws IOException {
1476        observer.postStartRegionOperation(this, op);
1477      }
1478    });
1479  }
1480
1481  public void postCloseRegionOperation(final Operation op) throws IOException {
1482    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1483      @Override
1484      public void call(RegionObserver observer) throws IOException {
1485        observer.postCloseRegionOperation(this, op);
1486      }
1487    });
1488  }
1489
1490  /**
1491   * @param fs   fileystem to read from
1492   * @param p    path to the file
1493   * @param in   {@link FSDataInputStreamWrapper}
1494   * @param size Full size of the file
1495   * @param r    original reference file. This will be not null only when reading a split file.
1496   * @return a Reader instance to use instead of the base reader if overriding default behavior,
1497   *         null otherwise
1498   */
1499  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1500    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1501    final Reference r) throws IOException {
1502    if (coprocEnvironments.isEmpty()) {
1503      return null;
1504    }
1505    return execOperationWithResult(
1506      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1507        @Override
1508        public StoreFileReader call(RegionObserver observer) throws IOException {
1509          return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1510        }
1511      });
1512  }
1513
1514  /**
1515   * @param fs     fileystem to read from
1516   * @param p      path to the file
1517   * @param in     {@link FSDataInputStreamWrapper}
1518   * @param size   Full size of the file
1519   * @param r      original reference file. This will be not null only when reading a split file.
1520   * @param reader the base reader instance
1521   * @return The reader to use
1522   */
1523  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1524    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1525    final Reference r, final StoreFileReader reader) throws IOException {
1526    if (this.coprocEnvironments.isEmpty()) {
1527      return reader;
1528    }
1529    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
1530      regionObserverGetter, reader) {
1531      @Override
1532      public StoreFileReader call(RegionObserver observer) throws IOException {
1533        return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1534      }
1535    });
1536  }
1537
1538  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1539    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1540    if (this.coprocEnvironments.isEmpty()) {
1541      return cellPairs;
1542    }
1543    return execOperationWithResult(
1544      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1545        cellPairs) {
1546        @Override
1547        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1548          return observer.postIncrementBeforeWAL(this, mutation, getResult());
1549        }
1550      });
1551  }
1552
1553  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1554    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1555    if (this.coprocEnvironments.isEmpty()) {
1556      return cellPairs;
1557    }
1558    return execOperationWithResult(
1559      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1560        cellPairs) {
1561        @Override
1562        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1563          return observer.postAppendBeforeWAL(this, mutation, getResult());
1564        }
1565      });
1566  }
1567
1568  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1569    if (this.coprocEnvironments.isEmpty()) {
1570      return;
1571    }
1572    execOperation(new RegionObserverOperationWithoutResult() {
1573      @Override
1574      public void call(RegionObserver observer) throws IOException {
1575        observer.preWALAppend(this, key, edit);
1576      }
1577    });
1578  }
1579
1580  public Message preEndpointInvocation(final Service service, final String methodName,
1581    Message request) throws IOException {
1582    if (coprocEnvironments.isEmpty()) {
1583      return request;
1584    }
1585    return execOperationWithResult(
1586      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
1587        @Override
1588        public Message call(EndpointObserver observer) throws IOException {
1589          return observer.preEndpointInvocation(this, service, methodName, getResult());
1590        }
1591      });
1592  }
1593
1594  public void postEndpointInvocation(final Service service, final String methodName,
1595    final Message request, final Message.Builder responseBuilder) throws IOException {
1596    execOperation(coprocEnvironments.isEmpty()
1597      ? null
1598      : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1599        @Override
1600        public void call(EndpointObserver observer) throws IOException {
1601          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1602        }
1603      });
1604  }
1605
1606  /**
1607   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1608   */
1609  @Deprecated
1610  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1611    if (this.coprocEnvironments.isEmpty()) {
1612      return result;
1613    }
1614    return execOperationWithResult(
1615      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
1616        @Override
1617        public DeleteTracker call(RegionObserver observer) throws IOException {
1618          return observer.postInstantiateDeleteTracker(this, getResult());
1619        }
1620      });
1621  }
1622
1623  /////////////////////////////////////////////////////////////////////////////////////////////////
1624  // BulkLoadObserver hooks
1625  /////////////////////////////////////////////////////////////////////////////////////////////////
1626  public void prePrepareBulkLoad(User user) throws IOException {
1627    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1628      @Override
1629      protected void call(BulkLoadObserver observer) throws IOException {
1630        observer.prePrepareBulkLoad(this);
1631      }
1632    });
1633  }
1634
1635  public void preCleanupBulkLoad(User user) throws IOException {
1636    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1637      @Override
1638      protected void call(BulkLoadObserver observer) throws IOException {
1639        observer.preCleanupBulkLoad(this);
1640      }
1641    });
1642  }
1643}