001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.UUID;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.RawCellBuilder;
040import org.apache.hadoop.hbase.RawCellBuilderFactory;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.SharedConnection;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.CheckAndMutate;
045import org.apache.hadoop.hbase.client.CheckAndMutateResult;
046import org.apache.hadoop.hbase.client.Connection;
047import org.apache.hadoop.hbase.client.Delete;
048import org.apache.hadoop.hbase.client.Durability;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Increment;
051import org.apache.hadoop.hbase.client.Mutation;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.RegionInfo;
054import org.apache.hadoop.hbase.client.Result;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
058import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
059import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
062import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
063import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
065import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
066import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
067import org.apache.hadoop.hbase.coprocessor.ObserverContext;
068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
069import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
070import org.apache.hadoop.hbase.coprocessor.RegionObserver;
071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
072import org.apache.hadoop.hbase.io.Reference;
073import org.apache.hadoop.hbase.io.hfile.CacheConfig;
074import org.apache.hadoop.hbase.metrics.MetricRegistry;
075import org.apache.hadoop.hbase.regionserver.Region.Operation;
076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
079import org.apache.hadoop.hbase.security.User;
080import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALKey;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
090
091/**
092 * Implements the coprocessor environment and runtime support for coprocessors
093 * loaded within a {@link Region}.
094 */
095@InterfaceAudience.Private
096public class RegionCoprocessorHost
097    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
098
099  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
100  // The shared data map
101  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
102      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
103          AbstractReferenceMap.ReferenceStrength.WEAK);
104
105  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it
106  private final boolean hasCustomPostScannerFilterRow;
107
108  /**
109   *
110   * Encapsulation of the environment of each coprocessor
111   */
112  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
113      implements RegionCoprocessorEnvironment {
114    private Region region;
115    ConcurrentMap<String, Object> sharedData;
116    private final MetricRegistry metricRegistry;
117    private final RegionServerServices services;
118
119    /**
120     * Constructor
121     * @param impl the coprocessor instance
122     * @param priority chaining priority
123     */
124    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
125        final int seq, final Configuration conf, final Region region,
126        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
127      super(impl, priority, seq, conf);
128      this.region = region;
129      this.sharedData = sharedData;
130      this.services = services;
131      this.metricRegistry =
132          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
133    }
134
135    /** @return the region */
136    @Override
137    public Region getRegion() {
138      return region;
139    }
140
141    @Override
142    public OnlineRegions getOnlineRegions() {
143      return this.services;
144    }
145
146    @Override
147    public Connection getConnection() {
148      // Mocks may have services as null at test time.
149      return services != null ? new SharedConnection(services.getConnection()) : null;
150    }
151
152    @Override
153    public Connection createConnection(Configuration conf) throws IOException {
154      return services != null ? this.services.createConnection(conf) : null;
155    }
156
157    @Override
158    public ServerName getServerName() {
159      return services != null? services.getServerName(): null;
160    }
161
162    @Override
163    public void shutdown() {
164      super.shutdown();
165      MetricsCoprocessor.removeRegistry(this.metricRegistry);
166    }
167
168    @Override
169    public ConcurrentMap<String, Object> getSharedData() {
170      return sharedData;
171    }
172
173    @Override
174    public RegionInfo getRegionInfo() {
175      return region.getRegionInfo();
176    }
177
178    @Override
179    public MetricRegistry getMetricRegistryForRegionServer() {
180      return metricRegistry;
181    }
182
183    @Override
184    public RawCellBuilder getCellBuilder() {
185      // We always do a DEEP_COPY only
186      return RawCellBuilderFactory.create();
187    }
188  }
189
190  /**
191   * Special version of RegionEnvironment that exposes RegionServerServices for Core
192   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
193   */
194  private static class RegionEnvironmentForCoreCoprocessors extends
195      RegionEnvironment implements HasRegionServerServices {
196    private final RegionServerServices rsServices;
197
198    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
199      final int seq, final Configuration conf, final Region region,
200      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
201      super(impl, priority, seq, conf, region, services, sharedData);
202      this.rsServices = services;
203    }
204
205    /**
206     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
207     * consumption.
208     */
209    @Override
210    public RegionServerServices getRegionServerServices() {
211      return this.rsServices;
212    }
213  }
214
215  static class TableCoprocessorAttribute {
216    private Path path;
217    private String className;
218    private int priority;
219    private Configuration conf;
220
221    public TableCoprocessorAttribute(Path path, String className, int priority,
222        Configuration conf) {
223      this.path = path;
224      this.className = className;
225      this.priority = priority;
226      this.conf = conf;
227    }
228
229    public Path getPath() {
230      return path;
231    }
232
233    public String getClassName() {
234      return className;
235    }
236
237    public int getPriority() {
238      return priority;
239    }
240
241    public Configuration getConf() {
242      return conf;
243    }
244  }
245
246  /** The region server services */
247  RegionServerServices rsServices;
248  /** The region */
249  HRegion region;
250
251  /**
252   * Constructor
253   * @param region the region
254   * @param rsServices interface to available region server functionality
255   * @param conf the configuration
256   */
257  public RegionCoprocessorHost(final HRegion region,
258      final RegionServerServices rsServices, final Configuration conf) {
259    super(rsServices);
260    this.conf = conf;
261    this.rsServices = rsServices;
262    this.region = region;
263    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
264
265    // load system default cp's from configuration.
266    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
267
268    // load system default cp's for user tables from configuration.
269    if (!region.getRegionInfo().getTable().isSystemTable()) {
270      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
271    }
272
273    // load Coprocessor From HDFS
274    loadTableCoprocessors(conf);
275
276    // now check whether any coprocessor implements postScannerFilterRow
277    boolean hasCustomPostScannerFilterRow = false;
278    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
279      if (env.getInstance() instanceof RegionObserver) {
280        Class<?> clazz = env.getInstance().getClass();
281        for(;;) {
282          if (clazz == null) {
283            // we must have directly implemented RegionObserver
284            hasCustomPostScannerFilterRow = true;
285            break out;
286          }
287          try {
288            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
289              InternalScanner.class, Cell.class, boolean.class);
290            // this coprocessor has a custom version of postScannerFilterRow
291            hasCustomPostScannerFilterRow = true;
292            break out;
293          } catch (NoSuchMethodException ignore) {
294          }
295          // the deprecated signature still exists
296          try {
297            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
298              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
299            // this coprocessor has a custom version of postScannerFilterRow
300            hasCustomPostScannerFilterRow = true;
301            break out;
302          } catch (NoSuchMethodException ignore) {
303          }
304          clazz = clazz.getSuperclass();
305        }
306      }
307    }
308    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
309  }
310
311  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
312      TableDescriptor htd) {
313    return htd.getCoprocessorDescriptors().stream().map(cp -> {
314      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
315      Configuration ourConf;
316      if (!cp.getProperties().isEmpty()) {
317        // do an explicit deep copy of the passed configuration
318        ourConf = new Configuration(false);
319        HBaseConfiguration.merge(ourConf, conf);
320        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
321      } else {
322        ourConf = conf;
323      }
324      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
325    }).collect(Collectors.toList());
326  }
327
328  /**
329   * Sanity check the table coprocessor attributes of the supplied schema. Will
330   * throw an exception if there is a problem.
331   * @param conf
332   * @param htd
333   * @throws IOException
334   */
335  public static void testTableCoprocessorAttrs(final Configuration conf,
336      final TableDescriptor htd) throws IOException {
337    String pathPrefix = UUID.randomUUID().toString();
338    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
339      if (attr.getPriority() < 0) {
340        throw new IOException("Priority for coprocessor " + attr.getClassName() +
341          " cannot be less than 0");
342      }
343      ClassLoader old = Thread.currentThread().getContextClassLoader();
344      try {
345        ClassLoader cl;
346        if (attr.getPath() != null) {
347          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
348            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
349        } else {
350          cl = CoprocessorHost.class.getClassLoader();
351        }
352        Thread.currentThread().setContextClassLoader(cl);
353        if (cl instanceof CoprocessorClassLoader) {
354          String[] includedClassPrefixes = null;
355          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
356            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
357            includedClassPrefixes = prefixes.split(";");
358          }
359          ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes);
360        } else {
361          cl.loadClass(attr.getClassName());
362        }
363      } catch (ClassNotFoundException e) {
364        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
365      } finally {
366        Thread.currentThread().setContextClassLoader(old);
367      }
368    }
369  }
370
371  void loadTableCoprocessors(final Configuration conf) {
372    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
373      DEFAULT_COPROCESSORS_ENABLED);
374    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
375      DEFAULT_USER_COPROCESSORS_ENABLED);
376    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
377      return;
378    }
379
380    // scan the table attributes for coprocessor load specifications
381    // initialize the coprocessors
382    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
383    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
384        region.getTableDescriptor())) {
385      // Load encompasses classloading and coprocessor initialization
386      try {
387        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
388            attr.getPriority(), attr.getConf());
389        if (env == null) {
390          continue;
391        }
392        configured.add(env);
393        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
394            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
395      } catch (Throwable t) {
396        // Coprocessor failed to load, do we abort on error?
397        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
398          abortServer(attr.getClassName(), t);
399        } else {
400          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
401        }
402      }
403    }
404    // add together to coprocessor set for COW efficiency
405    coprocEnvironments.addAll(configured);
406  }
407
408  @Override
409  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
410      Configuration conf) {
411    // If coprocessor exposes any services, register them.
412    for (Service service : instance.getServices()) {
413      region.registerService(service);
414    }
415    ConcurrentMap<String, Object> classData;
416    // make sure only one thread can add maps
417    synchronized (SHARED_DATA_MAP) {
418      // as long as at least one RegionEnvironment holds on to its classData it will
419      // remain in this map
420      classData =
421          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
422              k -> new ConcurrentHashMap<>());
423    }
424    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
425    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
426        new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
427            rsServices, classData):
428        new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
429  }
430
431  @Override
432  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
433      throws InstantiationException, IllegalAccessException {
434    try {
435      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
436        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
437      } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
438        // For backward compatibility with old CoprocessorService impl which don't extend
439        // RegionCoprocessor.
440        CoprocessorService cs;
441        cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance();
442        return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs);
443      } else {
444        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
445            implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
446        return null;
447      }
448    } catch (NoSuchMethodException | InvocationTargetException e) {
449      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
450    }
451  }
452
453  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
454      RegionCoprocessor::getRegionObserver;
455
456  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
457      RegionCoprocessor::getEndpointObserver;
458
459  abstract class RegionObserverOperationWithoutResult extends
460      ObserverOperationWithoutResult<RegionObserver> {
461    public RegionObserverOperationWithoutResult() {
462      super(regionObserverGetter);
463    }
464
465    public RegionObserverOperationWithoutResult(User user) {
466      super(regionObserverGetter, user);
467    }
468
469    public RegionObserverOperationWithoutResult(boolean bypassable) {
470      super(regionObserverGetter, null, bypassable);
471    }
472
473    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
474      super(regionObserverGetter, user, bypassable);
475    }
476  }
477
478  abstract class BulkLoadObserverOperation extends
479      ObserverOperationWithoutResult<BulkLoadObserver> {
480    public BulkLoadObserverOperation(User user) {
481      super(RegionCoprocessor::getBulkLoadObserver, user);
482    }
483  }
484
485
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
493
494  /**
495   * Invoked before a region open.
496   *
497   * @throws IOException Signals that an I/O exception has occurred.
498   */
499  public void preOpen() throws IOException {
500    if (coprocEnvironments.isEmpty()) {
501      return;
502    }
503    execOperation(new RegionObserverOperationWithoutResult() {
504      @Override
505      public void call(RegionObserver observer) throws IOException {
506        observer.preOpen(this);
507      }
508    });
509  }
510
511
512  /**
513   * Invoked after a region open
514   */
515  public void postOpen() {
516    if (coprocEnvironments.isEmpty()) {
517      return;
518    }
519    try {
520      execOperation(new RegionObserverOperationWithoutResult() {
521        @Override
522        public void call(RegionObserver observer) throws IOException {
523          observer.postOpen(this);
524        }
525      });
526    } catch (IOException e) {
527      LOG.warn(e.toString(), e);
528    }
529  }
530
531  /**
532   * Invoked before a region is closed
533   * @param abortRequested true if the server is aborting
534   */
535  public void preClose(final boolean abortRequested) throws IOException {
536    execOperation(new RegionObserverOperationWithoutResult() {
537      @Override
538      public void call(RegionObserver observer) throws IOException {
539        observer.preClose(this, abortRequested);
540      }
541    });
542  }
543
544  /**
545   * Invoked after a region is closed
546   * @param abortRequested true if the server is aborting
547   */
548  public void postClose(final boolean abortRequested) {
549    try {
550      execOperation(new RegionObserverOperationWithoutResult() {
551        @Override
552        public void call(RegionObserver observer) throws IOException {
553          observer.postClose(this, abortRequested);
554        }
555
556        @Override
557        public void postEnvCall() {
558          shutdown(this.getEnvironment());
559        }
560      });
561    } catch (IOException e) {
562      LOG.warn(e.toString(), e);
563    }
564  }
565
566  /**
567   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
568   * available candidates.
569   * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
570   * the passed in <code>candidates</code>.
571   * @param store The store where compaction is being requested
572   * @param candidates The currently available store files
573   * @param tracker used to track the life cycle of a compaction
574   * @param user the user
575   * @throws IOException
576   */
577  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
578      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
579    if (coprocEnvironments.isEmpty()) {
580      return false;
581    }
582    boolean bypassable = true;
583    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
584      @Override
585      public void call(RegionObserver observer) throws IOException {
586        observer.preCompactSelection(this, store, candidates, tracker);
587      }
588    });
589  }
590
591  /**
592   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
593   * candidates.
594   * @param store The store where compaction is being requested
595   * @param selected The store files selected to compact
596   * @param tracker used to track the life cycle of a compaction
597   * @param request the compaction request
598   * @param user the user
599   */
600  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
601      final CompactionLifeCycleTracker tracker, final CompactionRequest request,
602      final User user) throws IOException {
603    if (coprocEnvironments.isEmpty()) {
604      return;
605    }
606    execOperation(new RegionObserverOperationWithoutResult(user) {
607      @Override
608      public void call(RegionObserver observer) throws IOException {
609        observer.postCompactSelection(this, store, selected, tracker, request);
610      }
611    });
612  }
613
614  /**
615   * Called prior to opening store scanner for compaction.
616   */
617  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
618      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
619    if (coprocEnvironments.isEmpty()) {
620      return store.getScanInfo();
621    }
622    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
623    execOperation(new RegionObserverOperationWithoutResult(user) {
624      @Override
625      public void call(RegionObserver observer) throws IOException {
626        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
627      }
628    });
629    return builder.build();
630  }
631
632  /**
633   * Called prior to rewriting the store files selected for compaction
634   * @param store the store being compacted
635   * @param scanner the scanner used to read store data during compaction
636   * @param scanType type of Scan
637   * @param tracker used to track the life cycle of a compaction
638   * @param request the compaction request
639   * @param user the user
640   * @return Scanner to use (cannot be null!)
641   * @throws IOException
642   */
643  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
644      final ScanType scanType, final CompactionLifeCycleTracker tracker,
645      final CompactionRequest request, final User user) throws IOException {
646    InternalScanner defaultResult = scanner;
647    if (coprocEnvironments.isEmpty()) {
648      return defaultResult;
649    }
650    return execOperationWithResult(
651        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter,
652            defaultResult, user) {
653          @Override
654          public InternalScanner call(RegionObserver observer) throws IOException {
655            InternalScanner scanner =
656                observer.preCompact(this, store, getResult(), scanType, tracker, request);
657            if (scanner == null) {
658              throw new CoprocessorException("Null Scanner return disallowed!");
659            }
660            return scanner;
661          }
662        });
663  }
664
665  /**
666   * Called after the store compaction has completed.
667   * @param store the store being compacted
668   * @param resultFile the new store file written during compaction
669   * @param tracker used to track the life cycle of a compaction
670   * @param request the compaction request
671   * @param user the user
672   * @throws IOException
673   */
674  public void postCompact(final HStore store, final HStoreFile resultFile,
675      final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
676      throws IOException {
677    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) {
678      @Override
679      public void call(RegionObserver observer) throws IOException {
680        observer.postCompact(this, store, resultFile, tracker, request);
681      }
682    });
683  }
684
685  /**
686   * Invoked before create StoreScanner for flush.
687   */
688  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
689      throws IOException {
690    if (coprocEnvironments.isEmpty()) {
691      return store.getScanInfo();
692    }
693    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
694    execOperation(new RegionObserverOperationWithoutResult() {
695      @Override
696      public void call(RegionObserver observer) throws IOException {
697        observer.preFlushScannerOpen(this, store, builder, tracker);
698      }
699    });
700    return builder.build();
701  }
702
703  /**
704   * Invoked before a memstore flush
705   * @return Scanner to use (cannot be null!)
706   * @throws IOException
707   */
708  public InternalScanner preFlush(HStore store, InternalScanner scanner,
709      FlushLifeCycleTracker tracker) throws IOException {
710    if (coprocEnvironments.isEmpty()) {
711      return scanner;
712    }
713    return execOperationWithResult(
714        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
715          @Override
716          public InternalScanner call(RegionObserver observer) throws IOException {
717            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
718            if (scanner == null) {
719              throw new CoprocessorException("Null Scanner return disallowed!");
720            }
721            return scanner;
722          }
723        });
724  }
725
726  /**
727   * Invoked before a memstore flush
728   * @throws IOException
729   */
730  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
731    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
732      @Override
733      public void call(RegionObserver observer) throws IOException {
734        observer.preFlush(this, tracker);
735      }
736    });
737  }
738
739  /**
740   * Invoked after a memstore flush
741   * @throws IOException
742   */
743  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
744    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
745      @Override
746      public void call(RegionObserver observer) throws IOException {
747        observer.postFlush(this, tracker);
748      }
749    });
750  }
751
752  /**
753   * Invoked before in memory compaction.
754   */
755  public void preMemStoreCompaction(HStore store) throws IOException {
756    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
757      @Override
758      public void call(RegionObserver observer) throws IOException {
759        observer.preMemStoreCompaction(this, store);
760      }
761    });
762  }
763
764  /**
765   * Invoked before create StoreScanner for in memory compaction.
766   */
767  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
768    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
769    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
770      @Override
771      public void call(RegionObserver observer) throws IOException {
772        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
773      }
774    });
775    return builder.build();
776  }
777
778  /**
779   * Invoked before compacting memstore.
780   */
781  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
782      throws IOException {
783    if (coprocEnvironments.isEmpty()) {
784      return scanner;
785    }
786    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
787        regionObserverGetter, scanner) {
788      @Override
789      public InternalScanner call(RegionObserver observer) throws IOException {
790        return observer.preMemStoreCompactionCompact(this, store, getResult());
791      }
792    });
793  }
794
795  /**
796   * Invoked after in memory compaction.
797   */
798  public void postMemStoreCompaction(HStore store) throws IOException {
799    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
800      @Override
801      public void call(RegionObserver observer) throws IOException {
802        observer.postMemStoreCompaction(this, store);
803      }
804    });
805  }
806
807  /**
808   * Invoked after a memstore flush
809   * @throws IOException
810   */
811  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
812      throws IOException {
813    if (coprocEnvironments.isEmpty()) {
814      return;
815    }
816    execOperation(new RegionObserverOperationWithoutResult() {
817      @Override
818      public void call(RegionObserver observer) throws IOException {
819        observer.postFlush(this, store, storeFile, tracker);
820      }
821    });
822  }
823
824  // RegionObserver support
825  /**
826   * Supports Coprocessor 'bypass'.
827   * @param get the Get request
828   * @param results What to return if return is true/'bypass'.
829   * @return true if default processing should be bypassed.
830   * @exception IOException Exception
831   */
832  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
833    if (coprocEnvironments.isEmpty()) {
834      return false;
835    }
836    boolean bypassable = true;
837    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
838      @Override
839      public void call(RegionObserver observer) throws IOException {
840        observer.preGetOp(this, get, results);
841      }
842    });
843  }
844
845  /**
846   * @param get the Get request
847   * @param results the result set
848   * @exception IOException Exception
849   */
850  public void postGet(final Get get, final List<Cell> results)
851      throws IOException {
852    if (coprocEnvironments.isEmpty()) {
853      return;
854    }
855    execOperation(new RegionObserverOperationWithoutResult() {
856      @Override
857      public void call(RegionObserver observer) throws IOException {
858        observer.postGetOp(this, get, results);
859      }
860    });
861  }
862
863  /**
864   * Supports Coprocessor 'bypass'.
865   * @param get the Get request
866   * @return true or false to return to client if bypassing normal operation, or null otherwise
867   * @exception IOException Exception
868   */
869  public Boolean preExists(final Get get) throws IOException {
870    boolean bypassable = true;
871    boolean defaultResult = false;
872    if (coprocEnvironments.isEmpty()) {
873      return null;
874    }
875    return execOperationWithResult(
876        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
877            defaultResult, bypassable) {
878          @Override
879          public Boolean call(RegionObserver observer) throws IOException {
880            return observer.preExists(this, get, getResult());
881          }
882        });
883  }
884
885  /**
886   * @param get the Get request
887   * @param result the result returned by the region server
888   * @return the result to return to the client
889   * @exception IOException Exception
890   */
891  public boolean postExists(final Get get, boolean result)
892      throws IOException {
893    if (this.coprocEnvironments.isEmpty()) {
894      return result;
895    }
896    return execOperationWithResult(
897        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
898          @Override
899          public Boolean call(RegionObserver observer) throws IOException {
900            return observer.postExists(this, get, getResult());
901          }
902        });
903  }
904
905  /**
906   * Supports Coprocessor 'bypass'.
907   * @param put The Put object
908   * @param edit The WALEdit object.
909   * @param durability The durability used
910   * @return true if default processing should be bypassed
911   * @exception IOException Exception
912   */
913  public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
914      throws IOException {
915    if (coprocEnvironments.isEmpty()) {
916      return false;
917    }
918    boolean bypassable = true;
919    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
920      @Override
921      public void call(RegionObserver observer) throws IOException {
922        observer.prePut(this, put, edit, durability);
923      }
924    });
925  }
926
927  /**
928   * Supports Coprocessor 'bypass'.
929   * @param mutation - the current mutation
930   * @param kv - the current cell
931   * @param byteNow - current timestamp in bytes
932   * @param get - the get that could be used
933   * Note that the get only does not specify the family and qualifier that should be used
934   * @return true if default processing should be bypassed
935   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
936   * Coprocessor for its needs only. Will be removed.
937   */
938  @Deprecated
939  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
940      final Cell kv, final byte[] byteNow, final Get get) throws IOException {
941    if (coprocEnvironments.isEmpty()) {
942      return false;
943    }
944    boolean bypassable = true;
945    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
946      @Override
947      public void call(RegionObserver observer) throws IOException {
948          observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
949      }
950    });
951  }
952
953  /**
954   * @param put The Put object
955   * @param edit The WALEdit object.
956   * @param durability The durability used
957   * @exception IOException Exception
958   */
959  public void postPut(final Put put, final WALEdit edit, final Durability durability)
960      throws IOException {
961    if (coprocEnvironments.isEmpty()) {
962      return;
963    }
964    execOperation(new RegionObserverOperationWithoutResult() {
965      @Override
966      public void call(RegionObserver observer) throws IOException {
967        observer.postPut(this, put, edit, durability);
968      }
969    });
970  }
971
972  /**
973   * Supports Coprocessor 'bypass'.
974   * @param delete The Delete object
975   * @param edit The WALEdit object.
976   * @param durability The durability used
977   * @return true if default processing should be bypassed
978   * @exception IOException Exception
979   */
980  public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
981      throws IOException {
982    if (this.coprocEnvironments.isEmpty()) {
983      return false;
984    }
985    boolean bypassable = true;
986    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
987      @Override
988      public void call(RegionObserver observer) throws IOException {
989         observer.preDelete(this, delete, edit, durability);
990      }
991    });
992  }
993
994  /**
995   * @param delete The Delete object
996   * @param edit The WALEdit object.
997   * @param durability The durability used
998   * @exception IOException Exception
999   */
1000  public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
1001      throws IOException {
1002    execOperation(coprocEnvironments.isEmpty()? null:
1003        new RegionObserverOperationWithoutResult() {
1004      @Override
1005      public void call(RegionObserver observer) throws IOException {
1006        observer.postDelete(this, delete, edit, durability);
1007      }
1008    });
1009  }
1010
1011  public void preBatchMutate(
1012      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1013    if(this.coprocEnvironments.isEmpty()) {
1014      return;
1015    }
1016    execOperation(new RegionObserverOperationWithoutResult() {
1017      @Override
1018      public void call(RegionObserver observer) throws IOException {
1019        observer.preBatchMutate(this, miniBatchOp);
1020      }
1021    });
1022  }
1023
1024  public void postBatchMutate(
1025      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1026    if (this.coprocEnvironments.isEmpty()) {
1027      return;
1028    }
1029    execOperation(new RegionObserverOperationWithoutResult() {
1030      @Override
1031      public void call(RegionObserver observer) throws IOException {
1032        observer.postBatchMutate(this, miniBatchOp);
1033      }
1034    });
1035  }
1036
1037  public void postBatchMutateIndispensably(
1038      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1039      throws IOException {
1040    if (this.coprocEnvironments.isEmpty()) {
1041      return;
1042    }
1043    execOperation(new RegionObserverOperationWithoutResult() {
1044      @Override
1045      public void call(RegionObserver observer) throws IOException {
1046        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1047      }
1048    });
1049  }
1050
1051  /**
1052   * Supports Coprocessor 'bypass'.
1053   * @param checkAndMutate the CheckAndMutate object
1054   * @return true or false to return to client if default processing should be bypassed, or null
1055   *   otherwise
1056   * @throws IOException if an error occurred on the coprocessor
1057   */
1058  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
1059    throws IOException {
1060    boolean bypassable = true;
1061    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1062    if (coprocEnvironments.isEmpty()) {
1063      return null;
1064    }
1065    return execOperationWithResult(
1066      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1067        regionObserverGetter, defaultResult, bypassable) {
1068        @Override
1069        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1070          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1071        }
1072      });
1073  }
1074
1075  /**
1076   * Supports Coprocessor 'bypass'.
1077   * @param checkAndMutate the CheckAndMutate object
1078   * @return true or false to return to client if default processing should be bypassed, or null
1079   *   otherwise
1080   * @throws IOException if an error occurred on the coprocessor
1081   */
1082  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1083    throws IOException {
1084    boolean bypassable = true;
1085    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1086    if (coprocEnvironments.isEmpty()) {
1087      return null;
1088    }
1089    return execOperationWithResult(
1090      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1091        regionObserverGetter, defaultResult, bypassable) {
1092        @Override
1093        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1094          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1095        }
1096      });
1097  }
1098
1099  /**
1100   * @param checkAndMutate the CheckAndMutate object
1101   * @param result the result returned by the checkAndMutate
1102   * @return true or false to return to client if default processing should be bypassed, or null
1103   *   otherwise
1104   * @throws IOException if an error occurred on the coprocessor
1105   */
1106  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1107    CheckAndMutateResult result) throws IOException {
1108    if (this.coprocEnvironments.isEmpty()) {
1109      return result;
1110    }
1111    return execOperationWithResult(
1112      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1113        regionObserverGetter, result) {
1114        @Override
1115        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1116          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1117        }
1118      });
1119  }
1120
1121  /**
1122   * Supports Coprocessor 'bypass'.
1123   * @param append append object
1124   * @return result to return to client if default operation should be bypassed, null otherwise
1125   * @throws IOException if an error occurred on the coprocessor
1126   */
1127  public Result preAppend(final Append append) throws IOException {
1128    boolean bypassable = true;
1129    Result defaultResult = null;
1130    if (this.coprocEnvironments.isEmpty()) {
1131      return defaultResult;
1132    }
1133    return execOperationWithResult(
1134      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1135            bypassable) {
1136          @Override
1137          public Result call(RegionObserver observer) throws IOException {
1138            return observer.preAppend(this, append);
1139          }
1140        });
1141  }
1142
1143  /**
1144   * Supports Coprocessor 'bypass'.
1145   * @param append append object
1146   * @return result to return to client if default operation should be bypassed, null otherwise
1147   * @throws IOException if an error occurred on the coprocessor
1148   */
1149  public Result preAppendAfterRowLock(final Append append) throws IOException {
1150    boolean bypassable = true;
1151    Result defaultResult = null;
1152    if (this.coprocEnvironments.isEmpty()) {
1153      return defaultResult;
1154    }
1155    return execOperationWithResult(
1156        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter,
1157            defaultResult, bypassable) {
1158          @Override
1159          public Result call(RegionObserver observer) throws IOException {
1160            return observer.preAppendAfterRowLock(this, append);
1161          }
1162        });
1163  }
1164
1165  /**
1166   * Supports Coprocessor 'bypass'.
1167   * @param increment increment object
1168   * @return result to return to client if default operation should be bypassed, null otherwise
1169   * @throws IOException if an error occurred on the coprocessor
1170   */
1171  public Result preIncrement(final Increment increment) throws IOException {
1172    boolean bypassable = true;
1173    Result defaultResult = null;
1174    if (coprocEnvironments.isEmpty()) {
1175      return defaultResult;
1176    }
1177    return execOperationWithResult(
1178        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1179            bypassable) {
1180          @Override
1181          public Result call(RegionObserver observer) throws IOException {
1182            return observer.preIncrement(this, increment);
1183          }
1184        });
1185  }
1186
1187  /**
1188   * Supports Coprocessor 'bypass'.
1189   * @param increment increment object
1190   * @return result to return to client if default operation should be bypassed, null otherwise
1191   * @throws IOException if an error occurred on the coprocessor
1192   */
1193  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1194    boolean bypassable = true;
1195    Result defaultResult = null;
1196    if (coprocEnvironments.isEmpty()) {
1197      return defaultResult;
1198    }
1199    return execOperationWithResult(
1200        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1201            bypassable) {
1202          @Override
1203          public Result call(RegionObserver observer) throws IOException {
1204            return observer.preIncrementAfterRowLock(this, increment);
1205          }
1206        });
1207  }
1208
1209  /**
1210   * @param append Append object
1211   * @param result the result returned by the append
1212   * @throws IOException if an error occurred on the coprocessor
1213   */
1214  public Result postAppend(final Append append, final Result result) throws IOException {
1215    if (this.coprocEnvironments.isEmpty()) {
1216      return result;
1217    }
1218    return execOperationWithResult(
1219        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1220          @Override
1221          public Result call(RegionObserver observer) throws IOException {
1222            return observer.postAppend(this, append, result);
1223          }
1224        });
1225  }
1226
1227  /**
1228   * @param increment increment object
1229   * @param result the result returned by postIncrement
1230   * @throws IOException if an error occurred on the coprocessor
1231   */
1232  public Result postIncrement(final Increment increment, Result result) throws IOException {
1233    if (this.coprocEnvironments.isEmpty()) {
1234      return result;
1235    }
1236    return execOperationWithResult(
1237        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1238          @Override
1239          public Result call(RegionObserver observer) throws IOException {
1240            return observer.postIncrement(this, increment, getResult());
1241          }
1242        });
1243  }
1244
1245  /**
1246   * @param scan the Scan specification
1247   * @exception IOException Exception
1248   */
1249  public void preScannerOpen(final Scan scan) throws IOException {
1250    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1251      @Override
1252      public void call(RegionObserver observer) throws IOException {
1253        observer.preScannerOpen(this, scan);
1254      }
1255    });
1256  }
1257
1258  /**
1259   * @param scan the Scan specification
1260   * @param s the scanner
1261   * @return the scanner instance to use
1262   * @exception IOException Exception
1263   */
1264  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1265    if (this.coprocEnvironments.isEmpty()) {
1266      return s;
1267    }
1268    return execOperationWithResult(
1269        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1270          @Override
1271          public RegionScanner call(RegionObserver observer) throws IOException {
1272            return observer.postScannerOpen(this, scan, getResult());
1273          }
1274        });
1275  }
1276
1277  /**
1278   * @param s the scanner
1279   * @param results the result set returned by the region server
1280   * @param limit the maximum number of results to return
1281   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1282   * @exception IOException Exception
1283   */
1284  public Boolean preScannerNext(final InternalScanner s,
1285      final List<Result> results, final int limit) throws IOException {
1286    boolean bypassable = true;
1287    boolean defaultResult = false;
1288    if (coprocEnvironments.isEmpty()) {
1289      return null;
1290    }
1291    return execOperationWithResult(
1292        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1293            defaultResult, bypassable) {
1294          @Override
1295          public Boolean call(RegionObserver observer) throws IOException {
1296            return observer.preScannerNext(this, s, results, limit, getResult());
1297          }
1298        });
1299  }
1300
1301  /**
1302   * @param s the scanner
1303   * @param results the result set returned by the region server
1304   * @param limit the maximum number of results to return
1305   * @param hasMore
1306   * @return 'has more' indication to give to client
1307   * @exception IOException Exception
1308   */
1309  public boolean postScannerNext(final InternalScanner s,
1310      final List<Result> results, final int limit, boolean hasMore)
1311      throws IOException {
1312    if (this.coprocEnvironments.isEmpty()) {
1313      return hasMore;
1314    }
1315    return execOperationWithResult(
1316        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1317          @Override
1318          public Boolean call(RegionObserver observer) throws IOException {
1319            return observer.postScannerNext(this, s, results, limit, getResult());
1320          }
1321        });
1322  }
1323
1324  /**
1325   * This will be called by the scan flow when the current scanned row is being filtered out by the
1326   * filter.
1327   * @param s the scanner
1328   * @param curRowCell The cell in the current row which got filtered out
1329   * @return whether more rows are available for the scanner or not
1330   * @throws IOException
1331   */
1332  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1333      throws IOException {
1334    // short circuit for performance
1335    boolean defaultResult = true;
1336    if (!hasCustomPostScannerFilterRow) {
1337      return defaultResult;
1338    }
1339    if (this.coprocEnvironments.isEmpty()) {
1340      return defaultResult;
1341    }
1342    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1343        regionObserverGetter, defaultResult) {
1344      @Override
1345      public Boolean call(RegionObserver observer) throws IOException {
1346        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1347      }
1348    });
1349  }
1350
1351  /**
1352   * Supports Coprocessor 'bypass'.
1353   * @param s the scanner
1354   * @return true if default behavior should be bypassed, false otherwise
1355   * @exception IOException Exception
1356   */
1357  // Should this be bypassable?
1358  public boolean preScannerClose(final InternalScanner s) throws IOException {
1359    return execOperation(coprocEnvironments.isEmpty()? null:
1360        new RegionObserverOperationWithoutResult(true) {
1361      @Override
1362      public void call(RegionObserver observer) throws IOException {
1363        observer.preScannerClose(this, s);
1364      }
1365    });
1366  }
1367
1368  /**
1369   * @exception IOException Exception
1370   */
1371  public void postScannerClose(final InternalScanner s) throws IOException {
1372    execOperation(coprocEnvironments.isEmpty()? null:
1373        new RegionObserverOperationWithoutResult() {
1374      @Override
1375      public void call(RegionObserver observer) throws IOException {
1376        observer.postScannerClose(this, s);
1377      }
1378    });
1379  }
1380
1381  /**
1382   * Called before open store scanner for user scan.
1383   */
1384  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1385    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1386    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1387    execOperation(new RegionObserverOperationWithoutResult() {
1388      @Override
1389      public void call(RegionObserver observer) throws IOException {
1390        observer.preStoreScannerOpen(this, store, builder);
1391      }
1392    });
1393    return builder.build();
1394  }
1395
1396  /**
1397   * @param info the RegionInfo for this region
1398   * @param edits the file of recovered edits
1399   */
1400  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1401    execOperation(coprocEnvironments.isEmpty()? null:
1402        new RegionObserverOperationWithoutResult(true) {
1403      @Override
1404      public void call(RegionObserver observer) throws IOException {
1405        observer.preReplayWALs(this, info, edits);
1406      }
1407    });
1408  }
1409
1410  /**
1411   * @param info the RegionInfo for this region
1412   * @param edits the file of recovered edits
1413   * @throws IOException Exception
1414   */
1415  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1416    execOperation(coprocEnvironments.isEmpty()? null:
1417        new RegionObserverOperationWithoutResult() {
1418      @Override
1419      public void call(RegionObserver observer) throws IOException {
1420        observer.postReplayWALs(this, info, edits);
1421      }
1422    });
1423  }
1424
1425  /**
1426   * Supports Coprocessor 'bypass'.
1427   * @return true if default behavior should be bypassed, false otherwise
1428   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1429   * with something that doesn't expose IntefaceAudience.Private classes.
1430   */
1431  @Deprecated
1432  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1433      throws IOException {
1434    return execOperation(coprocEnvironments.isEmpty()? null:
1435        new RegionObserverOperationWithoutResult(true) {
1436      @Override
1437      public void call(RegionObserver observer) throws IOException {
1438        observer.preWALRestore(this, info, logKey, logEdit);
1439      }
1440    });
1441  }
1442
1443  /**
1444   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1445   * with something that doesn't expose IntefaceAudience.Private classes.
1446   */
1447  @Deprecated
1448  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1449      throws IOException {
1450    execOperation(coprocEnvironments.isEmpty()? null:
1451        new RegionObserverOperationWithoutResult() {
1452      @Override
1453      public void call(RegionObserver observer) throws IOException {
1454        observer.postWALRestore(this, info, logKey, logEdit);
1455      }
1456    });
1457  }
1458
1459  /**
1460   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1461   */
1462  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1463    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1464      @Override
1465      public void call(RegionObserver observer) throws IOException {
1466        observer.preBulkLoadHFile(this, familyPaths);
1467      }
1468    });
1469  }
1470
1471  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1472      throws IOException {
1473    return execOperation(coprocEnvironments.isEmpty()? null:
1474        new RegionObserverOperationWithoutResult() {
1475      @Override
1476      public void call(RegionObserver observer) throws IOException {
1477        observer.preCommitStoreFile(this, family, pairs);
1478      }
1479    });
1480  }
1481
1482  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
1483    execOperation(coprocEnvironments.isEmpty()? null:
1484        new RegionObserverOperationWithoutResult() {
1485      @Override
1486      public void call(RegionObserver observer) throws IOException {
1487        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1488      }
1489    });
1490  }
1491
1492  /**
1493   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1494   * @param map Map of CF to List of file paths for the final loaded files
1495   * @throws IOException
1496   */
1497  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1498      Map<byte[], List<Path>> map) throws IOException {
1499    if (this.coprocEnvironments.isEmpty()) {
1500      return;
1501    }
1502    execOperation(coprocEnvironments.isEmpty()? null:
1503        new RegionObserverOperationWithoutResult() {
1504          @Override
1505          public void call(RegionObserver observer) throws IOException {
1506            observer.postBulkLoadHFile(this, familyPaths, map);
1507          }
1508        });
1509  }
1510
1511  public void postStartRegionOperation(final Operation op) throws IOException {
1512    execOperation(coprocEnvironments.isEmpty()? null:
1513        new RegionObserverOperationWithoutResult() {
1514      @Override
1515      public void call(RegionObserver observer) throws IOException {
1516        observer.postStartRegionOperation(this, op);
1517      }
1518    });
1519  }
1520
1521  public void postCloseRegionOperation(final Operation op) throws IOException {
1522    execOperation(coprocEnvironments.isEmpty()? null:
1523        new RegionObserverOperationWithoutResult() {
1524      @Override
1525      public void call(RegionObserver observer) throws IOException {
1526        observer.postCloseRegionOperation(this, op);
1527      }
1528    });
1529  }
1530
1531  /**
1532   * @param fs fileystem to read from
1533   * @param p path to the file
1534   * @param in {@link FSDataInputStreamWrapper}
1535   * @param size Full size of the file
1536   * @param cacheConf
1537   * @param r original reference file. This will be not null only when reading a split file.
1538   * @return a Reader instance to use instead of the base reader if overriding
1539   * default behavior, null otherwise
1540   * @throws IOException
1541   */
1542  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1543      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1544      final Reference r) throws IOException {
1545    if (coprocEnvironments.isEmpty()) {
1546      return null;
1547    }
1548    return execOperationWithResult(
1549        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1550          @Override
1551          public StoreFileReader call(RegionObserver observer) throws IOException {
1552            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1553                getResult());
1554          }
1555        });
1556  }
1557
1558  /**
1559   * @param fs fileystem to read from
1560   * @param p path to the file
1561   * @param in {@link FSDataInputStreamWrapper}
1562   * @param size Full size of the file
1563   * @param cacheConf
1564   * @param r original reference file. This will be not null only when reading a split file.
1565   * @param reader the base reader instance
1566   * @return The reader to use
1567   * @throws IOException
1568   */
1569  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1570      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1571      final Reference r, final StoreFileReader reader) throws IOException {
1572    if (this.coprocEnvironments.isEmpty()) {
1573      return reader;
1574    }
1575    return execOperationWithResult(
1576        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
1577          @Override
1578          public StoreFileReader call(RegionObserver observer) throws IOException {
1579            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1580                getResult());
1581          }
1582        });
1583  }
1584
1585  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1586      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1587    if (this.coprocEnvironments.isEmpty()) {
1588      return cellPairs;
1589    }
1590    return execOperationWithResult(
1591        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1592            regionObserverGetter, cellPairs) {
1593          @Override
1594          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1595            return observer.postIncrementBeforeWAL(this, mutation, getResult());
1596          }
1597        });
1598  }
1599
1600  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1601      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1602    if (this.coprocEnvironments.isEmpty()) {
1603      return cellPairs;
1604    }
1605    return execOperationWithResult(
1606        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1607            regionObserverGetter, cellPairs) {
1608          @Override
1609          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1610            return observer.postAppendBeforeWAL(this, mutation, getResult());
1611          }
1612        });
1613  }
1614
1615  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1616    if (this.coprocEnvironments.isEmpty()){
1617      return;
1618    }
1619    execOperation(new RegionObserverOperationWithoutResult() {
1620      @Override
1621      public void call(RegionObserver observer) throws IOException {
1622        observer.preWALAppend(this, key, edit);
1623      }
1624    });
1625  }
1626
1627  public Message preEndpointInvocation(final Service service, final String methodName,
1628      Message request) throws IOException {
1629    if (coprocEnvironments.isEmpty()) {
1630      return request;
1631    }
1632    return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver,
1633        Message>(endpointObserverGetter, request) {
1634      @Override
1635      public Message call(EndpointObserver observer) throws IOException {
1636        return observer.preEndpointInvocation(this, service, methodName, getResult());
1637      }
1638    });
1639  }
1640
1641  public void postEndpointInvocation(final Service service, final String methodName,
1642      final Message request, final Message.Builder responseBuilder) throws IOException {
1643    execOperation(coprocEnvironments.isEmpty() ? null :
1644        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1645          @Override
1646          public void call(EndpointObserver observer) throws IOException {
1647            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1648          }
1649        });
1650  }
1651
1652  /**
1653   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1654   */
1655  @Deprecated
1656  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1657    if (this.coprocEnvironments.isEmpty()) {
1658      return result;
1659    }
1660    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>(
1661        regionObserverGetter, result) {
1662      @Override
1663      public DeleteTracker call(RegionObserver observer) throws IOException {
1664        return observer.postInstantiateDeleteTracker(this, getResult());
1665      }
1666    });
1667  }
1668
1669  /////////////////////////////////////////////////////////////////////////////////////////////////
1670  // BulkLoadObserver hooks
1671  /////////////////////////////////////////////////////////////////////////////////////////////////
1672  public void prePrepareBulkLoad(User user) throws IOException {
1673    execOperation(coprocEnvironments.isEmpty() ? null :
1674        new BulkLoadObserverOperation(user) {
1675          @Override protected void call(BulkLoadObserver observer) throws IOException {
1676            observer.prePrepareBulkLoad(this);
1677          }
1678        });
1679  }
1680
1681  public void preCleanupBulkLoad(User user) throws IOException {
1682    execOperation(coprocEnvironments.isEmpty() ? null :
1683        new BulkLoadObserverOperation(user) {
1684          @Override protected void call(BulkLoadObserver observer) throws IOException {
1685            observer.preCleanupBulkLoad(this);
1686          }
1687        });
1688  }
1689}