001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationTargetException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.stream.Collectors;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Cell;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.RawCellBuilder;
036import org.apache.hadoop.hbase.RawCellBuilderFactory;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.client.Append;
039import org.apache.hadoop.hbase.client.CheckAndMutate;
040import org.apache.hadoop.hbase.client.CheckAndMutateResult;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.SharedConnection;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
054import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
057import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
060import org.apache.hadoop.hbase.coprocessor.ObserverContext;
061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
063import org.apache.hadoop.hbase.coprocessor.RegionObserver;
064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
065import org.apache.hadoop.hbase.io.Reference;
066import org.apache.hadoop.hbase.io.hfile.CacheConfig;
067import org.apache.hadoop.hbase.metrics.MetricRegistry;
068import org.apache.hadoop.hbase.regionserver.Region.Operation;
069import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
070import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
071import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
072import org.apache.hadoop.hbase.security.User;
073import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
074import org.apache.hadoop.hbase.util.Pair;
075import org.apache.hadoop.hbase.wal.WALEdit;
076import org.apache.hadoop.hbase.wal.WALKey;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.protobuf.Message;
082import org.apache.hbase.thirdparty.com.google.protobuf.Service;
083import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
084import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
085
086/**
087 * Implements the coprocessor environment and runtime support for coprocessors loaded within a
088 * {@link Region}.
089 */
090@InterfaceAudience.Private
091public class RegionCoprocessorHost
092  extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
093
094  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
095  // The shared data map
096  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
097    new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
098      AbstractReferenceMap.ReferenceStrength.WEAK);
099
100  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
101  private final boolean hasCustomPostScannerFilterRow;
102
103  /*
104   * Whether any configured CPs override postScannerFilterRow hook
105   */
106  public boolean hasCustomPostScannerFilterRow() {
107    return hasCustomPostScannerFilterRow;
108  }
109
110  /**
111   * Encapsulation of the environment of each coprocessor
112   */
113  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
114    implements RegionCoprocessorEnvironment {
115    private Region region;
116    ConcurrentMap<String, Object> sharedData;
117    private final MetricRegistry metricRegistry;
118    private final RegionServerServices services;
119
120    /**
121     * Constructor
122     * @param impl     the coprocessor instance
123     * @param priority chaining priority
124     */
125    public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq,
126      final Configuration conf, final Region region, final RegionServerServices services,
127      final ConcurrentMap<String, Object> sharedData) {
128      super(impl, priority, seq, conf);
129      this.region = region;
130      this.sharedData = sharedData;
131      this.services = services;
132      this.metricRegistry =
133        MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
134    }
135
136    /** Returns the region */
137    @Override
138    public Region getRegion() {
139      return region;
140    }
141
142    @Override
143    public OnlineRegions getOnlineRegions() {
144      return this.services;
145    }
146
147    @Override
148    public Connection getConnection() {
149      // Mocks may have services as null at test time.
150      return services != null ? new SharedConnection(services.getConnection()) : null;
151    }
152
153    @Override
154    public Connection createConnection(Configuration conf) throws IOException {
155      return services != null ? this.services.createConnection(conf) : null;
156    }
157
158    @Override
159    public ServerName getServerName() {
160      return services != null ? services.getServerName() : null;
161    }
162
163    @Override
164    public void shutdown() {
165      super.shutdown();
166      MetricsCoprocessor.removeRegistry(this.metricRegistry);
167    }
168
169    @Override
170    public ConcurrentMap<String, Object> getSharedData() {
171      return sharedData;
172    }
173
174    @Override
175    public RegionInfo getRegionInfo() {
176      return region.getRegionInfo();
177    }
178
179    @Override
180    public MetricRegistry getMetricRegistryForRegionServer() {
181      return metricRegistry;
182    }
183
184    @Override
185    public RawCellBuilder getCellBuilder() {
186      // We always do a DEEP_COPY only
187      return RawCellBuilderFactory.create();
188    }
189  }
190
191  /**
192   * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors
193   * only. Temporary hack until Core Coprocessors are integrated into Core.
194   */
195  private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment
196    implements HasRegionServerServices {
197    private final RegionServerServices rsServices;
198
199    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
200      final int seq, final Configuration conf, final Region region,
201      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
202      super(impl, priority, seq, conf, region, services, sharedData);
203      this.rsServices = services;
204    }
205
206    /**
207     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
208     *         consumption.
209     */
210    @Override
211    public RegionServerServices getRegionServerServices() {
212      return this.rsServices;
213    }
214  }
215
216  static class TableCoprocessorAttribute {
217    private Path path;
218    private String className;
219    private int priority;
220    private Configuration conf;
221
222    public TableCoprocessorAttribute(Path path, String className, int priority,
223      Configuration conf) {
224      this.path = path;
225      this.className = className;
226      this.priority = priority;
227      this.conf = conf;
228    }
229
230    public Path getPath() {
231      return path;
232    }
233
234    public String getClassName() {
235      return className;
236    }
237
238    public int getPriority() {
239      return priority;
240    }
241
242    public Configuration getConf() {
243      return conf;
244    }
245  }
246
247  /** The region server services */
248  RegionServerServices rsServices;
249  /** The region */
250  HRegion region;
251
252  /**
253   * Constructor
254   * @param region     the region
255   * @param rsServices interface to available region server functionality
256   * @param conf       the configuration
257   */
258  @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization
259  public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices,
260    final Configuration conf) {
261    super(rsServices);
262    this.conf = conf;
263    this.rsServices = rsServices;
264    this.region = region;
265    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
266
267    // load system default cp's from configuration.
268    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
269
270    // load system default cp's for user tables from configuration.
271    if (!region.getRegionInfo().getTable().isSystemTable()) {
272      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
273    }
274
275    // load Coprocessor From HDFS
276    loadTableCoprocessors(conf);
277
278    // now check whether any coprocessor implements postScannerFilterRow
279    boolean hasCustomPostScannerFilterRow = false;
280    out: for (RegionCoprocessorEnvironment env : coprocEnvironments) {
281      if (env.getInstance() instanceof RegionObserver) {
282        Class<?> clazz = env.getInstance().getClass();
283        for (;;) {
284          if (clazz == Object.class) {
285            // we dont need to look postScannerFilterRow into Object class
286            break; // break the inner loop
287          }
288          try {
289            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
290              InternalScanner.class, Cell.class, boolean.class);
291            // this coprocessor has a custom version of postScannerFilterRow
292            hasCustomPostScannerFilterRow = true;
293            break out;
294          } catch (NoSuchMethodException ignore) {
295          }
296          // the deprecated signature still exists
297          try {
298            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
299              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
300            // this coprocessor has a custom version of postScannerFilterRow
301            hasCustomPostScannerFilterRow = true;
302            break out;
303          } catch (NoSuchMethodException ignore) {
304          }
305          clazz = clazz.getSuperclass();
306        }
307      }
308    }
309    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
310  }
311
312  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
313    TableDescriptor htd) {
314    return htd.getCoprocessorDescriptors().stream().map(cp -> {
315      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
316      Configuration ourConf;
317      if (!cp.getProperties().isEmpty()) {
318        // do an explicit deep copy of the passed configuration
319        ourConf = new Configuration(false);
320        HBaseConfiguration.merge(ourConf, conf);
321        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
322      } else {
323        ourConf = conf;
324      }
325      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
326    }).collect(Collectors.toList());
327  }
328
329  /**
330   * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception
331   * if there is a problem.
332   */
333  public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd)
334    throws IOException {
335    String pathPrefix = UUID.randomUUID().toString();
336    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) {
337      if (attr.getPriority() < 0) {
338        throw new IOException(
339          "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0");
340      }
341      ClassLoader old = Thread.currentThread().getContextClassLoader();
342      try {
343        ClassLoader cl;
344        if (attr.getPath() != null) {
345          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
346            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
347        } else {
348          cl = CoprocessorHost.class.getClassLoader();
349        }
350        Thread.currentThread().setContextClassLoader(cl);
351        if (cl instanceof CoprocessorClassLoader) {
352          String[] includedClassPrefixes = null;
353          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
354            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
355            includedClassPrefixes = prefixes.split(";");
356          }
357          ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes);
358        } else {
359          cl.loadClass(attr.getClassName());
360        }
361      } catch (ClassNotFoundException e) {
362        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
363      } finally {
364        Thread.currentThread().setContextClassLoader(old);
365      }
366    }
367  }
368
369  void loadTableCoprocessors(final Configuration conf) {
370    boolean coprocessorsEnabled =
371      conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED);
372    boolean tableCoprocessorsEnabled =
373      conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED);
374    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
375      return;
376    }
377
378    // scan the table attributes for coprocessor load specifications
379    // initialize the coprocessors
380    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
381    for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf,
382      region.getTableDescriptor())) {
383      // Load encompasses classloading and coprocessor initialization
384      try {
385        RegionCoprocessorEnvironment env =
386          load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf());
387        if (env == null) {
388          continue;
389        }
390        configured.add(env);
391        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of "
392          + region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
393      } catch (Throwable t) {
394        // Coprocessor failed to load, do we abort on error?
395        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
396          abortServer(attr.getClassName(), t);
397        } else {
398          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
399        }
400      }
401    }
402    // add together to coprocessor set for COW efficiency
403    coprocEnvironments.addAll(configured);
404  }
405
406  @Override
407  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
408    Configuration conf) {
409    // If coprocessor exposes any services, register them.
410    for (Service service : instance.getServices()) {
411      region.registerService(service);
412    }
413    ConcurrentMap<String, Object> classData;
414    // make sure only one thread can add maps
415    synchronized (SHARED_DATA_MAP) {
416      // as long as at least one RegionEnvironment holds on to its classData it will
417      // remain in this map
418      classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
419        k -> new ConcurrentHashMap<>());
420    }
421    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
422    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)
423      ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices,
424        classData)
425      : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
426  }
427
428  @Override
429  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
430    throws InstantiationException, IllegalAccessException {
431    try {
432      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
433        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
434      } else {
435        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
436          implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
437        return null;
438      }
439    } catch (NoSuchMethodException | InvocationTargetException e) {
440      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
441    }
442  }
443
444  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
445    RegionCoprocessor::getRegionObserver;
446
447  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
448    RegionCoprocessor::getEndpointObserver;
449
450  abstract class RegionObserverOperationWithoutResult
451    extends ObserverOperationWithoutResult<RegionObserver> {
452    public RegionObserverOperationWithoutResult() {
453      super(regionObserverGetter);
454    }
455
456    public RegionObserverOperationWithoutResult(User user) {
457      super(regionObserverGetter, user);
458    }
459
460    public RegionObserverOperationWithoutResult(boolean bypassable) {
461      super(regionObserverGetter, null, bypassable);
462    }
463
464    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
465      super(regionObserverGetter, user, bypassable);
466    }
467  }
468
469  abstract class BulkLoadObserverOperation
470    extends ObserverOperationWithoutResult<BulkLoadObserver> {
471    public BulkLoadObserverOperation(User user) {
472      super(RegionCoprocessor::getBulkLoadObserver, user);
473    }
474  }
475
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
483
484  /**
485   * Invoked before a region open.
486   * @throws IOException Signals that an I/O exception has occurred.
487   */
488  public void preOpen() throws IOException {
489    if (coprocEnvironments.isEmpty()) {
490      return;
491    }
492    execOperation(new RegionObserverOperationWithoutResult() {
493      @Override
494      public void call(RegionObserver observer) throws IOException {
495        observer.preOpen(this);
496      }
497    });
498  }
499
500  /**
501   * Invoked after a region open
502   */
503  public void postOpen() {
504    if (coprocEnvironments.isEmpty()) {
505      return;
506    }
507    try {
508      execOperation(new RegionObserverOperationWithoutResult() {
509        @Override
510        public void call(RegionObserver observer) throws IOException {
511          observer.postOpen(this);
512        }
513      });
514    } catch (IOException e) {
515      LOG.warn(e.toString(), e);
516    }
517  }
518
519  /**
520   * Invoked before a region is closed
521   * @param abortRequested true if the server is aborting
522   */
523  public void preClose(final boolean abortRequested) throws IOException {
524    execOperation(new RegionObserverOperationWithoutResult() {
525      @Override
526      public void call(RegionObserver observer) throws IOException {
527        observer.preClose(this, abortRequested);
528      }
529    });
530  }
531
532  /**
533   * Invoked after a region is closed
534   * @param abortRequested true if the server is aborting
535   */
536  public void postClose(final boolean abortRequested) {
537    try {
538      execOperation(new RegionObserverOperationWithoutResult() {
539        @Override
540        public void call(RegionObserver observer) throws IOException {
541          observer.postClose(this, abortRequested);
542        }
543
544        @Override
545        public void postEnvCall() {
546          shutdown(this.getEnvironment());
547        }
548      });
549    } catch (IOException e) {
550      LOG.warn(e.toString(), e);
551    }
552  }
553
554  /**
555   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
556   * available candidates.
557   * <p>
558   * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the
559   * passed in <code>candidates</code>.
560   * @param store      The store where compaction is being requested
561   * @param candidates The currently available store files
562   * @param tracker    used to track the life cycle of a compaction
563   * @param user       the user
564   */
565  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
566    final CompactionLifeCycleTracker tracker, final User user) throws IOException {
567    if (coprocEnvironments.isEmpty()) {
568      return false;
569    }
570    boolean bypassable = true;
571    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
572      @Override
573      public void call(RegionObserver observer) throws IOException {
574        observer.preCompactSelection(this, store, candidates, tracker);
575      }
576    });
577  }
578
579  /**
580   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
581   * candidates.
582   * @param store    The store where compaction is being requested
583   * @param selected The store files selected to compact
584   * @param tracker  used to track the life cycle of a compaction
585   * @param request  the compaction request
586   * @param user     the user
587   */
588  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
589    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
590    throws IOException {
591    if (coprocEnvironments.isEmpty()) {
592      return;
593    }
594    execOperation(new RegionObserverOperationWithoutResult(user) {
595      @Override
596      public void call(RegionObserver observer) throws IOException {
597        observer.postCompactSelection(this, store, selected, tracker, request);
598      }
599    });
600  }
601
602  /**
603   * Called prior to opening store scanner for compaction.
604   */
605  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
606    CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
607    if (coprocEnvironments.isEmpty()) {
608      return store.getScanInfo();
609    }
610    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
611    execOperation(new RegionObserverOperationWithoutResult(user) {
612      @Override
613      public void call(RegionObserver observer) throws IOException {
614        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
615      }
616    });
617    return builder.build();
618  }
619
620  /**
621   * Called prior to rewriting the store files selected for compaction
622   * @param store    the store being compacted
623   * @param scanner  the scanner used to read store data during compaction
624   * @param scanType type of Scan
625   * @param tracker  used to track the life cycle of a compaction
626   * @param request  the compaction request
627   * @param user     the user
628   * @return Scanner to use (cannot be null!)
629   */
630  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
631    final ScanType scanType, final CompactionLifeCycleTracker tracker,
632    final CompactionRequest request, final User user) throws IOException {
633    InternalScanner defaultResult = scanner;
634    if (coprocEnvironments.isEmpty()) {
635      return defaultResult;
636    }
637    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
638      regionObserverGetter, defaultResult, user) {
639      @Override
640      public InternalScanner call(RegionObserver observer) throws IOException {
641        InternalScanner scanner =
642          observer.preCompact(this, store, getResult(), scanType, tracker, request);
643        if (scanner == null) {
644          throw new CoprocessorException("Null Scanner return disallowed!");
645        }
646        return scanner;
647      }
648    });
649  }
650
651  /**
652   * Called after the store compaction has completed.
653   * @param store      the store being compacted
654   * @param resultFile the new store file written during compaction
655   * @param tracker    used to track the life cycle of a compaction
656   * @param request    the compaction request
657   * @param user       the user
658   */
659  public void postCompact(final HStore store, final HStoreFile resultFile,
660    final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
661    throws IOException {
662    execOperation(
663      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) {
664        @Override
665        public void call(RegionObserver observer) throws IOException {
666          observer.postCompact(this, store, resultFile, tracker, request);
667        }
668      });
669  }
670
671  /**
672   * Invoked before create StoreScanner for flush.
673   */
674  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
675    throws IOException {
676    if (coprocEnvironments.isEmpty()) {
677      return store.getScanInfo();
678    }
679    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
680    execOperation(new RegionObserverOperationWithoutResult() {
681      @Override
682      public void call(RegionObserver observer) throws IOException {
683        observer.preFlushScannerOpen(this, store, builder, tracker);
684      }
685    });
686    return builder.build();
687  }
688
689  /**
690   * Invoked before a memstore flush
691   * @return Scanner to use (cannot be null!)
692   */
693  public InternalScanner preFlush(HStore store, InternalScanner scanner,
694    FlushLifeCycleTracker tracker) throws IOException {
695    if (coprocEnvironments.isEmpty()) {
696      return scanner;
697    }
698    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
699      regionObserverGetter, scanner) {
700      @Override
701      public InternalScanner call(RegionObserver observer) throws IOException {
702        InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
703        if (scanner == null) {
704          throw new CoprocessorException("Null Scanner return disallowed!");
705        }
706        return scanner;
707      }
708    });
709  }
710
711  /**
712   * Invoked before a memstore flush
713   */
714  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
715    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
716      @Override
717      public void call(RegionObserver observer) throws IOException {
718        observer.preFlush(this, tracker);
719      }
720    });
721  }
722
723  /**
724   * Invoked after a memstore flush
725   */
726  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
727    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
728      @Override
729      public void call(RegionObserver observer) throws IOException {
730        observer.postFlush(this, tracker);
731      }
732    });
733  }
734
735  /**
736   * Invoked before in memory compaction.
737   */
738  public void preMemStoreCompaction(HStore store) throws IOException {
739    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
740      @Override
741      public void call(RegionObserver observer) throws IOException {
742        observer.preMemStoreCompaction(this, store);
743      }
744    });
745  }
746
747  /**
748   * Invoked before create StoreScanner for in memory compaction.
749   */
750  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
751    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
752    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
753      @Override
754      public void call(RegionObserver observer) throws IOException {
755        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
756      }
757    });
758    return builder.build();
759  }
760
761  /**
762   * Invoked before compacting memstore.
763   */
764  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
765    throws IOException {
766    if (coprocEnvironments.isEmpty()) {
767      return scanner;
768    }
769    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
770      regionObserverGetter, scanner) {
771      @Override
772      public InternalScanner call(RegionObserver observer) throws IOException {
773        return observer.preMemStoreCompactionCompact(this, store, getResult());
774      }
775    });
776  }
777
778  /**
779   * Invoked after in memory compaction.
780   */
781  public void postMemStoreCompaction(HStore store) throws IOException {
782    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
783      @Override
784      public void call(RegionObserver observer) throws IOException {
785        observer.postMemStoreCompaction(this, store);
786      }
787    });
788  }
789
790  /**
791   * Invoked after a memstore flush
792   */
793  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
794    throws IOException {
795    if (coprocEnvironments.isEmpty()) {
796      return;
797    }
798    execOperation(new RegionObserverOperationWithoutResult() {
799      @Override
800      public void call(RegionObserver observer) throws IOException {
801        observer.postFlush(this, store, storeFile, tracker);
802      }
803    });
804  }
805
806  // RegionObserver support
807  /**
808   * Supports Coprocessor 'bypass'.
809   * @param get     the Get request
810   * @param results What to return if return is true/'bypass'.
811   * @return true if default processing should be bypassed.
812   * @exception IOException Exception
813   */
814  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
815    if (coprocEnvironments.isEmpty()) {
816      return false;
817    }
818    boolean bypassable = true;
819    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
820      @Override
821      public void call(RegionObserver observer) throws IOException {
822        observer.preGetOp(this, get, results);
823      }
824    });
825  }
826
827  /**
828   * @param get     the Get request
829   * @param results the result set
830   * @exception IOException Exception
831   */
832  public void postGet(final Get get, final List<Cell> results) throws IOException {
833    if (coprocEnvironments.isEmpty()) {
834      return;
835    }
836    execOperation(new RegionObserverOperationWithoutResult() {
837      @Override
838      public void call(RegionObserver observer) throws IOException {
839        observer.postGetOp(this, get, results);
840      }
841    });
842  }
843
844  /**
845   * Supports Coprocessor 'bypass'.
846   * @param get the Get request
847   * @return true or false to return to client if bypassing normal operation, or null otherwise
848   * @exception IOException Exception
849   */
850  public Boolean preExists(final Get get) throws IOException {
851    boolean bypassable = true;
852    boolean defaultResult = false;
853    if (coprocEnvironments.isEmpty()) {
854      return null;
855    }
856    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
857      regionObserverGetter, defaultResult, bypassable) {
858      @Override
859      public Boolean call(RegionObserver observer) throws IOException {
860        return observer.preExists(this, get, getResult());
861      }
862    });
863  }
864
865  /**
866   * @param get    the Get request
867   * @param result the result returned by the region server
868   * @return the result to return to the client
869   * @exception IOException Exception
870   */
871  public boolean postExists(final Get get, boolean result) throws IOException {
872    if (this.coprocEnvironments.isEmpty()) {
873      return result;
874    }
875    return execOperationWithResult(
876      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
877        @Override
878        public Boolean call(RegionObserver observer) throws IOException {
879          return observer.postExists(this, get, getResult());
880        }
881      });
882  }
883
884  /**
885   * Supports Coprocessor 'bypass'.
886   * @param put  The Put object
887   * @param edit The WALEdit object.
888   * @return true if default processing should be bypassed
889   * @exception IOException Exception
890   */
891  public boolean prePut(final Put put, final WALEdit edit) throws IOException {
892    if (coprocEnvironments.isEmpty()) {
893      return false;
894    }
895    boolean bypassable = true;
896    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
897      @Override
898      public void call(RegionObserver observer) throws IOException {
899        observer.prePut(this, put, edit);
900      }
901    });
902  }
903
904  /**
905   * Supports Coprocessor 'bypass'.
906   * @param mutation - the current mutation
907   * @param kv       - the current cell
908   * @param byteNow  - current timestamp in bytes
909   * @param get      - the get that could be used Note that the get only does not specify the family
910   *                 and qualifier that should be used
911   * @return true if default processing should be bypassed
912   * @deprecated In hbase-2.0.0. Will be removed in hbase-4.0.0. Added explicitly for a single
913   *             Coprocessor for its needs only. Will be removed. VisibilityController still needs
914   *             this, need to change the logic there first.
915   */
916  @Deprecated
917  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv,
918    final byte[] byteNow, final Get get) throws IOException {
919    if (coprocEnvironments.isEmpty()) {
920      return false;
921    }
922    boolean bypassable = true;
923    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
924      @Override
925      public void call(RegionObserver observer) throws IOException {
926        observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
927      }
928    });
929  }
930
931  /**
932   * @param put  The Put object
933   * @param edit The WALEdit object.
934   * @exception IOException Exception
935   */
936  public void postPut(final Put put, final WALEdit edit) throws IOException {
937    if (coprocEnvironments.isEmpty()) {
938      return;
939    }
940    execOperation(new RegionObserverOperationWithoutResult() {
941      @Override
942      public void call(RegionObserver observer) throws IOException {
943        observer.postPut(this, put, edit);
944      }
945    });
946  }
947
948  /**
949   * Supports Coprocessor 'bypass'.
950   * @param delete The Delete object
951   * @param edit   The WALEdit object.
952   * @return true if default processing should be bypassed
953   * @exception IOException Exception
954   */
955  public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException {
956    if (this.coprocEnvironments.isEmpty()) {
957      return false;
958    }
959    boolean bypassable = true;
960    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
961      @Override
962      public void call(RegionObserver observer) throws IOException {
963        observer.preDelete(this, delete, edit);
964      }
965    });
966  }
967
968  /**
969   * @param delete The Delete object
970   * @param edit   The WALEdit object.
971   * @exception IOException Exception
972   */
973  public void postDelete(final Delete delete, final WALEdit edit) throws IOException {
974    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
975      @Override
976      public void call(RegionObserver observer) throws IOException {
977        observer.postDelete(this, delete, edit);
978      }
979    });
980  }
981
982  public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
983    throws IOException {
984    if (this.coprocEnvironments.isEmpty()) {
985      return;
986    }
987    execOperation(new RegionObserverOperationWithoutResult() {
988      @Override
989      public void call(RegionObserver observer) throws IOException {
990        observer.preBatchMutate(this, miniBatchOp);
991      }
992    });
993  }
994
995  public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp)
996    throws IOException {
997    if (this.coprocEnvironments.isEmpty()) {
998      return;
999    }
1000    execOperation(new RegionObserverOperationWithoutResult() {
1001      @Override
1002      public void call(RegionObserver observer) throws IOException {
1003        observer.postBatchMutate(this, miniBatchOp);
1004      }
1005    });
1006  }
1007
1008  public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
1009    final boolean success) throws IOException {
1010    if (this.coprocEnvironments.isEmpty()) {
1011      return;
1012    }
1013    execOperation(new RegionObserverOperationWithoutResult() {
1014      @Override
1015      public void call(RegionObserver observer) throws IOException {
1016        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1017      }
1018    });
1019  }
1020
1021  /**
1022   * Supports Coprocessor 'bypass'.
1023   * @param checkAndMutate the CheckAndMutate object
1024   * @return true or false to return to client if default processing should be bypassed, or null
1025   *         otherwise
1026   * @throws IOException if an error occurred on the coprocessor
1027   */
1028  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException {
1029    boolean bypassable = true;
1030    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1031    if (coprocEnvironments.isEmpty()) {
1032      return null;
1033    }
1034    return execOperationWithResult(
1035      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1036        defaultResult, bypassable) {
1037        @Override
1038        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1039          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1040        }
1041      });
1042  }
1043
1044  /**
1045   * Supports Coprocessor 'bypass'.
1046   * @param checkAndMutate the CheckAndMutate object
1047   * @return true or false to return to client if default processing should be bypassed, or null
1048   *         otherwise
1049   * @throws IOException if an error occurred on the coprocessor
1050   */
1051  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1052    throws IOException {
1053    boolean bypassable = true;
1054    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1055    if (coprocEnvironments.isEmpty()) {
1056      return null;
1057    }
1058    return execOperationWithResult(
1059      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1060        defaultResult, bypassable) {
1061        @Override
1062        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1063          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1064        }
1065      });
1066  }
1067
1068  /**
1069   * @param checkAndMutate the CheckAndMutate object
1070   * @param result         the result returned by the checkAndMutate
1071   * @return true or false to return to client if default processing should be bypassed, or null
1072   *         otherwise
1073   * @throws IOException if an error occurred on the coprocessor
1074   */
1075  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1076    CheckAndMutateResult result) throws IOException {
1077    if (this.coprocEnvironments.isEmpty()) {
1078      return result;
1079    }
1080    return execOperationWithResult(
1081      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter,
1082        result) {
1083        @Override
1084        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1085          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1086        }
1087      });
1088  }
1089
1090  /**
1091   * Supports Coprocessor 'bypass'.
1092   * @param append append object
1093   * @param edit   The WALEdit object.
1094   * @return result to return to client if default operation should be bypassed, null otherwise
1095   * @throws IOException if an error occurred on the coprocessor
1096   */
1097  public Result preAppend(final Append append, final WALEdit edit) throws IOException {
1098    boolean bypassable = true;
1099    Result defaultResult = null;
1100    if (this.coprocEnvironments.isEmpty()) {
1101      return defaultResult;
1102    }
1103    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1104      regionObserverGetter, defaultResult, bypassable) {
1105      @Override
1106      public Result call(RegionObserver observer) throws IOException {
1107        return observer.preAppend(this, append, edit);
1108      }
1109    });
1110  }
1111
1112  /**
1113   * Supports Coprocessor 'bypass'.
1114   * @param append append object
1115   * @return result to return to client if default operation should be bypassed, null otherwise
1116   * @throws IOException if an error occurred on the coprocessor
1117   */
1118  public Result preAppendAfterRowLock(final Append append) throws IOException {
1119    boolean bypassable = true;
1120    Result defaultResult = null;
1121    if (this.coprocEnvironments.isEmpty()) {
1122      return defaultResult;
1123    }
1124    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1125      regionObserverGetter, defaultResult, bypassable) {
1126      @Override
1127      public Result call(RegionObserver observer) throws IOException {
1128        return observer.preAppendAfterRowLock(this, append);
1129      }
1130    });
1131  }
1132
1133  /**
1134   * Supports Coprocessor 'bypass'.
1135   * @param increment increment object
1136   * @param edit      The WALEdit object.
1137   * @return result to return to client if default operation should be bypassed, null otherwise
1138   * @throws IOException if an error occurred on the coprocessor
1139   */
1140  public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException {
1141    boolean bypassable = true;
1142    Result defaultResult = null;
1143    if (coprocEnvironments.isEmpty()) {
1144      return defaultResult;
1145    }
1146    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1147      regionObserverGetter, defaultResult, bypassable) {
1148      @Override
1149      public Result call(RegionObserver observer) throws IOException {
1150        return observer.preIncrement(this, increment, edit);
1151      }
1152    });
1153  }
1154
1155  /**
1156   * Supports Coprocessor 'bypass'.
1157   * @param increment increment object
1158   * @return result to return to client if default operation should be bypassed, null otherwise
1159   * @throws IOException if an error occurred on the coprocessor
1160   */
1161  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1162    boolean bypassable = true;
1163    Result defaultResult = null;
1164    if (coprocEnvironments.isEmpty()) {
1165      return defaultResult;
1166    }
1167    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>(
1168      regionObserverGetter, defaultResult, bypassable) {
1169      @Override
1170      public Result call(RegionObserver observer) throws IOException {
1171        return observer.preIncrementAfterRowLock(this, increment);
1172      }
1173    });
1174  }
1175
1176  /**
1177   * @param append Append object
1178   * @param result the result returned by the append
1179   * @param edit   The WALEdit object.
1180   * @throws IOException if an error occurred on the coprocessor
1181   */
1182  public Result postAppend(final Append append, final Result result, final WALEdit edit)
1183    throws IOException {
1184    if (this.coprocEnvironments.isEmpty()) {
1185      return result;
1186    }
1187    return execOperationWithResult(
1188      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1189        @Override
1190        public Result call(RegionObserver observer) throws IOException {
1191          return observer.postAppend(this, append, result, edit);
1192        }
1193      });
1194  }
1195
1196  /**
1197   * @param increment increment object
1198   * @param result    the result returned by postIncrement
1199   * @param edit      The WALEdit object.
1200   * @throws IOException if an error occurred on the coprocessor
1201   */
1202  public Result postIncrement(final Increment increment, Result result, final WALEdit edit)
1203    throws IOException {
1204    if (this.coprocEnvironments.isEmpty()) {
1205      return result;
1206    }
1207    return execOperationWithResult(
1208      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1209        @Override
1210        public Result call(RegionObserver observer) throws IOException {
1211          return observer.postIncrement(this, increment, getResult(), edit);
1212        }
1213      });
1214  }
1215
1216  /**
1217   * @param scan the Scan specification
1218   * @exception IOException Exception
1219   */
1220  public void preScannerOpen(final Scan scan) throws IOException {
1221    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1222      @Override
1223      public void call(RegionObserver observer) throws IOException {
1224        observer.preScannerOpen(this, scan);
1225      }
1226    });
1227  }
1228
1229  /**
1230   * @param scan the Scan specification
1231   * @param s    the scanner
1232   * @return the scanner instance to use
1233   * @exception IOException Exception
1234   */
1235  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1236    if (this.coprocEnvironments.isEmpty()) {
1237      return s;
1238    }
1239    return execOperationWithResult(
1240      new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1241        @Override
1242        public RegionScanner call(RegionObserver observer) throws IOException {
1243          return observer.postScannerOpen(this, scan, getResult());
1244        }
1245      });
1246  }
1247
1248  /**
1249   * @param s       the scanner
1250   * @param results the result set returned by the region server
1251   * @param limit   the maximum number of results to return
1252   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1253   * @exception IOException Exception
1254   */
1255  public Boolean preScannerNext(final InternalScanner s, final List<Result> results,
1256    final int limit) throws IOException {
1257    boolean bypassable = true;
1258    boolean defaultResult = false;
1259    if (coprocEnvironments.isEmpty()) {
1260      return null;
1261    }
1262    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1263      regionObserverGetter, defaultResult, bypassable) {
1264      @Override
1265      public Boolean call(RegionObserver observer) throws IOException {
1266        return observer.preScannerNext(this, s, results, limit, getResult());
1267      }
1268    });
1269  }
1270
1271  /**
1272   * @param s       the scanner
1273   * @param results the result set returned by the region server
1274   * @param limit   the maximum number of results to return
1275   * @return 'has more' indication to give to client
1276   * @exception IOException Exception
1277   */
1278  public boolean postScannerNext(final InternalScanner s, final List<Result> results,
1279    final int limit, boolean hasMore) throws IOException {
1280    if (this.coprocEnvironments.isEmpty()) {
1281      return hasMore;
1282    }
1283    return execOperationWithResult(
1284      new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1285        @Override
1286        public Boolean call(RegionObserver observer) throws IOException {
1287          return observer.postScannerNext(this, s, results, limit, getResult());
1288        }
1289      });
1290  }
1291
1292  /**
1293   * This will be called by the scan flow when the current scanned row is being filtered out by the
1294   * filter.
1295   * @param s          the scanner
1296   * @param curRowCell The cell in the current row which got filtered out
1297   * @return whether more rows are available for the scanner or not
1298   */
1299  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1300    throws IOException {
1301    // short circuit for performance
1302    boolean defaultResult = true;
1303    if (!hasCustomPostScannerFilterRow) {
1304      return defaultResult;
1305    }
1306    if (this.coprocEnvironments.isEmpty()) {
1307      return defaultResult;
1308    }
1309    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1310      regionObserverGetter, defaultResult) {
1311      @Override
1312      public Boolean call(RegionObserver observer) throws IOException {
1313        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1314      }
1315    });
1316  }
1317
1318  /**
1319   * Supports Coprocessor 'bypass'.
1320   * @param s the scanner
1321   * @return true if default behavior should be bypassed, false otherwise
1322   * @exception IOException Exception
1323   */
1324  // Should this be bypassable?
1325  public boolean preScannerClose(final InternalScanner s) throws IOException {
1326    return execOperation(
1327      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1328        @Override
1329        public void call(RegionObserver observer) throws IOException {
1330          observer.preScannerClose(this, s);
1331        }
1332      });
1333  }
1334
1335  /**
1336   * @exception IOException Exception
1337   */
1338  public void postScannerClose(final InternalScanner s) throws IOException {
1339    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1340      @Override
1341      public void call(RegionObserver observer) throws IOException {
1342        observer.postScannerClose(this, s);
1343      }
1344    });
1345  }
1346
1347  /**
1348   * Called before open store scanner for user scan.
1349   */
1350  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1351    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1352    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1353    execOperation(new RegionObserverOperationWithoutResult() {
1354      @Override
1355      public void call(RegionObserver observer) throws IOException {
1356        observer.preStoreScannerOpen(this, store, builder);
1357      }
1358    });
1359    return builder.build();
1360  }
1361
1362  /**
1363   * @param info  the RegionInfo for this region
1364   * @param edits the file of recovered edits
1365   */
1366  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1367    execOperation(
1368      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1369        @Override
1370        public void call(RegionObserver observer) throws IOException {
1371          observer.preReplayWALs(this, info, edits);
1372        }
1373      });
1374  }
1375
1376  /**
1377   * @param info  the RegionInfo for this region
1378   * @param edits the file of recovered edits
1379   * @throws IOException Exception
1380   */
1381  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1382    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1383      @Override
1384      public void call(RegionObserver observer) throws IOException {
1385        observer.postReplayWALs(this, info, edits);
1386      }
1387    });
1388  }
1389
1390  /**
1391   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1392   */
1393  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1394    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1395      @Override
1396      public void call(RegionObserver observer) throws IOException {
1397        observer.preBulkLoadHFile(this, familyPaths);
1398      }
1399    });
1400  }
1401
1402  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1403    throws IOException {
1404    return execOperation(
1405      coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1406        @Override
1407        public void call(RegionObserver observer) throws IOException {
1408          observer.preCommitStoreFile(this, family, pairs);
1409        }
1410      });
1411  }
1412
1413  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath)
1414    throws IOException {
1415    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1416      @Override
1417      public void call(RegionObserver observer) throws IOException {
1418        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1419      }
1420    });
1421  }
1422
1423  /**
1424   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1425   * @param map         Map of CF to List of file paths for the final loaded files
1426   */
1427  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1428    Map<byte[], List<Path>> map) throws IOException {
1429    if (this.coprocEnvironments.isEmpty()) {
1430      return;
1431    }
1432    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1433      @Override
1434      public void call(RegionObserver observer) throws IOException {
1435        observer.postBulkLoadHFile(this, familyPaths, map);
1436      }
1437    });
1438  }
1439
1440  public void postStartRegionOperation(final Operation op) throws IOException {
1441    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1442      @Override
1443      public void call(RegionObserver observer) throws IOException {
1444        observer.postStartRegionOperation(this, op);
1445      }
1446    });
1447  }
1448
1449  public void postCloseRegionOperation(final Operation op) throws IOException {
1450    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1451      @Override
1452      public void call(RegionObserver observer) throws IOException {
1453        observer.postCloseRegionOperation(this, op);
1454      }
1455    });
1456  }
1457
1458  /**
1459   * @param fs   fileystem to read from
1460   * @param p    path to the file
1461   * @param in   {@link FSDataInputStreamWrapper}
1462   * @param size Full size of the file
1463   * @param r    original reference file. This will be not null only when reading a split file.
1464   * @return a Reader instance to use instead of the base reader if overriding default behavior,
1465   *         null otherwise
1466   */
1467  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1468    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1469    final Reference r) throws IOException {
1470    if (coprocEnvironments.isEmpty()) {
1471      return null;
1472    }
1473    return execOperationWithResult(
1474      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1475        @Override
1476        public StoreFileReader call(RegionObserver observer) throws IOException {
1477          return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1478        }
1479      });
1480  }
1481
1482  /**
1483   * @param fs     fileystem to read from
1484   * @param p      path to the file
1485   * @param in     {@link FSDataInputStreamWrapper}
1486   * @param size   Full size of the file
1487   * @param r      original reference file. This will be not null only when reading a split file.
1488   * @param reader the base reader instance
1489   * @return The reader to use
1490   */
1491  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1492    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1493    final Reference r, final StoreFileReader reader) throws IOException {
1494    if (this.coprocEnvironments.isEmpty()) {
1495      return reader;
1496    }
1497    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>(
1498      regionObserverGetter, reader) {
1499      @Override
1500      public StoreFileReader call(RegionObserver observer) throws IOException {
1501        return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
1502      }
1503    });
1504  }
1505
1506  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1507    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1508    if (this.coprocEnvironments.isEmpty()) {
1509      return cellPairs;
1510    }
1511    return execOperationWithResult(
1512      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1513        cellPairs) {
1514        @Override
1515        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1516          return observer.postIncrementBeforeWAL(this, mutation, getResult());
1517        }
1518      });
1519  }
1520
1521  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1522    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1523    if (this.coprocEnvironments.isEmpty()) {
1524      return cellPairs;
1525    }
1526    return execOperationWithResult(
1527      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
1528        cellPairs) {
1529        @Override
1530        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1531          return observer.postAppendBeforeWAL(this, mutation, getResult());
1532        }
1533      });
1534  }
1535
1536  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1537    if (this.coprocEnvironments.isEmpty()) {
1538      return;
1539    }
1540    execOperation(new RegionObserverOperationWithoutResult() {
1541      @Override
1542      public void call(RegionObserver observer) throws IOException {
1543        observer.preWALAppend(this, key, edit);
1544      }
1545    });
1546  }
1547
1548  public Message preEndpointInvocation(final Service service, final String methodName,
1549    Message request) throws IOException {
1550    if (coprocEnvironments.isEmpty()) {
1551      return request;
1552    }
1553    return execOperationWithResult(
1554      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
1555        @Override
1556        public Message call(EndpointObserver observer) throws IOException {
1557          return observer.preEndpointInvocation(this, service, methodName, getResult());
1558        }
1559      });
1560  }
1561
1562  public void postEndpointInvocation(final Service service, final String methodName,
1563    final Message request, final Message.Builder responseBuilder) throws IOException {
1564    execOperation(coprocEnvironments.isEmpty()
1565      ? null
1566      : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1567        @Override
1568        public void call(EndpointObserver observer) throws IOException {
1569          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1570        }
1571      });
1572  }
1573
1574  /**
1575   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1576   */
1577  @Deprecated
1578  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1579    if (this.coprocEnvironments.isEmpty()) {
1580      return result;
1581    }
1582    return execOperationWithResult(
1583      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) {
1584        @Override
1585        public DeleteTracker call(RegionObserver observer) throws IOException {
1586          return observer.postInstantiateDeleteTracker(this, getResult());
1587        }
1588      });
1589  }
1590
1591  /////////////////////////////////////////////////////////////////////////////////////////////////
1592  // BulkLoadObserver hooks
1593  /////////////////////////////////////////////////////////////////////////////////////////////////
1594  public void prePrepareBulkLoad(User user) throws IOException {
1595    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1596      @Override
1597      protected void call(BulkLoadObserver observer) throws IOException {
1598        observer.prePrepareBulkLoad(this);
1599      }
1600    });
1601  }
1602
1603  public void preCleanupBulkLoad(User user) throws IOException {
1604    execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) {
1605      @Override
1606      protected void call(BulkLoadObserver observer) throws IOException {
1607        observer.preCleanupBulkLoad(this);
1608      }
1609    });
1610  }
1611}