001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.protobuf.Message;
021import com.google.protobuf.Service;
022import java.io.IOException;
023import java.lang.reflect.InvocationTargetException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import java.util.stream.Collectors;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.RawCellBuilder;
038import org.apache.hadoop.hbase.RawCellBuilderFactory;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.SharedConnection;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.CheckAndMutate;
043import org.apache.hadoop.hbase.client.CheckAndMutateResult;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.Delete;
046import org.apache.hadoop.hbase.client.Get;
047import org.apache.hadoop.hbase.client.Increment;
048import org.apache.hadoop.hbase.client.Mutation;
049import org.apache.hadoop.hbase.client.Put;
050import org.apache.hadoop.hbase.client.RegionInfo;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
055import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
056import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
059import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
060import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
061import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
062import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
063import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.ObserverContext;
065import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
066import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
067import org.apache.hadoop.hbase.coprocessor.RegionObserver;
068import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
069import org.apache.hadoop.hbase.io.Reference;
070import org.apache.hadoop.hbase.io.hfile.CacheConfig;
071import org.apache.hadoop.hbase.metrics.MetricRegistry;
072import org.apache.hadoop.hbase.regionserver.Region.Operation;
073import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
074import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
075import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
076import org.apache.hadoop.hbase.security.User;
077import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.wal.WALEdit;
080import org.apache.hadoop.hbase.wal.WALKey;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.slf4j.Logger;
083import org.slf4j.LoggerFactory;
084
085import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
086import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
087
088/**
089 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
090 * {@link Region}.
091 */
092@InterfaceAudience.Private
093public class RegionCoprocessorHost
094  extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
095
096  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
097  // The shared data map
098  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
099    new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
100      AbstractReferenceMap.ReferenceStrength.WEAK);
101
102  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
103  private final boolean hasCustomPostScannerFilterRow;
104
105  /*
106   * Whether any configured CPs override postScannerFilterRow hook
107   */
108  public boolean hasCustomPostScannerFilterRow() {
109    return hasCustomPostScannerFilterRow;
110  }
111
112  /**
113   * Encapsulation of the environment of each coprocessor
114   */
115  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
116    implements RegionCoprocessorEnvironment {
117    private Region region;
118    ConcurrentMap<String, Object> sharedData;
119    private final MetricRegistry metricRegistry;
120    private final RegionServerServices services;
121
122    /**
123     * Constructor
124     * @param impl     the coprocessor instance
125     * @param priority chaining priority
126     */
127    public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
128      final Configuration conf, final Region region, final RegionServerServices services,
129      final ConcurrentMap<String, Object> sharedData) {
130      super(impl, priority, seq, conf);
131      this.region = region;
132      this.sharedData = sharedData;
133      this.services = services;
134      this.metricRegistry =
135        MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
136    }
137
138    /** Returns the region */
139    @Override
140    public Region getRegion() {
141      return region;
142    }
143
144    @Override
145    public OnlineRegions getOnlineRegions() {
146      return this.services;
147    }
148
149    @Override
150    public Connection getConnection() {
151      // Mocks may have services as null at test time.
152      return services != null ? new SharedConnection(services.getConnection()) : null;
153    }
154
155    @Override
156    public Connection createConnection(Configuration conf) throws IOException {
157      return services != null ? this.services.createConnection(conf) : null;
158    }
159
160    @Override
161    public ServerName getServerName() {
162      return services != null ? services.getServerName() : null;
163    }
164
165    @Override
166    public void shutdown() {
167      super.shutdown();
168      MetricsCoprocessor.removeRegistry(this.metricRegistry);
169    }
170
171    @Override
172    public ConcurrentMap<String, Object> getSharedData() {
173      return sharedData;
174    }
175
176    @Override
177    public RegionInfo getRegionInfo() {
178      return region.getRegionInfo();
179    }
180
181    @Override
182    public MetricRegistry getMetricRegistryForRegionServer() {
183      return metricRegistry;
184    }
185
186    @Override
187    public RawCellBuilder getCellBuilder() {
188      // We always do a DEEP_COPY only
189      return RawCellBuilderFactory.create();
190    }
191  }
192
193  /**
194   * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
195   * only. Temporary hack until Core Coprocessors are integrated into Core.
196   */
197  private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
198    implements HasRegionServerServices {
199    private final RegionServerServices rsServices;
200
201    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
202      final int seq, final Configuration conf, final Region region,
203      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
204      super(impl, priority, seq, conf, region, services, sharedData);
205      this.rsServices = services;
206    }
207
208    /**
209     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
210     *         consumption.
211     */
212    @Override
213    public RegionServerServices getRegionServerServices() {
214      return this.rsServices;
215    }
216  }
217
218  static class TableCoprocessorAttribute {
219    private Path path;
220    private String className;
221    private int priority;
222    private Configuration conf;
223
224    public TableCoprocessorAttribute(Path path, String className, int priority,
225      Configuration conf) {
226      this.path = path;
227      this.className = className;
228      this.priority = priority;
229      this.conf = conf;
230    }
231
232    public Path getPath() {
233      return path;
234    }
235
236    public String getClassName() {
237      return className;
238    }
239
240    public int getPriority() {
241      return priority;
242    }
243
244    public Configuration getConf() {
245      return conf;
246    }
247  }
248
249  /** The region server services */
250  RegionServerServices rsServices;
251  /** The region */
252  HRegion region;
253
254  /**
255   * Constructor
256   * @param region     the region
257   * @param rsServices interface to available region server functionality
258   * @param conf       the configuration
259   */
260  @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
261  public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
262    final Configuration conf) {
263    super(rsServices);
264    this.conf = conf;
265    this.rsServices = rsServices;
266    this.region = region;
267    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
268
269    // load system default cp's from configuration.
270    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
271
272    // load system default cp's for user tables from configuration.
273    if (!region.getRegionInfo().getTable().isSystemTable()) {
274      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
275    }
276
277    // load Coprocessor From HDFS
278    loadTableCoprocessors(conf);
279
280    // now check whether any coprocessor implements postScannerFilterRow
281    boolean hasCustomPostScannerFilterRow = false;
282    out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
283      if (env.getInstance() instanceof RegionObserver) {
284        Class<?> clazz = env.getInstance().getClass();
285        for (;;) {
286          if (clazz == Object.class) {
287            // we dont need to look postScannerFilterRow into Object class
288            break; // break the inner loop
289          }
290          try {
291            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
292              InternalScanner.class, Cell.class, boolean.class);
293            // this coprocessor has a custom version of postScannerFilterRow
294            hasCustomPostScannerFilterRow = true;
295            break out;
296          } catch (NoSuchMethodException ignore) {
297          }
298          // the deprecated signature still exists
299          try {
300            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
301              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
302            // this coprocessor has a custom version of postScannerFilterRow
303            hasCustomPostScannerFilterRow = true;
304            break out;
305          } catch (NoSuchMethodException ignore) {
306          }
307          clazz = clazz.getSuperclass();
308        }
309      }
310    }
311    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
312  }
313
314  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
315    TableDescriptor htd) {
316    return htd.getCoprocessorDescriptors().stream().map(cp -> {
317      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
318      Configuration ourConf;
319      if (!cp.getProperties().isEmpty()) {
320        // do an explicit deep copy of the passed configuration
321        ourConf = new Configuration(false);
322        HBaseConfiguration.merge(ourConf, conf);
323        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
324      } else {
325        ourConf = conf;
326      }
327      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
328    }).collect(Collectors.toList());
329  }
330
331  /**
332   * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
333   * if there is a problem. nnn
334   */
335  public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
336    throws IOException {
337    String pathPrefix = UUID.randomUUID().toString();
338    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
339      if (attr.getPriority() < 0) {
340        throw new IOException(
341          "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
342      }
343      ClassLoader old = Thread.currentThread().getContextClassLoader();
344      try {
345        ClassLoader cl;
346        if (attr.getPath() != null) {
347          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
348            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
349        } else {
350          cl = CoprocessorHost.class.getClassLoader();
351        }
352        Thread.currentThread().setContextClassLoader(cl);
353        if (cl instanceof CoprocessorClassLoader) {
354          String[] includedClassPrefixes = null;
355          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
356            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
357            includedClassPrefixes = prefixes.split(";");
358          }
359          ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
360        } else {
361          cl.loadClass(attr.getClassName());
362        }
363      } catch (ClassNotFoundException e) {
364        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
365      } finally {
366        Thread.currentThread().setContextClassLoader(old);
367      }
368    }
369  }
370
371  void loadTableCoprocessors(final Configuration conf) {
372    boolean coprocessorsEnabled =
373      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
374    boolean tableCoprocessorsEnabled =
375      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
376    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
377      return;
378    }
379
380    // scan the table attributes for coprocessor load specifications
381    // initialize the coprocessors
382    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
383    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
384      region.getTableDescriptor())) {
385      // Load encompasses classloading and coprocessor initialization
386      try {
387        RegionCoprocessorEnvironment env =
388          load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
389        if (env == null) {
390          continue;
391        }
392        configured.add(env);
393        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
394          + region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
395      } catch (Throwable t) {
396        // Coprocessor failed to load, do we abort on error?
397        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
398          abortServer(attr.getClassName(), t);
399        } else {
400          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
401        }
402      }
403    }
404    // add together to coprocessor set for COW efficiency
405    coprocEnvironments.addAll(configured);
406  }
407
408  @Override
409  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
410    Configuration conf) {
411    // If coprocessor exposes any services, register them.
412    for (Service service : instance.getServices()) {
413      region.registerService(service);
414    }
415    ConcurrentMap<String, Object> classData;
416    // make sure only one thread can add maps
417    synchronized (SHARED_DATA_MAP) {
418      // as long as at least one RegionEnvironment holds on to its classData it will
419      // remain in this map
420      classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
421        k -> new ConcurrentHashMap<>());
422    }
423    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
424    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
425      ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
426        classData)
427      : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
428  }
429
430  @Override
431  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
432    throws InstantiationException, IllegalAccessException {
433    try {
434      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
435        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
436      } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
437        // For backward compatibility with old CoprocessorService impl which don't extend
438        // RegionCoprocessor.
439        CoprocessorService cs;
440        cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance();
441        return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs);
442      } else {
443        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
444          implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
445        return null;
446      }
447    } catch (NoSuchMethodException | InvocationTargetException e) {
448      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
449    }
450  }
451
452  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
453    RegionCoprocessor::getRegionObserver;
454
455  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
456    RegionCoprocessor::getEndpointObserver;
457
458  abstract class RegionObserverOperationWithoutResult
459    extends ObserverOperationWithoutResult<RegionObserver> {
460    public RegionObserverOperationWithoutResult() {
461      super(regionObserverGetter);
462    }
463
464    public RegionObserverOperationWithoutResult(User user) {
465      super(regionObserverGetter, user);
466    }
467
468    public RegionObserverOperationWithoutResult(boolean bypassable) {
469      super(regionObserverGetter, null, bypassable);
470    }
471
472    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
473      super(regionObserverGetter, user, bypassable);
474    }
475  }
476
477  abstract class BulkLoadObserverOperation
478    extends ObserverOperationWithoutResult<BulkLoadObserver> {
479    public BulkLoadObserverOperation(User user) {
480      super(RegionCoprocessor::getBulkLoadObserver, user);
481    }
482  }
483
484  //////////////////////////////////////////////////////////////////////////////////////////////////
485  // Observer operations
486  //////////////////////////////////////////////////////////////////////////////////////////////////
487
488  //////////////////////////////////////////////////////////////////////////////////////////////////
489  // Observer operations
490  //////////////////////////////////////////////////////////////////////////////////////////////////
491
492  /**
493   * Invoked before a region open.
494   * @throws IOException Signals that an I/O exception has occurred.
495   */
496  public void preOpen() throws IOException {
497    if (coprocEnvironments.isEmpty()) {
498      return;
499    }
500    execOperation(new RegionObserverOperationWithoutResult() {
501      @Override
502      public void call(RegionObserver observer) throws IOException {
503        observer.preOpen(this);
504      }
505    });
506  }
507
508  /**
509   * Invoked after a region open
510   */
511  public void postOpen() {
512    if (coprocEnvironments.isEmpty()) {
513      return;
514    }
515    try {
516      execOperation(new RegionObserverOperationWithoutResult() {
517        @Override
518        public void call(RegionObserver observer) throws IOException {
519          observer.postOpen(this);
520        }
521      });
522    } catch (IOException e) {
523      LOG.warn(e.toString(), e);
524    }
525  }
526
527  /**
528   * Invoked before a region is closed
529   * @param abortRequested true if the server is aborting
530   */
531  public void preClose(final boolean abortRequested) throws IOException {
532    execOperation(new RegionObserverOperationWithoutResult() {
533      @Override
534      public void call(RegionObserver observer) throws IOException {
535        observer.preClose(this, abortRequested);
536      }
537    });
538  }
539
540  /**
541   * Invoked after a region is closed
542   * @param abortRequested true if the server is aborting
543   */
544  public void postClose(final boolean abortRequested) {
545    try {
546      execOperation(new RegionObserverOperationWithoutResult() {
547        @Override
548        public void call(RegionObserver observer) throws IOException {
549          observer.postClose(this, abortRequested);
550        }
551
552        @Override
553        public void postEnvCall() {
554          shutdown(this.getEnvironment());
555        }
556      });
557    } catch (IOException e) {
558      LOG.warn(e.toString(), e);
559    }
560  }
561
562  /**
563   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
564   * available candidates.
565   * <p>
566   * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
567   * passed in <code>candidates</code>.
568   * @param store      The store where compaction is being requested
569   * @param candidates The currently available store files
570   * @param tracker    used to track the life cycle of a compaction
571   * @param user       the user n
572   */
573  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
574    final CompactionLifeCycleTracker tracker, final User user) throws IOException {
575    if (coprocEnvironments.isEmpty()) {
576      return false;
577    }
578    boolean bypassable = true;
579    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
580      @Override
581      public void call(RegionObserver observer) throws IOException {
582        observer.preCompactSelection(this, store, candidates, tracker);
583      }
584    });
585  }
586
587  /**
588   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
589   * candidates.
590   * @param store    The store where compaction is being requested
591   * @param selected The store files selected to compact
592   * @param tracker  used to track the life cycle of a compaction
593   * @param request  the compaction request
594   * @param user     the user
595   */
596  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
597    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
598    throws IOException {
599    if (coprocEnvironments.isEmpty()) {
600      return;
601    }
602    execOperation(new RegionObserverOperationWithoutResult(user) {
603      @Override
604      public void call(RegionObserver observer) throws IOException {
605        observer.postCompactSelection(this, store, selected, tracker, request);
606      }
607    });
608  }
609
610  /**
611   * Called prior to opening store scanner for compaction.
612   */
613  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
614    CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
615    if (coprocEnvironments.isEmpty()) {
616      return store.getScanInfo();
617    }
618    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
619    execOperation(new RegionObserverOperationWithoutResult(user) {
620      @Override
621      public void call(RegionObserver observer) throws IOException {
622        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
623      }
624    });
625    return builder.build();
626  }
627
628  /**
629   * Called prior to rewriting the store files selected for compaction
630   * @param store    the store being compacted
631   * @param scanner  the scanner used to read store data during compaction
632   * @param scanType type of Scan
633   * @param tracker  used to track the life cycle of a compaction
634   * @param request  the compaction request
635   * @param user     the user
636   * @return Scanner to use (cannot be null!) n
637   */
638  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
639    final ScanType scanType, final CompactionLifeCycleTracker tracker,
640    final CompactionRequest request, final User user) throws IOException {
641    InternalScanner defaultResult = scanner;
642    if (coprocEnvironments.isEmpty()) {
643      return defaultResult;
644    }
645    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
646      regionObserverGetter, defaultResult, user) {
647      @Override
648      public InternalScanner call(RegionObserver observer) throws IOException {
649        InternalScanner scanner =
650          observer.preCompact(this, store, getResult(), scanType, tracker, request);
651        if (scanner == null) {
652          throw new CoprocessorException("Null Scanner return disallowed!");
653        }
654        return scanner;
655      }
656    });
657  }
658
659  /**
660   * Called after the store compaction has completed.
661   * @param store      the store being compacted
662   * @param resultFile the new store file written during compaction
663   * @param tracker    used to track the life cycle of a compaction
664   * @param request    the compaction request
665   * @param user       the user n
666   */
667  public void postCompact(final HStore store, final HStoreFile resultFile,
668    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
669    throws IOException {
670    execOperation(
671      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
672        @Override
673        public void call(RegionObserver observer) throws IOException {
674          observer.postCompact(this, store, resultFile, tracker, request);
675        }
676      });
677  }
678
679  /**
680   * Invoked before create StoreScanner for flush.
681   */
682  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
683    throws IOException {
684    if (coprocEnvironments.isEmpty()) {
685      return store.getScanInfo();
686    }
687    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
688    execOperation(new RegionObserverOperationWithoutResult() {
689      @Override
690      public void call(RegionObserver observer) throws IOException {
691        observer.preFlushScannerOpen(this, store, builder, tracker);
692      }
693    });
694    return builder.build();
695  }
696
697  /**
698   * Invoked before a memstore flush
699   * @return Scanner to use (cannot be null!) n
700   */
701  public InternalScanner preFlush(HStore store, InternalScanner scanner,
702    FlushLifeCycleTracker tracker) throws IOException {
703    if (coprocEnvironments.isEmpty()) {
704      return scanner;
705    }
706    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
707      regionObserverGetter, scanner) {
708      @Override
709      public InternalScanner call(RegionObserver observer) throws IOException {
710        InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
711        if (scanner == null) {
712          throw new CoprocessorException("Null Scanner return disallowed!");
713        }
714        return scanner;
715      }
716    });
717  }
718
719  /**
720   * Invoked before a memstore flush n
721   */
722  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
723    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
724      @Override
725      public void call(RegionObserver observer) throws IOException {
726        observer.preFlush(this, tracker);
727      }
728    });
729  }
730
731  /**
732   * Invoked after a memstore flush n
733   */
734  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
735    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
736      @Override
737      public void call(RegionObserver observer) throws IOException {
738        observer.postFlush(this, tracker);
739      }
740    });
741  }
742
743  /**
744   * Invoked before in memory compaction.
745   */
746  public void preMemStoreCompaction(HStore store) throws IOException {
747    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
748      @Override
749      public void call(RegionObserver observer) throws IOException {
750        observer.preMemStoreCompaction(this, store);
751      }
752    });
753  }
754
755  /**
756   * Invoked before create StoreScanner for in memory compaction.
757   */
758  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
759    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
760    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
761      @Override
762      public void call(RegionObserver observer) throws IOException {
763        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
764      }
765    });
766    return builder.build();
767  }
768
769  /**
770   * Invoked before compacting memstore.
771   */
772  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
773    throws IOException {
774    if (coprocEnvironments.isEmpty()) {
775      return scanner;
776    }
777    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
778      regionObserverGetter, scanner) {
779      @Override
780      public InternalScanner call(RegionObserver observer) throws IOException {
781        return observer.preMemStoreCompactionCompact(this, store, getResult());
782      }
783    });
784  }
785
786  /**
787   * Invoked after in memory compaction.
788   */
789  public void postMemStoreCompaction(HStore store) throws IOException {
790    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
791      @Override
792      public void call(RegionObserver observer) throws IOException {
793        observer.postMemStoreCompaction(this, store);
794      }
795    });
796  }
797
798  /**
799   * Invoked after a memstore flush n
800   */
801  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
802    throws IOException {
803    if (coprocEnvironments.isEmpty()) {
804      return;
805    }
806    execOperation(new RegionObserverOperationWithoutResult() {
807      @Override
808      public void call(RegionObserver observer) throws IOException {
809        observer.postFlush(this, store, storeFile, tracker);
810      }
811    });
812  }
813
814  // RegionObserver support
815  /**
816   * Supports Coprocessor 'bypass'.
817   * @param get     the Get request
818   * @param results What to return if return is true/'bypass'.
819   * @return true if default processing should be bypassed.
820   * @exception IOException Exception
821   */
822  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
823    if (coprocEnvironments.isEmpty()) {
824      return false;
825    }
826    boolean bypassable = true;
827    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
828      @Override
829      public void call(RegionObserver observer) throws IOException {
830        observer.preGetOp(this, get, results);
831      }
832    });
833  }
834
835  /**
836   * @param get     the Get request
837   * @param results the result set
838   * @exception IOException Exception
839   */
840  public void postGet(final Get get, final List<Cell> results) throws IOException {
841    if (coprocEnvironments.isEmpty()) {
842      return;
843    }
844    execOperation(new RegionObserverOperationWithoutResult() {
845      @Override
846      public void call(RegionObserver observer) throws IOException {
847        observer.postGetOp(this, get, results);
848      }
849    });
850  }
851
852  /**
853   * Supports Coprocessor 'bypass'.
854   * @param get the Get request
855   * @return true or false to return to client if bypassing normal operation, or null otherwise
856   * @exception IOException Exception
857   */
858  public Boolean preExists(final Get get) throws IOException {
859    boolean bypassable = true;
860    boolean defaultResult = false;
861    if (coprocEnvironments.isEmpty()) {
862      return null;
863    }
864    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
865      regionObserverGetter, defaultResult, bypassable) {
866      @Override
867      public Boolean call(RegionObserver observer) throws IOException {
868        return observer.preExists(this, get, getResult());
869      }
870    });
871  }
872
873  /**
874   * @param get    the Get request
875   * @param result the result returned by the region server
876   * @return the result to return to the client
877   * @exception IOException Exception
878   */
879  public boolean postExists(final Get get, boolean result) throws IOException {
880    if (this.coprocEnvironments.isEmpty()) {
881      return result;
882    }
883    return execOperationWithResult(
884      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
885        @Override
886        public Boolean call(RegionObserver observer) throws IOException {
887          return observer.postExists(this, get, getResult());
888        }
889      });
890  }
891
892  /**
893   * Supports Coprocessor 'bypass'.
894   * @param put  The Put object
895   * @param edit The WALEdit object.
896   * @return true if default processing should be bypassed
897   * @exception IOException Exception
898   */
899  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
900    if (coprocEnvironments.isEmpty()) {
901      return false;
902    }
903    boolean bypassable = true;
904    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
905      @Override
906      public void call(RegionObserver observer) throws IOException {
907        observer.prePut(this, put, edit);
908      }
909    });
910  }
911
912  /**
913   * Supports Coprocessor 'bypass'.
914   * @param mutation - the current mutation
915   * @param kv       - the current cell
916   * @param byteNow  - current timestamp in bytes
917   * @param get      - the get that could be used Note that the get only does not specify the family
918   *                 and qualifier that should be used
919   * @return true if default processing should be bypassed
920   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
921   *             Coprocessor for its needs only. Will be removed.
922   */
923  @Deprecated
924  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
925    final byte[] byteNow, final Get get) throws IOException {
926    if (coprocEnvironments.isEmpty()) {
927      return false;
928    }
929    boolean bypassable = true;
930    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
931      @Override
932      public void call(RegionObserver observer) throws IOException {
933        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
934      }
935    });
936  }
937
938  /**
939   * @param put  The Put object
940   * @param edit The WALEdit object.
941   * @exception IOException Exception
942   */
943  public void postPut(final Put put, final WALEdit edit) throws IOException {
944    if (coprocEnvironments.isEmpty()) {
945      return;
946    }
947    execOperation(new RegionObserverOperationWithoutResult() {
948      @Override
949      public void call(RegionObserver observer) throws IOException {
950        observer.postPut(this, put, edit);
951      }
952    });
953  }
954
955  /**
956   * Supports Coprocessor 'bypass'.
957   * @param delete The Delete object
958   * @param edit   The WALEdit object.
959   * @return true if default processing should be bypassed
960   * @exception IOException Exception
961   */
962  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
963    if (this.coprocEnvironments.isEmpty()) {
964      return false;
965    }
966    boolean bypassable = true;
967    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
968      @Override
969      public void call(RegionObserver observer) throws IOException {
970        observer.preDelete(this, delete, edit);
971      }
972    });
973  }
974
975  /**
976   * @param delete The Delete object
977   * @param edit   The WALEdit object.
978   * @exception IOException Exception
979   */
980  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
981    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
982      @Override
983      public void call(RegionObserver observer) throws IOException {
984        observer.postDelete(this, delete, edit);
985      }
986    });
987  }
988
989  public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
990    throws IOException {
991    if (this.coprocEnvironments.isEmpty()) {
992      return;
993    }
994    execOperation(new RegionObserverOperationWithoutResult() {
995      @Override
996      public void call(RegionObserver observer) throws IOException {
997        observer.preBatchMutate(this, miniBatchOp);
998      }
999    });
1000  }
1001
1002  public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
1003    throws IOException {
1004    if (this.coprocEnvironments.isEmpty()) {
1005      return;
1006    }
1007    execOperation(new RegionObserverOperationWithoutResult() {
1008      @Override
1009      public void call(RegionObserver observer) throws IOException {
1010        observer.postBatchMutate(this, miniBatchOp);
1011      }
1012    });
1013  }
1014
1015  public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
1016    final boolean success) throws IOException {
1017    if (this.coprocEnvironments.isEmpty()) {
1018      return;
1019    }
1020    execOperation(new RegionObserverOperationWithoutResult() {
1021      @Override
1022      public void call(RegionObserver observer) throws IOException {
1023        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1024      }
1025    });
1026  }
1027
1028  /**
1029   * Supports Coprocessor 'bypass'.
1030   * @param checkAndMutate the CheckAndMutate object
1031   * @return true or false to return to client if default processing should be bypassed, or null
1032   *         otherwise
1033   * @throws IOException if an error occurred on the coprocessor
1034   */
1035  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
1036    boolean bypassable = true;
1037    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1038    if (coprocEnvironments.isEmpty()) {
1039      return null;
1040    }
1041    return execOperationWithResult(
1042      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1043        defaultResult, bypassable) {
1044        @Override
1045        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1046          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1047        }
1048      });
1049  }
1050
1051  /**
1052   * Supports Coprocessor 'bypass'.
1053   * @param checkAndMutate the CheckAndMutate object
1054   * @return true or false to return to client if default processing should be bypassed, or null
1055   *         otherwise
1056   * @throws IOException if an error occurred on the coprocessor
1057   */
1058  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1059    throws IOException {
1060    boolean bypassable = true;
1061    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1062    if (coprocEnvironments.isEmpty()) {
1063      return null;
1064    }
1065    return execOperationWithResult(
1066      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1067        defaultResult, bypassable) {
1068        @Override
1069        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1070          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1071        }
1072      });
1073  }
1074
1075  /**
1076   * @param checkAndMutate the CheckAndMutate object
1077   * @param result         the result returned by the checkAndMutate
1078   * @return true or false to return to client if default processing should be bypassed, or null
1079   *         otherwise
1080   * @throws IOException if an error occurred on the coprocessor
1081   */
1082  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1083    CheckAndMutateResult result) throws IOException {
1084    if (this.coprocEnvironments.isEmpty()) {
1085      return result;
1086    }
1087    return execOperationWithResult(
1088      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1089        result) {
1090        @Override
1091        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1092          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1093        }
1094      });
1095  }
1096
1097  /**
1098   * Supports Coprocessor 'bypass'.
1099   * @param append append object
1100   * @param edit   The WALEdit object.
1101   * @return result to return to client if default operation should be bypassed, null otherwise
1102   * @throws IOException if an error occurred on the coprocessor
1103   */
1104  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
1105    boolean bypassable = true;
1106    Result defaultResult = null;
1107    if (this.coprocEnvironments.isEmpty()) {
1108      return defaultResult;
1109    }
1110    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1111      regionObserverGetter, defaultResult, bypassable) {
1112      @Override
1113      public Result call(RegionObserver observer) throws IOException {
1114        return observer.preAppend(this, append, edit);
1115      }
1116    });
1117  }
1118
1119  /**
1120   * Supports Coprocessor 'bypass'.
1121   * @param append append object
1122   * @return result to return to client if default operation should be bypassed, null otherwise
1123   * @throws IOException if an error occurred on the coprocessor
1124   */
1125  public Result preAppendAfterRowLock(final Append append) throws IOException {
1126    boolean bypassable = true;
1127    Result defaultResult = null;
1128    if (this.coprocEnvironments.isEmpty()) {
1129      return defaultResult;
1130    }
1131    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1132      regionObserverGetter, defaultResult, bypassable) {
1133      @Override
1134      public Result call(RegionObserver observer) throws IOException {
1135        return observer.preAppendAfterRowLock(this, append);
1136      }
1137    });
1138  }
1139
1140  /**
1141   * Supports Coprocessor 'bypass'.
1142   * @param increment increment object
1143   * @param edit      The WALEdit object.
1144   * @return result to return to client if default operation should be bypassed, null otherwise
1145   * @throws IOException if an error occurred on the coprocessor
1146   */
1147  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
1148    boolean bypassable = true;
1149    Result defaultResult = null;
1150    if (coprocEnvironments.isEmpty()) {
1151      return defaultResult;
1152    }
1153    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1154      regionObserverGetter, defaultResult, bypassable) {
1155      @Override
1156      public Result call(RegionObserver observer) throws IOException {
1157        return observer.preIncrement(this, increment, edit);
1158      }
1159    });
1160  }
1161
1162  /**
1163   * Supports Coprocessor 'bypass'.
1164   * @param increment increment object
1165   * @return result to return to client if default operation should be bypassed, null otherwise
1166   * @throws IOException if an error occurred on the coprocessor
1167   */
1168  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1169    boolean bypassable = true;
1170    Result defaultResult = null;
1171    if (coprocEnvironments.isEmpty()) {
1172      return defaultResult;
1173    }
1174    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1175      regionObserverGetter, defaultResult, bypassable) {
1176      @Override
1177      public Result call(RegionObserver observer) throws IOException {
1178        return observer.preIncrementAfterRowLock(this, increment);
1179      }
1180    });
1181  }
1182
1183  /**
1184   * @param append Append object
1185   * @param result the result returned by the append
1186   * @param edit   The WALEdit object.
1187   * @throws IOException if an error occurred on the coprocessor
1188   */
1189  public Result postAppend(final Append append, final Result result, final WALEdit edit)
1190    throws IOException {
1191    if (this.coprocEnvironments.isEmpty()) {
1192      return result;
1193    }
1194    return execOperationWithResult(
1195      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1196        @Override
1197        public Result call(RegionObserver observer) throws IOException {
1198          return observer.postAppend(this, append, result, edit);
1199        }
1200      });
1201  }
1202
1203  /**
1204   * @param increment increment object
1205   * @param result    the result returned by postIncrement
1206   * @param edit      The WALEdit object.
1207   * @throws IOException if an error occurred on the coprocessor
1208   */
1209  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
1210    throws IOException {
1211    if (this.coprocEnvironments.isEmpty()) {
1212      return result;
1213    }
1214    return execOperationWithResult(
1215      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1216        @Override
1217        public Result call(RegionObserver observer) throws IOException {
1218          return observer.postIncrement(this, increment, getResult(), edit);
1219        }
1220      });
1221  }
1222
1223  /**
1224   * @param scan the Scan specification
1225   * @exception IOException Exception
1226   */
1227  public void preScannerOpen(final Scan scan) throws IOException {
1228    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1229      @Override
1230      public void call(RegionObserver observer) throws IOException {
1231        observer.preScannerOpen(this, scan);
1232      }
1233    });
1234  }
1235
1236  /**
1237   * @param scan the Scan specification
1238   * @param s    the scanner
1239   * @return the scanner instance to use
1240   * @exception IOException Exception
1241   */
1242  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1243    if (this.coprocEnvironments.isEmpty()) {
1244      return s;
1245    }
1246    return execOperationWithResult(
1247      new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1248        @Override
1249        public RegionScanner call(RegionObserver observer) throws IOException {
1250          return observer.postScannerOpen(this, scan, getResult());
1251        }
1252      });
1253  }
1254
1255  /**
1256   * @param s       the scanner
1257   * @param results the result set returned by the region server
1258   * @param limit   the maximum number of results to return
1259   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1260   * @exception IOException Exception
1261   */
1262  public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
1263    final int limit) throws IOException {
1264    boolean bypassable = true;
1265    boolean defaultResult = false;
1266    if (coprocEnvironments.isEmpty()) {
1267      return null;
1268    }
1269    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1270      regionObserverGetter, defaultResult, bypassable) {
1271      @Override
1272      public Boolean call(RegionObserver observer) throws IOException {
1273        return observer.preScannerNext(this, s, results, limit, getResult());
1274      }
1275    });
1276  }
1277
1278  /**
1279   * @param s       the scanner
1280   * @param results the result set returned by the region server
1281   * @param limit   the maximum number of results to return n * @return 'has more' indication to
1282   *                give to client
1283   * @exception IOException Exception
1284   */
1285  public boolean postScannerNext(final InternalScanner s, final List<Result> results,
1286    final int limit, boolean hasMore) throws IOException {
1287    if (this.coprocEnvironments.isEmpty()) {
1288      return hasMore;
1289    }
1290    return execOperationWithResult(
1291      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1292        @Override
1293        public Boolean call(RegionObserver observer) throws IOException {
1294          return observer.postScannerNext(this, s, results, limit, getResult());
1295        }
1296      });
1297  }
1298
1299  /**
1300   * This will be called by the scan flow when the current scanned row is being filtered out by the
1301   * filter.
1302   * @param s          the scanner
1303   * @param curRowCell The cell in the current row which got filtered out
1304   * @return whether more rows are available for the scanner or not n
1305   */
1306  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1307    throws IOException {
1308    // short circuit for performance
1309    boolean defaultResult = true;
1310    if (!hasCustomPostScannerFilterRow) {
1311      return defaultResult;
1312    }
1313    if (this.coprocEnvironments.isEmpty()) {
1314      return defaultResult;
1315    }
1316    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1317      regionObserverGetter, defaultResult) {
1318      @Override
1319      public Boolean call(RegionObserver observer) throws IOException {
1320        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1321      }
1322    });
1323  }
1324
1325  /**
1326   * Supports Coprocessor 'bypass'.
1327   * @param s the scanner
1328   * @return true if default behavior should be bypassed, false otherwise
1329   * @exception IOException Exception
1330   */
1331  // Should this be bypassable?
1332  public boolean preScannerClose(final InternalScanner s) throws IOException {
1333    return execOperation(
1334      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1335        @Override
1336        public void call(RegionObserver observer) throws IOException {
1337          observer.preScannerClose(this, s);
1338        }
1339      });
1340  }
1341
1342  /**
1343   * @exception IOException Exception
1344   */
1345  public void postScannerClose(final InternalScanner s) throws IOException {
1346    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1347      @Override
1348      public void call(RegionObserver observer) throws IOException {
1349        observer.postScannerClose(this, s);
1350      }
1351    });
1352  }
1353
1354  /**
1355   * Called before open store scanner for user scan.
1356   */
1357  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1358    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1359    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1360    execOperation(new RegionObserverOperationWithoutResult() {
1361      @Override
1362      public void call(RegionObserver observer) throws IOException {
1363        observer.preStoreScannerOpen(this, store, builder);
1364      }
1365    });
1366    return builder.build();
1367  }
1368
1369  /**
1370   * @param info  the RegionInfo for this region
1371   * @param edits the file of recovered edits
1372   */
1373  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1374    execOperation(
1375      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1376        @Override
1377        public void call(RegionObserver observer) throws IOException {
1378          observer.preReplayWALs(this, info, edits);
1379        }
1380      });
1381  }
1382
1383  /**
1384   * @param info  the RegionInfo for this region
1385   * @param edits the file of recovered edits
1386   * @throws IOException Exception
1387   */
1388  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1389    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1390      @Override
1391      public void call(RegionObserver observer) throws IOException {
1392        observer.postReplayWALs(this, info, edits);
1393      }
1394    });
1395  }
1396
1397  /**
1398   * Supports Coprocessor 'bypass'.
1399   * @return true if default behavior should be bypassed, false otherwise
1400   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1401   *             something that doesn't expose IntefaceAudience.Private classes.
1402   */
1403  @Deprecated
1404  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1405    throws IOException {
1406    return execOperation(
1407      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1408        @Override
1409        public void call(RegionObserver observer) throws IOException {
1410          observer.preWALRestore(this, info, logKey, logEdit);
1411        }
1412      });
1413  }
1414
1415  /**
1416   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1417   *             something that doesn't expose IntefaceAudience.Private classes.
1418   */
1419  @Deprecated
1420  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1421    throws IOException {
1422    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1423      @Override
1424      public void call(RegionObserver observer) throws IOException {
1425        observer.postWALRestore(this, info, logKey, logEdit);
1426      }
1427    });
1428  }
1429
1430  /**
1431   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1432   */
1433  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1434    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1435      @Override
1436      public void call(RegionObserver observer) throws IOException {
1437        observer.preBulkLoadHFile(this, familyPaths);
1438      }
1439    });
1440  }
1441
1442  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1443    throws IOException {
1444    return execOperation(
1445      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1446        @Override
1447        public void call(RegionObserver observer) throws IOException {
1448          observer.preCommitStoreFile(this, family, pairs);
1449        }
1450      });
1451  }
1452
1453  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
1454    throws IOException {
1455    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1456      @Override
1457      public void call(RegionObserver observer) throws IOException {
1458        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1459      }
1460    });
1461  }
1462
1463  /**
1464   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1465   * @param map         Map of CF to List of file paths for the final loaded files n
1466   */
1467  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1468    Map<byte[], List<Path>> map) throws IOException {
1469    if (this.coprocEnvironments.isEmpty()) {
1470      return;
1471    }
1472    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1473      @Override
1474      public void call(RegionObserver observer) throws IOException {
1475        observer.postBulkLoadHFile(this, familyPaths, map);
1476      }
1477    });
1478  }
1479
1480  public void postStartRegionOperation(final Operation op) throws IOException {
1481    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1482      @Override
1483      public void call(RegionObserver observer) throws IOException {
1484        observer.postStartRegionOperation(this, op);
1485      }
1486    });
1487  }
1488
1489  public void postCloseRegionOperation(final Operation op) throws IOException {
1490    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1491      @Override
1492      public void call(RegionObserver observer) throws IOException {
1493        observer.postCloseRegionOperation(this, op);
1494      }
1495    });
1496  }
1497
1498  /**
1499   * @param fs   fileystem to read from
1500   * @param p    path to the file
1501   * @param in   {@link FSDataInputStreamWrapper}
1502   * @param size Full size of the file n * @param r original reference file. This will be not null
1503   *             only when reading a split file.
1504   * @return a Reader instance to use instead of the base reader if overriding default behavior,
1505   *         null otherwise n
1506   */
1507  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1508    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1509    final Reference r) throws IOException {
1510    if (coprocEnvironments.isEmpty()) {
1511      return null;
1512    }
1513    return execOperationWithResult(
1514      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1515        @Override
1516        public StoreFileReader call(RegionObserver observer) throws IOException {
1517          return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1518        }
1519      });
1520  }
1521
1522  /**
1523   * @param fs     fileystem to read from
1524   * @param p      path to the file
1525   * @param in     {@link FSDataInputStreamWrapper}
1526   * @param size   Full size of the file n * @param r original reference file. This will be not null
1527   *               only when reading a split file.
1528   * @param reader the base reader instance
1529   * @return The reader to use n
1530   */
1531  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1532    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1533    final Reference r, final StoreFileReader reader) throws IOException {
1534    if (this.coprocEnvironments.isEmpty()) {
1535      return reader;
1536    }
1537    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
1538      regionObserverGetter, reader) {
1539      @Override
1540      public StoreFileReader call(RegionObserver observer) throws IOException {
1541        return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1542      }
1543    });
1544  }
1545
1546  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1547    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1548    if (this.coprocEnvironments.isEmpty()) {
1549      return cellPairs;
1550    }
1551    return execOperationWithResult(
1552      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1553        cellPairs) {
1554        @Override
1555        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1556          return observer.postIncrementBeforeWAL(this, mutation, getResult());
1557        }
1558      });
1559  }
1560
1561  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1562    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1563    if (this.coprocEnvironments.isEmpty()) {
1564      return cellPairs;
1565    }
1566    return execOperationWithResult(
1567      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1568        cellPairs) {
1569        @Override
1570        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1571          return observer.postAppendBeforeWAL(this, mutation, getResult());
1572        }
1573      });
1574  }
1575
1576  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1577    if (this.coprocEnvironments.isEmpty()) {
1578      return;
1579    }
1580    execOperation(new RegionObserverOperationWithoutResult() {
1581      @Override
1582      public void call(RegionObserver observer) throws IOException {
1583        observer.preWALAppend(this, key, edit);
1584      }
1585    });
1586  }
1587
1588  public Message preEndpointInvocation(final Service service, final String methodName,
1589    Message request) throws IOException {
1590    if (coprocEnvironments.isEmpty()) {
1591      return request;
1592    }
1593    return execOperationWithResult(
1594      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
1595        @Override
1596        public Message call(EndpointObserver observer) throws IOException {
1597          return observer.preEndpointInvocation(this, service, methodName, getResult());
1598        }
1599      });
1600  }
1601
1602  public void postEndpointInvocation(final Service service, final String methodName,
1603    final Message request, final Message.Builder responseBuilder) throws IOException {
1604    execOperation(coprocEnvironments.isEmpty()
1605      ? null
1606      : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1607        @Override
1608        public void call(EndpointObserver observer) throws IOException {
1609          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1610        }
1611      });
1612  }
1613
1614  /**
1615   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1616   */
1617  @Deprecated
1618  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1619    if (this.coprocEnvironments.isEmpty()) {
1620      return result;
1621    }
1622    return execOperationWithResult(
1623      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
1624        @Override
1625        public DeleteTracker call(RegionObserver observer) throws IOException {
1626          return observer.postInstantiateDeleteTracker(this, getResult());
1627        }
1628      });
1629  }
1630
1631  /////////////////////////////////////////////////////////////////////////////////////////////////
1632  // BulkLoadObserver hooks
1633  /////////////////////////////////////////////////////////////////////////////////////////////////
1634  public void prePrepareBulkLoad(User user) throws IOException {
1635    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1636      @Override
1637      protected void call(BulkLoadObserver observer) throws IOException {
1638        observer.prePrepareBulkLoad(this);
1639      }
1640    });
1641  }
1642
1643  public void preCleanupBulkLoad(User user) throws IOException {
1644    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1645      @Override
1646      protected void call(BulkLoadObserver observer) throws IOException {
1647        observer.preCleanupBulkLoad(this);
1648      }
1649    });
1650  }
1651}