001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.UUID;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.RawCellBuilder;
041import org.apache.hadoop.hbase.RawCellBuilderFactory;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.SharedConnection;
044import org.apache.hadoop.hbase.client.Append;
045import org.apache.hadoop.hbase.client.Connection;
046import org.apache.hadoop.hbase.client.Delete;
047import org.apache.hadoop.hbase.client.Durability;
048import org.apache.hadoop.hbase.client.Get;
049import org.apache.hadoop.hbase.client.Increment;
050import org.apache.hadoop.hbase.client.Mutation;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
057import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
061import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
062import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
063import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
064import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
065import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
066import org.apache.hadoop.hbase.coprocessor.ObserverContext;
067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
069import org.apache.hadoop.hbase.coprocessor.RegionObserver;
070import org.apache.hadoop.hbase.filter.ByteArrayComparable;
071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
072import org.apache.hadoop.hbase.io.Reference;
073import org.apache.hadoop.hbase.io.hfile.CacheConfig;
074import org.apache.hadoop.hbase.metrics.MetricRegistry;
075import org.apache.hadoop.hbase.regionserver.Region.Operation;
076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
079import org.apache.hadoop.hbase.security.User;
080import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALKey;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
090
091/**
092 * Implements the coprocessor environment and runtime support for coprocessors
093 * loaded within a {@link Region}.
094 */
095@InterfaceAudience.Private
096public class RegionCoprocessorHost
097    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
098
  /** Logger shared by every instance of this host. */
  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
  // The shared data map. createEnvironment() keys it by coprocessor class name; the map is
  // built with HARD key references and WEAK value references.
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
          AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it.
  // Computed once, at the end of the constructor.
  private final boolean hasCustomPostScannerFilterRow;
107
108  /**
109   *
110   * Encapsulation of the environment of each coprocessor
111   */
112  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
113      implements RegionCoprocessorEnvironment {
114    private Region region;
115    ConcurrentMap<String, Object> sharedData;
116    private final MetricRegistry metricRegistry;
117    private final RegionServerServices services;
118
119    /**
120     * Constructor
121     * @param impl the coprocessor instance
122     * @param priority chaining priority
123     */
124    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
125        final int seq, final Configuration conf, final Region region,
126        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
127      super(impl, priority, seq, conf);
128      this.region = region;
129      this.sharedData = sharedData;
130      this.services = services;
131      this.metricRegistry =
132          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
133    }
134
135    /** @return the region */
136    @Override
137    public Region getRegion() {
138      return region;
139    }
140
141    @Override
142    public OnlineRegions getOnlineRegions() {
143      return this.services;
144    }
145
146    @Override
147    public Connection getConnection() {
148      // Mocks may have services as null at test time.
149      return services != null ? new SharedConnection(services.getConnection()) : null;
150    }
151
152    @Override
153    public Connection createConnection(Configuration conf) throws IOException {
154      return services != null ? this.services.createConnection(conf) : null;
155    }
156
157    @Override
158    public ServerName getServerName() {
159      return services != null? services.getServerName(): null;
160    }
161
162    @Override
163    public void shutdown() {
164      super.shutdown();
165      MetricsCoprocessor.removeRegistry(this.metricRegistry);
166    }
167
168    @Override
169    public ConcurrentMap<String, Object> getSharedData() {
170      return sharedData;
171    }
172
173    @Override
174    public RegionInfo getRegionInfo() {
175      return region.getRegionInfo();
176    }
177
178    @Override
179    public MetricRegistry getMetricRegistryForRegionServer() {
180      return metricRegistry;
181    }
182
183    @Override
184    public RawCellBuilder getCellBuilder() {
185      // We always do a DEEP_COPY only
186      return RawCellBuilderFactory.create();
187    }
188  }
189
190  /**
191   * Special version of RegionEnvironment that exposes RegionServerServices for Core
192   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
193   */
194  private static class RegionEnvironmentForCoreCoprocessors extends
195      RegionEnvironment implements HasRegionServerServices {
196    private final RegionServerServices rsServices;
197
198    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
199      final int seq, final Configuration conf, final Region region,
200      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
201      super(impl, priority, seq, conf, region, services, sharedData);
202      this.rsServices = services;
203    }
204
205    /**
206     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
207     * consumption.
208     */
209    @Override
210    public RegionServerServices getRegionServerServices() {
211      return this.rsServices;
212    }
213  }
214
215  static class TableCoprocessorAttribute {
216    private Path path;
217    private String className;
218    private int priority;
219    private Configuration conf;
220
221    public TableCoprocessorAttribute(Path path, String className, int priority,
222        Configuration conf) {
223      this.path = path;
224      this.className = className;
225      this.priority = priority;
226      this.conf = conf;
227    }
228
229    public Path getPath() {
230      return path;
231    }
232
233    public String getClassName() {
234      return className;
235    }
236
237    public int getPriority() {
238      return priority;
239    }
240
241    public Configuration getConf() {
242      return conf;
243    }
244  }
245
  /** The region server services; the same reference that is passed to the superclass. */
  RegionServerServices rsServices;
  /** The region whose coprocessors this host loads and invokes. */
  HRegion region;
250
251  /**
252   * Constructor
253   * @param region the region
254   * @param rsServices interface to available region server functionality
255   * @param conf the configuration
256   */
257  public RegionCoprocessorHost(final HRegion region,
258      final RegionServerServices rsServices, final Configuration conf) {
259    super(rsServices);
260    this.conf = conf;
261    this.rsServices = rsServices;
262    this.region = region;
263    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
264
265    // load system default cp's from configuration.
266    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
267
268    // load system default cp's for user tables from configuration.
269    if (!region.getRegionInfo().getTable().isSystemTable()) {
270      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
271    }
272
273    // load Coprocessor From HDFS
274    loadTableCoprocessors(conf);
275
276    // now check whether any coprocessor implements postScannerFilterRow
277    boolean hasCustomPostScannerFilterRow = false;
278    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
279      if (env.getInstance() instanceof RegionObserver) {
280        Class<?> clazz = env.getInstance().getClass();
281        for(;;) {
282          if (clazz == null) {
283            // we must have directly implemented RegionObserver
284            hasCustomPostScannerFilterRow = true;
285            break out;
286          }
287          try {
288            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
289              InternalScanner.class, Cell.class, boolean.class);
290            // this coprocessor has a custom version of postScannerFilterRow
291            hasCustomPostScannerFilterRow = true;
292            break out;
293          } catch (NoSuchMethodException ignore) {
294          }
295          // the deprecated signature still exists
296          try {
297            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
298              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
299            // this coprocessor has a custom version of postScannerFilterRow
300            hasCustomPostScannerFilterRow = true;
301            break out;
302          } catch (NoSuchMethodException ignore) {
303          }
304          clazz = clazz.getSuperclass();
305        }
306      }
307    }
308    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
309  }
310
311  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
312      TableDescriptor htd) {
313    return htd.getCoprocessorDescriptors().stream().map(cp -> {
314      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
315      Configuration ourConf;
316      if (!cp.getProperties().isEmpty()) {
317        // do an explicit deep copy of the passed configuration
318        ourConf = new Configuration(false);
319        HBaseConfiguration.merge(ourConf, conf);
320        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
321      } else {
322        ourConf = conf;
323      }
324      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
325    }).collect(Collectors.toList());
326  }
327
328  /**
329   * Sanity check the table coprocessor attributes of the supplied schema. Will
330   * throw an exception if there is a problem.
331   * @param conf
332   * @param htd
333   * @throws IOException
334   */
335  public static void testTableCoprocessorAttrs(final Configuration conf,
336      final TableDescriptor htd) throws IOException {
337    String pathPrefix = UUID.randomUUID().toString();
338    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
339      if (attr.getPriority() < 0) {
340        throw new IOException("Priority for coprocessor " + attr.getClassName() +
341          " cannot be less than 0");
342      }
343      ClassLoader old = Thread.currentThread().getContextClassLoader();
344      try {
345        ClassLoader cl;
346        if (attr.getPath() != null) {
347          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
348            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
349        } else {
350          cl = CoprocessorHost.class.getClassLoader();
351        }
352        Thread.currentThread().setContextClassLoader(cl);
353        if (cl instanceof CoprocessorClassLoader) {
354          String[] includedClassPrefixes = null;
355          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
356            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
357            includedClassPrefixes = prefixes.split(";");
358          }
359          ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes);
360        } else {
361          cl.loadClass(attr.getClassName());
362        }
363      } catch (ClassNotFoundException e) {
364        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
365      } finally {
366        Thread.currentThread().setContextClassLoader(old);
367      }
368    }
369  }
370
371  void loadTableCoprocessors(final Configuration conf) {
372    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
373      DEFAULT_COPROCESSORS_ENABLED);
374    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
375      DEFAULT_USER_COPROCESSORS_ENABLED);
376    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
377      return;
378    }
379
380    // scan the table attributes for coprocessor load specifications
381    // initialize the coprocessors
382    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
383    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
384        region.getTableDescriptor())) {
385      // Load encompasses classloading and coprocessor initialization
386      try {
387        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
388            attr.getPriority(), attr.getConf());
389        if (env == null) {
390          continue;
391        }
392        configured.add(env);
393        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
394            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
395      } catch (Throwable t) {
396        // Coprocessor failed to load, do we abort on error?
397        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
398          abortServer(attr.getClassName(), t);
399        } else {
400          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
401        }
402      }
403    }
404    // add together to coprocessor set for COW efficiency
405    coprocEnvironments.addAll(configured);
406  }
407
408  @Override
409  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
410      Configuration conf) {
411    // If coprocessor exposes any services, register them.
412    for (Service service : instance.getServices()) {
413      region.registerService(service);
414    }
415    ConcurrentMap<String, Object> classData;
416    // make sure only one thread can add maps
417    synchronized (SHARED_DATA_MAP) {
418      // as long as at least one RegionEnvironment holds on to its classData it will
419      // remain in this map
420      classData =
421          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
422              k -> new ConcurrentHashMap<>());
423    }
424    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
425    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
426        new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
427            rsServices, classData):
428        new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
429  }
430
431  @Override
432  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
433      throws InstantiationException, IllegalAccessException {
434    try {
435      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
436        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
437      } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
438        // For backward compatibility with old CoprocessorService impl which don't extend
439        // RegionCoprocessor.
440        CoprocessorService cs;
441        cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance();
442        return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs);
443      } else {
444        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
445            implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
446        return null;
447      }
448    } catch (NoSuchMethodException | InvocationTargetException e) {
449      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
450    }
451  }
452
  /** Obtains the {@link RegionObserver} of a coprocessor via {@code getRegionObserver}. */
  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
      RegionCoprocessor::getRegionObserver;

  /** Obtains the {@link EndpointObserver} of a coprocessor via {@code getEndpointObserver}. */
  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
      RegionCoprocessor::getEndpointObserver;
458
459  abstract class RegionObserverOperationWithoutResult extends
460      ObserverOperationWithoutResult<RegionObserver> {
461    public RegionObserverOperationWithoutResult() {
462      super(regionObserverGetter);
463    }
464
465    public RegionObserverOperationWithoutResult(User user) {
466      super(regionObserverGetter, user);
467    }
468
469    public RegionObserverOperationWithoutResult(boolean bypassable) {
470      super(regionObserverGetter, null, bypassable);
471    }
472
473    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
474      super(regionObserverGetter, user, bypassable);
475    }
476  }
477
478  abstract class BulkLoadObserverOperation extends
479      ObserverOperationWithoutResult<BulkLoadObserver> {
480    public BulkLoadObserverOperation(User user) {
481      super(RegionCoprocessor::getBulkLoadObserver, user);
482    }
483  }
484
485
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
493
494  /**
495   * Invoked before a region open.
496   *
497   * @throws IOException Signals that an I/O exception has occurred.
498   */
499  public void preOpen() throws IOException {
500    if (coprocEnvironments.isEmpty()) {
501      return;
502    }
503    execOperation(new RegionObserverOperationWithoutResult() {
504      @Override
505      public void call(RegionObserver observer) throws IOException {
506        observer.preOpen(this);
507      }
508    });
509  }
510
511
512  /**
513   * Invoked after a region open
514   */
515  public void postOpen() {
516    if (coprocEnvironments.isEmpty()) {
517      return;
518    }
519    try {
520      execOperation(new RegionObserverOperationWithoutResult() {
521        @Override
522        public void call(RegionObserver observer) throws IOException {
523          observer.postOpen(this);
524        }
525      });
526    } catch (IOException e) {
527      LOG.warn(e.toString(), e);
528    }
529  }
530
531  /**
532   * Invoked before a region is closed
533   * @param abortRequested true if the server is aborting
534   */
535  public void preClose(final boolean abortRequested) throws IOException {
536    execOperation(new RegionObserverOperationWithoutResult() {
537      @Override
538      public void call(RegionObserver observer) throws IOException {
539        observer.preClose(this, abortRequested);
540      }
541    });
542  }
543
544  /**
545   * Invoked after a region is closed
546   * @param abortRequested true if the server is aborting
547   */
548  public void postClose(final boolean abortRequested) {
549    try {
550      execOperation(new RegionObserverOperationWithoutResult() {
551        @Override
552        public void call(RegionObserver observer) throws IOException {
553          observer.postClose(this, abortRequested);
554        }
555
556        @Override
557        public void postEnvCall() {
558          shutdown(this.getEnvironment());
559        }
560      });
561    } catch (IOException e) {
562      LOG.warn(e.toString(), e);
563    }
564  }
565
566  /**
567   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
568   * available candidates.
569   * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
570   * the passed in <code>candidates</code>.
571   * @param store The store where compaction is being requested
572   * @param candidates The currently available store files
573   * @param tracker used to track the life cycle of a compaction
574   * @param user the user
575   * @throws IOException
576   */
577  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
578      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
579    if (coprocEnvironments.isEmpty()) {
580      return false;
581    }
582    boolean bypassable = true;
583    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
584      @Override
585      public void call(RegionObserver observer) throws IOException {
586        observer.preCompactSelection(this, store, candidates, tracker);
587      }
588    });
589  }
590
591  /**
592   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
593   * candidates.
594   * @param store The store where compaction is being requested
595   * @param selected The store files selected to compact
596   * @param tracker used to track the life cycle of a compaction
597   * @param request the compaction request
598   * @param user the user
599   */
600  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
601      final CompactionLifeCycleTracker tracker, final CompactionRequest request,
602      final User user) throws IOException {
603    if (coprocEnvironments.isEmpty()) {
604      return;
605    }
606    execOperation(new RegionObserverOperationWithoutResult(user) {
607      @Override
608      public void call(RegionObserver observer) throws IOException {
609        observer.postCompactSelection(this, store, selected, tracker, request);
610      }
611    });
612  }
613
614  /**
615   * Called prior to opening store scanner for compaction.
616   */
617  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
618      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
619    if (coprocEnvironments.isEmpty()) {
620      return store.getScanInfo();
621    }
622    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
623    execOperation(new RegionObserverOperationWithoutResult(user) {
624      @Override
625      public void call(RegionObserver observer) throws IOException {
626        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
627      }
628    });
629    return builder.build();
630  }
631
632  /**
633   * Called prior to rewriting the store files selected for compaction
634   * @param store the store being compacted
635   * @param scanner the scanner used to read store data during compaction
636   * @param scanType type of Scan
637   * @param tracker used to track the life cycle of a compaction
638   * @param request the compaction request
639   * @param user the user
640   * @return Scanner to use (cannot be null!)
641   * @throws IOException
642   */
643  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
644      final ScanType scanType, final CompactionLifeCycleTracker tracker,
645      final CompactionRequest request, final User user) throws IOException {
646    InternalScanner defaultResult = scanner;
647    if (coprocEnvironments.isEmpty()) {
648      return defaultResult;
649    }
650    return execOperationWithResult(
651        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter,
652            defaultResult, user) {
653          @Override
654          public InternalScanner call(RegionObserver observer) throws IOException {
655            InternalScanner scanner =
656                observer.preCompact(this, store, getResult(), scanType, tracker, request);
657            if (scanner == null) {
658              throw new CoprocessorException("Null Scanner return disallowed!");
659            }
660            return scanner;
661          }
662        });
663  }
664
665  /**
666   * Called after the store compaction has completed.
667   * @param store the store being compacted
668   * @param resultFile the new store file written during compaction
669   * @param tracker used to track the life cycle of a compaction
670   * @param request the compaction request
671   * @param user the user
672   * @throws IOException
673   */
674  public void postCompact(final HStore store, final HStoreFile resultFile,
675      final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
676      throws IOException {
677    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) {
678      @Override
679      public void call(RegionObserver observer) throws IOException {
680        observer.postCompact(this, store, resultFile, tracker, request);
681      }
682    });
683  }
684
685  /**
686   * Invoked before create StoreScanner for flush.
687   */
688  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
689      throws IOException {
690    if (coprocEnvironments.isEmpty()) {
691      return store.getScanInfo();
692    }
693    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
694    execOperation(new RegionObserverOperationWithoutResult() {
695      @Override
696      public void call(RegionObserver observer) throws IOException {
697        observer.preFlushScannerOpen(this, store, builder, tracker);
698      }
699    });
700    return builder.build();
701  }
702
703  /**
704   * Invoked before a memstore flush
705   * @return Scanner to use (cannot be null!)
706   * @throws IOException
707   */
708  public InternalScanner preFlush(HStore store, InternalScanner scanner,
709      FlushLifeCycleTracker tracker) throws IOException {
710    if (coprocEnvironments.isEmpty()) {
711      return scanner;
712    }
713    return execOperationWithResult(
714        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
715          @Override
716          public InternalScanner call(RegionObserver observer) throws IOException {
717            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
718            if (scanner == null) {
719              throw new CoprocessorException("Null Scanner return disallowed!");
720            }
721            return scanner;
722          }
723        });
724  }
725
726  /**
727   * Invoked before a memstore flush
728   * @throws IOException
729   */
730  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
731    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
732      @Override
733      public void call(RegionObserver observer) throws IOException {
734        observer.preFlush(this, tracker);
735      }
736    });
737  }
738
739  /**
740   * Invoked after a memstore flush
741   * @throws IOException
742   */
743  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
744    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
745      @Override
746      public void call(RegionObserver observer) throws IOException {
747        observer.postFlush(this, tracker);
748      }
749    });
750  }
751
752  /**
753   * Invoked before in memory compaction.
754   */
755  public void preMemStoreCompaction(HStore store) throws IOException {
756    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
757      @Override
758      public void call(RegionObserver observer) throws IOException {
759        observer.preMemStoreCompaction(this, store);
760      }
761    });
762  }
763
764  /**
765   * Invoked before create StoreScanner for in memory compaction.
766   */
767  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
768    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
769    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
770      @Override
771      public void call(RegionObserver observer) throws IOException {
772        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
773      }
774    });
775    return builder.build();
776  }
777
778  /**
779   * Invoked before compacting memstore.
780   */
781  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
782      throws IOException {
783    if (coprocEnvironments.isEmpty()) {
784      return scanner;
785    }
786    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
787        regionObserverGetter, scanner) {
788      @Override
789      public InternalScanner call(RegionObserver observer) throws IOException {
790        return observer.preMemStoreCompactionCompact(this, store, getResult());
791      }
792    });
793  }
794
795  /**
796   * Invoked after in memory compaction.
797   */
798  public void postMemStoreCompaction(HStore store) throws IOException {
799    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
800      @Override
801      public void call(RegionObserver observer) throws IOException {
802        observer.postMemStoreCompaction(this, store);
803      }
804    });
805  }
806
807  /**
808   * Invoked after a memstore flush
809   * @throws IOException
810   */
811  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
812      throws IOException {
813    if (coprocEnvironments.isEmpty()) {
814      return;
815    }
816    execOperation(new RegionObserverOperationWithoutResult() {
817      @Override
818      public void call(RegionObserver observer) throws IOException {
819        observer.postFlush(this, store, storeFile, tracker);
820      }
821    });
822  }
823
824  // RegionObserver support
825  /**
826   * Supports Coprocessor 'bypass'.
827   * @param get the Get request
828   * @param results What to return if return is true/'bypass'.
829   * @return true if default processing should be bypassed.
830   * @exception IOException Exception
831   */
832  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
833    if (coprocEnvironments.isEmpty()) {
834      return false;
835    }
836    boolean bypassable = true;
837    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
838      @Override
839      public void call(RegionObserver observer) throws IOException {
840        observer.preGetOp(this, get, results);
841      }
842    });
843  }
844
845  /**
846   * @param get the Get request
847   * @param results the result set
848   * @exception IOException Exception
849   */
850  public void postGet(final Get get, final List<Cell> results)
851      throws IOException {
852    if (coprocEnvironments.isEmpty()) {
853      return;
854    }
855    execOperation(new RegionObserverOperationWithoutResult() {
856      @Override
857      public void call(RegionObserver observer) throws IOException {
858        observer.postGetOp(this, get, results);
859      }
860    });
861  }
862
863  /**
864   * Supports Coprocessor 'bypass'.
865   * @param get the Get request
866   * @return true or false to return to client if bypassing normal operation, or null otherwise
867   * @exception IOException Exception
868   */
869  public Boolean preExists(final Get get) throws IOException {
870    boolean bypassable = true;
871    boolean defaultResult = false;
872    if (coprocEnvironments.isEmpty()) {
873      return null;
874    }
875    return execOperationWithResult(
876        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
877            defaultResult, bypassable) {
878          @Override
879          public Boolean call(RegionObserver observer) throws IOException {
880            return observer.preExists(this, get, getResult());
881          }
882        });
883  }
884
885  /**
886   * @param get the Get request
887   * @param result the result returned by the region server
888   * @return the result to return to the client
889   * @exception IOException Exception
890   */
891  public boolean postExists(final Get get, boolean result)
892      throws IOException {
893    if (this.coprocEnvironments.isEmpty()) {
894      return result;
895    }
896    return execOperationWithResult(
897        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
898          @Override
899          public Boolean call(RegionObserver observer) throws IOException {
900            return observer.postExists(this, get, getResult());
901          }
902        });
903  }
904
905  /**
906   * Supports Coprocessor 'bypass'.
907   * @param put The Put object
908   * @param edit The WALEdit object.
909   * @param durability The durability used
910   * @return true if default processing should be bypassed
911   * @exception IOException Exception
912   */
913  public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
914      throws IOException {
915    if (coprocEnvironments.isEmpty()) {
916      return false;
917    }
918    boolean bypassable = true;
919    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
920      @Override
921      public void call(RegionObserver observer) throws IOException {
922        observer.prePut(this, put, edit, durability);
923      }
924    });
925  }
926
927  /**
928   * Supports Coprocessor 'bypass'.
929   * @param mutation - the current mutation
930   * @param kv - the current cell
931   * @param byteNow - current timestamp in bytes
932   * @param get - the get that could be used
933   * Note that the get only does not specify the family and qualifier that should be used
934   * @return true if default processing should be bypassed
935   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
936   * Coprocessor for its needs only. Will be removed.
937   */
938  @Deprecated
939  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
940      final Cell kv, final byte[] byteNow, final Get get) throws IOException {
941    if (coprocEnvironments.isEmpty()) {
942      return false;
943    }
944    boolean bypassable = true;
945    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
946      @Override
947      public void call(RegionObserver observer) throws IOException {
948          observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
949      }
950    });
951  }
952
953  /**
954   * @param put The Put object
955   * @param edit The WALEdit object.
956   * @param durability The durability used
957   * @exception IOException Exception
958   */
959  public void postPut(final Put put, final WALEdit edit, final Durability durability)
960      throws IOException {
961    if (coprocEnvironments.isEmpty()) {
962      return;
963    }
964    execOperation(new RegionObserverOperationWithoutResult() {
965      @Override
966      public void call(RegionObserver observer) throws IOException {
967        observer.postPut(this, put, edit, durability);
968      }
969    });
970  }
971
972  /**
973   * Supports Coprocessor 'bypass'.
974   * @param delete The Delete object
975   * @param edit The WALEdit object.
976   * @param durability The durability used
977   * @return true if default processing should be bypassed
978   * @exception IOException Exception
979   */
980  public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
981      throws IOException {
982    if (this.coprocEnvironments.isEmpty()) {
983      return false;
984    }
985    boolean bypassable = true;
986    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
987      @Override
988      public void call(RegionObserver observer) throws IOException {
989         observer.preDelete(this, delete, edit, durability);
990      }
991    });
992  }
993
994  /**
995   * @param delete The Delete object
996   * @param edit The WALEdit object.
997   * @param durability The durability used
998   * @exception IOException Exception
999   */
1000  public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
1001      throws IOException {
1002    execOperation(coprocEnvironments.isEmpty()? null:
1003        new RegionObserverOperationWithoutResult() {
1004      @Override
1005      public void call(RegionObserver observer) throws IOException {
1006        observer.postDelete(this, delete, edit, durability);
1007      }
1008    });
1009  }
1010
1011  public void preBatchMutate(
1012      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1013    if(this.coprocEnvironments.isEmpty()) {
1014      return;
1015    }
1016    execOperation(new RegionObserverOperationWithoutResult() {
1017      @Override
1018      public void call(RegionObserver observer) throws IOException {
1019        observer.preBatchMutate(this, miniBatchOp);
1020      }
1021    });
1022  }
1023
1024  public void postBatchMutate(
1025      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1026    if (this.coprocEnvironments.isEmpty()) {
1027      return;
1028    }
1029    execOperation(new RegionObserverOperationWithoutResult() {
1030      @Override
1031      public void call(RegionObserver observer) throws IOException {
1032        observer.postBatchMutate(this, miniBatchOp);
1033      }
1034    });
1035  }
1036
1037  public void postBatchMutateIndispensably(
1038      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1039      throws IOException {
1040    if (this.coprocEnvironments.isEmpty()) {
1041      return;
1042    }
1043    execOperation(new RegionObserverOperationWithoutResult() {
1044      @Override
1045      public void call(RegionObserver observer) throws IOException {
1046        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1047      }
1048    });
1049  }
1050
1051  /**
1052   * Supports Coprocessor 'bypass'.
1053   * @param row row to check
1054   * @param family column family
1055   * @param qualifier column qualifier
1056   * @param op the comparison operation
1057   * @param comparator the comparator
1058   * @param put data to put if check succeeds
1059   * @return true or false to return to client if default processing should be bypassed, or null
1060   * otherwise
1061   */
1062  public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1063      final byte [] qualifier, final CompareOperator op,
1064      final ByteArrayComparable comparator, final Put put)
1065      throws IOException {
1066    boolean bypassable = true;
1067    boolean defaultResult = false;
1068    if (coprocEnvironments.isEmpty()) {
1069      return null;
1070    }
1071    return execOperationWithResult(
1072        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1073            defaultResult,  bypassable) {
1074          @Override
1075          public Boolean call(RegionObserver observer) throws IOException {
1076            return observer.preCheckAndPut(this, row, family, qualifier,
1077                op, comparator, put, getResult());
1078          }
1079        });
1080  }
1081
1082  /**
1083   * Supports Coprocessor 'bypass'.
1084   * @param row row to check
1085   * @param family column family
1086   * @param qualifier column qualifier
1087   * @param op the comparison operation
1088   * @param comparator the comparator
1089   * @param put data to put if check succeeds
1090   * @return true or false to return to client if default processing should be bypassed, or null
1091   * otherwise
1092   */
1093  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
1094      justification="Null is legit")
1095  public Boolean preCheckAndPutAfterRowLock(
1096      final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1097      final ByteArrayComparable comparator, final Put put) throws IOException {
1098    boolean bypassable = true;
1099    boolean defaultResult = false;
1100    if (coprocEnvironments.isEmpty()) {
1101      return null;
1102    }
1103    return execOperationWithResult(
1104        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1105            defaultResult, bypassable) {
1106          @Override
1107          public Boolean call(RegionObserver observer) throws IOException {
1108            return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
1109                op, comparator, put, getResult());
1110          }
1111        });
1112  }
1113
1114  /**
1115   * @param row row to check
1116   * @param family column family
1117   * @param qualifier column qualifier
1118   * @param op the comparison operation
1119   * @param comparator the comparator
1120   * @param put data to put if check succeeds
1121   * @throws IOException e
1122   */
1123  public boolean postCheckAndPut(final byte [] row, final byte [] family,
1124      final byte [] qualifier, final CompareOperator op,
1125      final ByteArrayComparable comparator, final Put put,
1126      boolean result) throws IOException {
1127    if (this.coprocEnvironments.isEmpty()) {
1128      return result;
1129    }
1130    return execOperationWithResult(
1131        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
1132          @Override
1133          public Boolean call(RegionObserver observer) throws IOException {
1134            return observer.postCheckAndPut(this, row, family, qualifier,
1135                op, comparator, put, getResult());
1136          }
1137        });
1138  }
1139
1140  /**
1141   * Supports Coprocessor 'bypass'.
1142   * @param row row to check
1143   * @param family column family
1144   * @param qualifier column qualifier
1145   * @param op the comparison operation
1146   * @param comparator the comparator
1147   * @param delete delete to commit if check succeeds
1148   * @return true or false to return to client if default processing should be bypassed,
1149   * or null otherwise
1150   */
1151  public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1152      final byte [] qualifier, final CompareOperator op,
1153      final ByteArrayComparable comparator, final Delete delete)
1154      throws IOException {
1155    boolean bypassable = true;
1156    boolean defaultResult = false;
1157    if (coprocEnvironments.isEmpty()) {
1158      return null;
1159    }
1160    return execOperationWithResult(
1161        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1162            defaultResult, bypassable) {
1163          @Override
1164          public Boolean call(RegionObserver observer) throws IOException {
1165            return observer.preCheckAndDelete(this, row, family,
1166                qualifier, op, comparator, delete, getResult());
1167          }
1168        });
1169  }
1170
1171  /**
1172   * Supports Coprocessor 'bypass'.
1173   * @param row row to check
1174   * @param family column family
1175   * @param qualifier column qualifier
1176   * @param op the comparison operation
1177   * @param comparator the comparator
1178   * @param delete delete to commit if check succeeds
1179   * @return true or false to return to client if default processing should be bypassed,
1180   * or null otherwise
1181   */
1182  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
1183      justification="Null is legit")
1184  public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1185      final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
1186      final Delete delete) throws IOException {
1187    boolean bypassable = true;
1188    boolean defaultResult = false;
1189    if (coprocEnvironments.isEmpty()) {
1190      return null;
1191    }
1192    return execOperationWithResult(
1193        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1194            defaultResult, bypassable) {
1195          @Override
1196          public Boolean call(RegionObserver observer) throws IOException {
1197            return observer.preCheckAndDeleteAfterRowLock(this, row,
1198                family, qualifier, op, comparator, delete, getResult());
1199          }
1200        });
1201  }
1202
1203  /**
1204   * @param row row to check
1205   * @param family column family
1206   * @param qualifier column qualifier
1207   * @param op the comparison operation
1208   * @param comparator the comparator
1209   * @param delete delete to commit if check succeeds
1210   * @throws IOException e
1211   */
1212  public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1213      final byte [] qualifier, final CompareOperator op,
1214      final ByteArrayComparable comparator, final Delete delete,
1215      boolean result) throws IOException {
1216    if (this.coprocEnvironments.isEmpty()) {
1217      return result;
1218    }
1219    return execOperationWithResult(
1220        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
1221          @Override
1222          public Boolean call(RegionObserver observer) throws IOException {
1223            return observer.postCheckAndDelete(this, row, family,
1224                qualifier, op, comparator, delete, getResult());
1225          }
1226        });
1227  }
1228
1229  /**
1230   * Supports Coprocessor 'bypass'.
1231   * @param append append object
1232   * @return result to return to client if default operation should be bypassed, null otherwise
1233   * @throws IOException if an error occurred on the coprocessor
1234   */
1235  public Result preAppend(final Append append) throws IOException {
1236    boolean bypassable = true;
1237    Result defaultResult = null;
1238    if (this.coprocEnvironments.isEmpty()) {
1239      return defaultResult;
1240    }
1241    return execOperationWithResult(
1242      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1243            bypassable) {
1244          @Override
1245          public Result call(RegionObserver observer) throws IOException {
1246            return observer.preAppend(this, append);
1247          }
1248        });
1249  }
1250
1251  /**
1252   * Supports Coprocessor 'bypass'.
1253   * @param append append object
1254   * @return result to return to client if default operation should be bypassed, null otherwise
1255   * @throws IOException if an error occurred on the coprocessor
1256   */
1257  public Result preAppendAfterRowLock(final Append append) throws IOException {
1258    boolean bypassable = true;
1259    Result defaultResult = null;
1260    if (this.coprocEnvironments.isEmpty()) {
1261      return defaultResult;
1262    }
1263    return execOperationWithResult(
1264        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter,
1265            defaultResult, bypassable) {
1266          @Override
1267          public Result call(RegionObserver observer) throws IOException {
1268            return observer.preAppendAfterRowLock(this, append);
1269          }
1270        });
1271  }
1272
1273  /**
1274   * Supports Coprocessor 'bypass'.
1275   * @param increment increment object
1276   * @return result to return to client if default operation should be bypassed, null otherwise
1277   * @throws IOException if an error occurred on the coprocessor
1278   */
1279  public Result preIncrement(final Increment increment) throws IOException {
1280    boolean bypassable = true;
1281    Result defaultResult = null;
1282    if (coprocEnvironments.isEmpty()) {
1283      return defaultResult;
1284    }
1285    return execOperationWithResult(
1286        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1287            bypassable) {
1288          @Override
1289          public Result call(RegionObserver observer) throws IOException {
1290            return observer.preIncrement(this, increment);
1291          }
1292        });
1293  }
1294
1295  /**
1296   * Supports Coprocessor 'bypass'.
1297   * @param increment increment object
1298   * @return result to return to client if default operation should be bypassed, null otherwise
1299   * @throws IOException if an error occurred on the coprocessor
1300   */
1301  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1302    boolean bypassable = true;
1303    Result defaultResult = null;
1304    if (coprocEnvironments.isEmpty()) {
1305      return defaultResult;
1306    }
1307    return execOperationWithResult(
1308        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1309            bypassable) {
1310          @Override
1311          public Result call(RegionObserver observer) throws IOException {
1312            return observer.preIncrementAfterRowLock(this, increment);
1313          }
1314        });
1315  }
1316
1317  /**
1318   * @param append Append object
1319   * @param result the result returned by the append
1320   * @throws IOException if an error occurred on the coprocessor
1321   */
1322  public Result postAppend(final Append append, final Result result) throws IOException {
1323    if (this.coprocEnvironments.isEmpty()) {
1324      return result;
1325    }
1326    return execOperationWithResult(
1327        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1328          @Override
1329          public Result call(RegionObserver observer) throws IOException {
1330            return observer.postAppend(this, append, result);
1331          }
1332        });
1333  }
1334
1335  /**
1336   * @param increment increment object
1337   * @param result the result returned by postIncrement
1338   * @throws IOException if an error occurred on the coprocessor
1339   */
1340  public Result postIncrement(final Increment increment, Result result) throws IOException {
1341    if (this.coprocEnvironments.isEmpty()) {
1342      return result;
1343    }
1344    return execOperationWithResult(
1345        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1346          @Override
1347          public Result call(RegionObserver observer) throws IOException {
1348            return observer.postIncrement(this, increment, getResult());
1349          }
1350        });
1351  }
1352
1353  /**
1354   * @param scan the Scan specification
1355   * @exception IOException Exception
1356   */
1357  public void preScannerOpen(final Scan scan) throws IOException {
1358    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1359      @Override
1360      public void call(RegionObserver observer) throws IOException {
1361        observer.preScannerOpen(this, scan);
1362      }
1363    });
1364  }
1365
1366  /**
1367   * @param scan the Scan specification
1368   * @param s the scanner
1369   * @return the scanner instance to use
1370   * @exception IOException Exception
1371   */
1372  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1373    if (this.coprocEnvironments.isEmpty()) {
1374      return s;
1375    }
1376    return execOperationWithResult(
1377        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1378          @Override
1379          public RegionScanner call(RegionObserver observer) throws IOException {
1380            return observer.postScannerOpen(this, scan, getResult());
1381          }
1382        });
1383  }
1384
1385  /**
1386   * @param s the scanner
1387   * @param results the result set returned by the region server
1388   * @param limit the maximum number of results to return
1389   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1390   * @exception IOException Exception
1391   */
1392  public Boolean preScannerNext(final InternalScanner s,
1393      final List<Result> results, final int limit) throws IOException {
1394    boolean bypassable = true;
1395    boolean defaultResult = false;
1396    if (coprocEnvironments.isEmpty()) {
1397      return null;
1398    }
1399    return execOperationWithResult(
1400        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1401            defaultResult, bypassable) {
1402          @Override
1403          public Boolean call(RegionObserver observer) throws IOException {
1404            return observer.preScannerNext(this, s, results, limit, getResult());
1405          }
1406        });
1407  }
1408
1409  /**
1410   * @param s the scanner
1411   * @param results the result set returned by the region server
1412   * @param limit the maximum number of results to return
1413   * @param hasMore
1414   * @return 'has more' indication to give to client
1415   * @exception IOException Exception
1416   */
1417  public boolean postScannerNext(final InternalScanner s,
1418      final List<Result> results, final int limit, boolean hasMore)
1419      throws IOException {
1420    if (this.coprocEnvironments.isEmpty()) {
1421      return hasMore;
1422    }
1423    return execOperationWithResult(
1424        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1425          @Override
1426          public Boolean call(RegionObserver observer) throws IOException {
1427            return observer.postScannerNext(this, s, results, limit, getResult());
1428          }
1429        });
1430  }
1431
1432  /**
1433   * This will be called by the scan flow when the current scanned row is being filtered out by the
1434   * filter.
1435   * @param s the scanner
1436   * @param curRowCell The cell in the current row which got filtered out
1437   * @return whether more rows are available for the scanner or not
1438   * @throws IOException
1439   */
1440  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1441      throws IOException {
1442    // short circuit for performance
1443    boolean defaultResult = true;
1444    if (!hasCustomPostScannerFilterRow) {
1445      return defaultResult;
1446    }
1447    if (this.coprocEnvironments.isEmpty()) {
1448      return defaultResult;
1449    }
1450    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1451        regionObserverGetter, defaultResult) {
1452      @Override
1453      public Boolean call(RegionObserver observer) throws IOException {
1454        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1455      }
1456    });
1457  }
1458
1459  /**
1460   * Supports Coprocessor 'bypass'.
1461   * @param s the scanner
1462   * @return true if default behavior should be bypassed, false otherwise
1463   * @exception IOException Exception
1464   */
1465  // Should this be bypassable?
1466  public boolean preScannerClose(final InternalScanner s) throws IOException {
1467    return execOperation(coprocEnvironments.isEmpty()? null:
1468        new RegionObserverOperationWithoutResult(true) {
1469      @Override
1470      public void call(RegionObserver observer) throws IOException {
1471        observer.preScannerClose(this, s);
1472      }
1473    });
1474  }
1475
1476  /**
1477   * @exception IOException Exception
1478   */
1479  public void postScannerClose(final InternalScanner s) throws IOException {
1480    execOperation(coprocEnvironments.isEmpty()? null:
1481        new RegionObserverOperationWithoutResult() {
1482      @Override
1483      public void call(RegionObserver observer) throws IOException {
1484        observer.postScannerClose(this, s);
1485      }
1486    });
1487  }
1488
1489  /**
1490   * Called before open store scanner for user scan.
1491   */
1492  public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
1493    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1494    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
1495    execOperation(new RegionObserverOperationWithoutResult() {
1496      @Override
1497      public void call(RegionObserver observer) throws IOException {
1498        observer.preStoreScannerOpen(this, store, builder);
1499      }
1500    });
1501    return builder.build();
1502  }
1503
1504  /**
1505   * @param info the RegionInfo for this region
1506   * @param edits the file of recovered edits
1507   */
1508  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1509    execOperation(coprocEnvironments.isEmpty()? null:
1510        new RegionObserverOperationWithoutResult(true) {
1511      @Override
1512      public void call(RegionObserver observer) throws IOException {
1513        observer.preReplayWALs(this, info, edits);
1514      }
1515    });
1516  }
1517
1518  /**
1519   * @param info the RegionInfo for this region
1520   * @param edits the file of recovered edits
1521   * @throws IOException Exception
1522   */
1523  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1524    execOperation(coprocEnvironments.isEmpty()? null:
1525        new RegionObserverOperationWithoutResult() {
1526      @Override
1527      public void call(RegionObserver observer) throws IOException {
1528        observer.postReplayWALs(this, info, edits);
1529      }
1530    });
1531  }
1532
1533  /**
1534   * Supports Coprocessor 'bypass'.
1535   * @return true if default behavior should be bypassed, false otherwise
1536   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1537   * with something that doesn't expose IntefaceAudience.Private classes.
1538   */
1539  @Deprecated
1540  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1541      throws IOException {
1542    return execOperation(coprocEnvironments.isEmpty()? null:
1543        new RegionObserverOperationWithoutResult(true) {
1544      @Override
1545      public void call(RegionObserver observer) throws IOException {
1546        observer.preWALRestore(this, info, logKey, logEdit);
1547      }
1548    });
1549  }
1550
1551  /**
1552   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1553   * with something that doesn't expose IntefaceAudience.Private classes.
1554   */
1555  @Deprecated
1556  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1557      throws IOException {
1558    execOperation(coprocEnvironments.isEmpty()? null:
1559        new RegionObserverOperationWithoutResult() {
1560      @Override
1561      public void call(RegionObserver observer) throws IOException {
1562        observer.postWALRestore(this, info, logKey, logEdit);
1563      }
1564    });
1565  }
1566
1567  /**
1568   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1569   */
1570  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1571    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1572      @Override
1573      public void call(RegionObserver observer) throws IOException {
1574        observer.preBulkLoadHFile(this, familyPaths);
1575      }
1576    });
1577  }
1578
1579  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1580      throws IOException {
1581    return execOperation(coprocEnvironments.isEmpty()? null:
1582        new RegionObserverOperationWithoutResult() {
1583      @Override
1584      public void call(RegionObserver observer) throws IOException {
1585        observer.preCommitStoreFile(this, family, pairs);
1586      }
1587    });
1588  }
1589
1590  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
1591    execOperation(coprocEnvironments.isEmpty()? null:
1592        new RegionObserverOperationWithoutResult() {
1593      @Override
1594      public void call(RegionObserver observer) throws IOException {
1595        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1596      }
1597    });
1598  }
1599
1600  /**
1601   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1602   * @param map Map of CF to List of file paths for the final loaded files
1603   * @throws IOException
1604   */
1605  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1606      Map<byte[], List<Path>> map) throws IOException {
1607    if (this.coprocEnvironments.isEmpty()) {
1608      return;
1609    }
1610    execOperation(coprocEnvironments.isEmpty()? null:
1611        new RegionObserverOperationWithoutResult() {
1612          @Override
1613          public void call(RegionObserver observer) throws IOException {
1614            observer.postBulkLoadHFile(this, familyPaths, map);
1615          }
1616        });
1617  }
1618
1619  public void postStartRegionOperation(final Operation op) throws IOException {
1620    execOperation(coprocEnvironments.isEmpty()? null:
1621        new RegionObserverOperationWithoutResult() {
1622      @Override
1623      public void call(RegionObserver observer) throws IOException {
1624        observer.postStartRegionOperation(this, op);
1625      }
1626    });
1627  }
1628
1629  public void postCloseRegionOperation(final Operation op) throws IOException {
1630    execOperation(coprocEnvironments.isEmpty()? null:
1631        new RegionObserverOperationWithoutResult() {
1632      @Override
1633      public void call(RegionObserver observer) throws IOException {
1634        observer.postCloseRegionOperation(this, op);
1635      }
1636    });
1637  }
1638
1639  /**
1640   * @param fs fileystem to read from
1641   * @param p path to the file
1642   * @param in {@link FSDataInputStreamWrapper}
1643   * @param size Full size of the file
1644   * @param cacheConf
1645   * @param r original reference file. This will be not null only when reading a split file.
1646   * @return a Reader instance to use instead of the base reader if overriding
1647   * default behavior, null otherwise
1648   * @throws IOException
1649   */
1650  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1651      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1652      final Reference r) throws IOException {
1653    if (coprocEnvironments.isEmpty()) {
1654      return null;
1655    }
1656    return execOperationWithResult(
1657        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1658          @Override
1659          public StoreFileReader call(RegionObserver observer) throws IOException {
1660            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1661                getResult());
1662          }
1663        });
1664  }
1665
1666  /**
1667   * @param fs fileystem to read from
1668   * @param p path to the file
1669   * @param in {@link FSDataInputStreamWrapper}
1670   * @param size Full size of the file
1671   * @param cacheConf
1672   * @param r original reference file. This will be not null only when reading a split file.
1673   * @param reader the base reader instance
1674   * @return The reader to use
1675   * @throws IOException
1676   */
1677  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1678      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1679      final Reference r, final StoreFileReader reader) throws IOException {
1680    if (this.coprocEnvironments.isEmpty()) {
1681      return reader;
1682    }
1683    return execOperationWithResult(
1684        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
1685          @Override
1686          public StoreFileReader call(RegionObserver observer) throws IOException {
1687            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1688                getResult());
1689          }
1690        });
1691  }
1692
1693  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1694      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1695    if (this.coprocEnvironments.isEmpty()) {
1696      return cellPairs;
1697    }
1698    return execOperationWithResult(
1699        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1700            regionObserverGetter, cellPairs) {
1701          @Override
1702          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1703            return observer.postIncrementBeforeWAL(this, mutation, getResult());
1704          }
1705        });
1706  }
1707
1708  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1709      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1710    if (this.coprocEnvironments.isEmpty()) {
1711      return cellPairs;
1712    }
1713    return execOperationWithResult(
1714        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1715            regionObserverGetter, cellPairs) {
1716          @Override
1717          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1718            return observer.postAppendBeforeWAL(this, mutation, getResult());
1719          }
1720        });
1721  }
1722
1723  public Message preEndpointInvocation(final Service service, final String methodName,
1724      Message request) throws IOException {
1725    if (coprocEnvironments.isEmpty()) {
1726      return request;
1727    }
1728    return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver,
1729        Message>(endpointObserverGetter, request) {
1730      @Override
1731      public Message call(EndpointObserver observer) throws IOException {
1732        return observer.preEndpointInvocation(this, service, methodName, getResult());
1733      }
1734    });
1735  }
1736
1737  public void postEndpointInvocation(final Service service, final String methodName,
1738      final Message request, final Message.Builder responseBuilder) throws IOException {
1739    execOperation(coprocEnvironments.isEmpty() ? null :
1740        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1741          @Override
1742          public void call(EndpointObserver observer) throws IOException {
1743            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1744          }
1745        });
1746  }
1747
1748  /**
1749   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1750   */
1751  @Deprecated
1752  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1753    if (this.coprocEnvironments.isEmpty()) {
1754      return result;
1755    }
1756    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>(
1757        regionObserverGetter, result) {
1758      @Override
1759      public DeleteTracker call(RegionObserver observer) throws IOException {
1760        return observer.postInstantiateDeleteTracker(this, getResult());
1761      }
1762    });
1763  }
1764
1765  /////////////////////////////////////////////////////////////////////////////////////////////////
1766  // BulkLoadObserver hooks
1767  /////////////////////////////////////////////////////////////////////////////////////////////////
1768  public void prePrepareBulkLoad(User user) throws IOException {
1769    execOperation(coprocEnvironments.isEmpty() ? null :
1770        new BulkLoadObserverOperation(user) {
1771          @Override protected void call(BulkLoadObserver observer) throws IOException {
1772            observer.prePrepareBulkLoad(this);
1773          }
1774        });
1775  }
1776
1777  public void preCleanupBulkLoad(User user) throws IOException {
1778    execOperation(coprocEnvironments.isEmpty() ? null :
1779        new BulkLoadObserverOperation(user) {
1780          @Override protected void call(BulkLoadObserver observer) throws IOException {
1781            observer.preCleanupBulkLoad(this);
1782          }
1783        });
1784  }
1785}