001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import com.google.protobuf.Message;
022import com.google.protobuf.Service;
023
024import java.io.IOException;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.Map;
029import java.util.UUID;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.ConcurrentMap;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CompareOperator;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.RawCellBuilder;
040import org.apache.hadoop.hbase.RawCellBuilderFactory;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.SharedConnection;
043import org.apache.hadoop.hbase.client.Append;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.Delete;
046import org.apache.hadoop.hbase.client.Durability;
047import org.apache.hadoop.hbase.client.Get;
048import org.apache.hadoop.hbase.client.Increment;
049import org.apache.hadoop.hbase.client.Mutation;
050import org.apache.hadoop.hbase.client.Put;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.TableDescriptor;
055import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
056import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
058import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
059import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
061import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
063import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
064import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
065import org.apache.hadoop.hbase.coprocessor.ObserverContext;
066import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
068import org.apache.hadoop.hbase.coprocessor.RegionObserver;
069import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
070import org.apache.hadoop.hbase.filter.ByteArrayComparable;
071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
072import org.apache.hadoop.hbase.io.Reference;
073import org.apache.hadoop.hbase.io.hfile.CacheConfig;
074import org.apache.hadoop.hbase.metrics.MetricRegistry;
075import org.apache.hadoop.hbase.regionserver.Region.Operation;
076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
079import org.apache.hadoop.hbase.security.User;
080import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
081import org.apache.hadoop.hbase.util.Pair;
082import org.apache.hadoop.hbase.wal.WALEdit;
083import org.apache.hadoop.hbase.wal.WALKey;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.slf4j.Logger;
086import org.slf4j.LoggerFactory;
087
088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
090
091/**
092 * Implements the coprocessor environment and runtime support for coprocessors
093 * loaded within a {@link Region}.
094 */
095@InterfaceAudience.Private
096public class RegionCoprocessorHost
097    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
098
  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
  // The shared data map. createEnvironment() keys it by coprocessor class name. Keys are held
  // with HARD references and values with WEAK references, so a per-class data map stays here
  // only while something else (an environment) still references it.
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
          AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it.
  // Computed once in the constructor, after all coprocessors have been loaded.
  private final boolean hasCustomPostScannerFilterRow;
107
108  /**
109   *
110   * Encapsulation of the environment of each coprocessor
111   */
112  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
113      implements RegionCoprocessorEnvironment {
114    private Region region;
115    ConcurrentMap<String, Object> sharedData;
116    private final MetricRegistry metricRegistry;
117    private final RegionServerServices services;
118
119    /**
120     * Constructor
121     * @param impl the coprocessor instance
122     * @param priority chaining priority
123     */
124    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
125        final int seq, final Configuration conf, final Region region,
126        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
127      super(impl, priority, seq, conf);
128      this.region = region;
129      this.sharedData = sharedData;
130      this.services = services;
131      this.metricRegistry =
132          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
133    }
134
135    /** @return the region */
136    @Override
137    public Region getRegion() {
138      return region;
139    }
140
141    @Override
142    public OnlineRegions getOnlineRegions() {
143      return this.services;
144    }
145
146    @Override
147    public Connection getConnection() {
148      // Mocks may have services as null at test time.
149      return services != null ? new SharedConnection(services.getConnection()) : null;
150    }
151
152    @Override
153    public Connection createConnection(Configuration conf) throws IOException {
154      return services != null ? this.services.createConnection(conf) : null;
155    }
156
157    @Override
158    public ServerName getServerName() {
159      return services != null? services.getServerName(): null;
160    }
161
162    @Override
163    public void shutdown() {
164      super.shutdown();
165      MetricsCoprocessor.removeRegistry(this.metricRegistry);
166    }
167
168    @Override
169    public ConcurrentMap<String, Object> getSharedData() {
170      return sharedData;
171    }
172
173    @Override
174    public RegionInfo getRegionInfo() {
175      return region.getRegionInfo();
176    }
177
178    @Override
179    public MetricRegistry getMetricRegistryForRegionServer() {
180      return metricRegistry;
181    }
182
183    @Override
184    public RawCellBuilder getCellBuilder() {
185      // We always do a DEEP_COPY only
186      return RawCellBuilderFactory.create();
187    }
188  }
189
190  /**
191   * Special version of RegionEnvironment that exposes RegionServerServices for Core
192   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
193   */
194  private static class RegionEnvironmentForCoreCoprocessors extends
195      RegionEnvironment implements HasRegionServerServices {
196    private final RegionServerServices rsServices;
197
198    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
199      final int seq, final Configuration conf, final Region region,
200      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
201      super(impl, priority, seq, conf, region, services, sharedData);
202      this.rsServices = services;
203    }
204
205    /**
206     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
207     * consumption.
208     */
209    @Override
210    public RegionServerServices getRegionServerServices() {
211      return this.rsServices;
212    }
213  }
214
215  static class TableCoprocessorAttribute {
216    private Path path;
217    private String className;
218    private int priority;
219    private Configuration conf;
220
221    public TableCoprocessorAttribute(Path path, String className, int priority,
222        Configuration conf) {
223      this.path = path;
224      this.className = className;
225      this.priority = priority;
226      this.conf = conf;
227    }
228
229    public Path getPath() {
230      return path;
231    }
232
233    public String getClassName() {
234      return className;
235    }
236
237    public int getPriority() {
238      return priority;
239    }
240
241    public Configuration getConf() {
242      return conf;
243    }
244  }
245
  /** The region server services; assigned once in the constructor. */
  RegionServerServices rsServices;
  /** The region this host manages coprocessors for; assigned once in the constructor. */
  HRegion region;
250
251  /**
252   * Constructor
253   * @param region the region
254   * @param rsServices interface to available region server functionality
255   * @param conf the configuration
256   */
257  public RegionCoprocessorHost(final HRegion region,
258      final RegionServerServices rsServices, final Configuration conf) {
259    super(rsServices);
260    this.conf = conf;
261    this.rsServices = rsServices;
262    this.region = region;
263    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
264
265    // load system default cp's from configuration.
266    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
267
268    // load system default cp's for user tables from configuration.
269    if (!region.getRegionInfo().getTable().isSystemTable()) {
270      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
271    }
272
273    // load Coprocessor From HDFS
274    loadTableCoprocessors(conf);
275
276    // now check whether any coprocessor implements postScannerFilterRow
277    boolean hasCustomPostScannerFilterRow = false;
278    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
279      if (env.getInstance() instanceof RegionObserver) {
280        Class<?> clazz = env.getInstance().getClass();
281        for(;;) {
282          if (clazz == null) {
283            // we must have directly implemented RegionObserver
284            hasCustomPostScannerFilterRow = true;
285            break out;
286          }
287          try {
288            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
289              InternalScanner.class, Cell.class, boolean.class);
290            // this coprocessor has a custom version of postScannerFilterRow
291            hasCustomPostScannerFilterRow = true;
292            break out;
293          } catch (NoSuchMethodException ignore) {
294          }
295          // the deprecated signature still exists
296          try {
297            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
298              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
299            // this coprocessor has a custom version of postScannerFilterRow
300            hasCustomPostScannerFilterRow = true;
301            break out;
302          } catch (NoSuchMethodException ignore) {
303          }
304          clazz = clazz.getSuperclass();
305        }
306      }
307    }
308    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
309  }
310
311  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
312      TableDescriptor htd) {
313    return htd.getCoprocessorDescriptors().stream().map(cp -> {
314      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
315      Configuration ourConf;
316      if (!cp.getProperties().isEmpty()) {
317        // do an explicit deep copy of the passed configuration
318        ourConf = new Configuration(false);
319        HBaseConfiguration.merge(ourConf, conf);
320        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
321      } else {
322        ourConf = conf;
323      }
324      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
325    }).collect(Collectors.toList());
326  }
327
328  /**
329   * Sanity check the table coprocessor attributes of the supplied schema. Will
330   * throw an exception if there is a problem.
331   * @param conf
332   * @param htd
333   * @throws IOException
334   */
335  public static void testTableCoprocessorAttrs(final Configuration conf,
336      final TableDescriptor htd) throws IOException {
337    String pathPrefix = UUID.randomUUID().toString();
338    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
339      if (attr.getPriority() < 0) {
340        throw new IOException("Priority for coprocessor " + attr.getClassName() +
341          " cannot be less than 0");
342      }
343      ClassLoader old = Thread.currentThread().getContextClassLoader();
344      try {
345        ClassLoader cl;
346        if (attr.getPath() != null) {
347          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
348            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
349        } else {
350          cl = CoprocessorHost.class.getClassLoader();
351        }
352        Thread.currentThread().setContextClassLoader(cl);
353        cl.loadClass(attr.getClassName());
354      } catch (ClassNotFoundException e) {
355        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
356      } finally {
357        Thread.currentThread().setContextClassLoader(old);
358      }
359    }
360  }
361
362  void loadTableCoprocessors(final Configuration conf) {
363    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
364      DEFAULT_COPROCESSORS_ENABLED);
365    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
366      DEFAULT_USER_COPROCESSORS_ENABLED);
367    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
368      return;
369    }
370
371    // scan the table attributes for coprocessor load specifications
372    // initialize the coprocessors
373    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
374    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
375        region.getTableDescriptor())) {
376      // Load encompasses classloading and coprocessor initialization
377      try {
378        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
379            attr.getPriority(), attr.getConf());
380        if (env == null) {
381          continue;
382        }
383        configured.add(env);
384        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
385            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
386      } catch (Throwable t) {
387        // Coprocessor failed to load, do we abort on error?
388        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
389          abortServer(attr.getClassName(), t);
390        } else {
391          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
392        }
393      }
394    }
395    // add together to coprocessor set for COW efficiency
396    coprocEnvironments.addAll(configured);
397  }
398
399  @Override
400  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
401      Configuration conf) {
402    // If coprocessor exposes any services, register them.
403    for (Service service : instance.getServices()) {
404      region.registerService(service);
405    }
406    ConcurrentMap<String, Object> classData;
407    // make sure only one thread can add maps
408    synchronized (SHARED_DATA_MAP) {
409      // as long as at least one RegionEnvironment holds on to its classData it will
410      // remain in this map
411      classData =
412          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
413              k -> new ConcurrentHashMap<>());
414    }
415    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
416    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
417        new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
418            rsServices, classData):
419        new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
420  }
421
422  @Override
423  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
424      throws InstantiationException, IllegalAccessException {
425    try {
426      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
427        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
428      } else if (CoprocessorService.class.isAssignableFrom(implClass)) {
429        // For backward compatibility with old CoprocessorService impl which don't extend
430        // RegionCoprocessor.
431        CoprocessorService cs;
432        cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance();
433        return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs);
434      } else {
435        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
436            implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
437        return null;
438      }
439    } catch (NoSuchMethodException | InvocationTargetException e) {
440      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
441    }
442  }
443
  // Obtains the RegionObserver from a RegionCoprocessor; passed to the observer operations below.
  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
      RegionCoprocessor::getRegionObserver;

  // Obtains the EndpointObserver from a RegionCoprocessor.
  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
      RegionCoprocessor::getEndpointObserver;
449
  /**
   * Base class for result-less operations on a {@link RegionObserver}. Each constructor forwards
   * to the matching super constructor, always supplying {@code regionObserverGetter}.
   */
  abstract class RegionObserverOperationWithoutResult extends
      ObserverOperationWithoutResult<RegionObserver> {
    /** Operation with no explicit user and no bypass flag. */
    public RegionObserverOperationWithoutResult() {
      super(regionObserverGetter);
    }

    /** Operation for the given user. */
    public RegionObserverOperationWithoutResult(User user) {
      super(regionObserverGetter, user);
    }

    /** Operation with the given bypass flag; a null user is passed to the super constructor. */
    public RegionObserverOperationWithoutResult(boolean bypassable) {
      super(regionObserverGetter, null, bypassable);
    }

    /** Operation for the given user with the given bypass flag. */
    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
      super(regionObserverGetter, user, bypassable);
    }
  }
468
  /**
   * Base class for result-less operations on a {@link BulkLoadObserver}, which is obtained from
   * the coprocessor through {@code RegionCoprocessor::getBulkLoadObserver}.
   */
  abstract class BulkLoadObserverOperation extends
      ObserverOperationWithoutResult<BulkLoadObserver> {
    /** Operation for the given user. */
    public BulkLoadObserverOperation(User user) {
      super(RegionCoprocessor::getBulkLoadObserver, user);
    }
  }
475
476
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
484
485  /**
486   * Invoked before a region open.
487   *
488   * @throws IOException Signals that an I/O exception has occurred.
489   */
490  public void preOpen() throws IOException {
491    if (coprocEnvironments.isEmpty()) {
492      return;
493    }
494    execOperation(new RegionObserverOperationWithoutResult() {
495      @Override
496      public void call(RegionObserver observer) throws IOException {
497        observer.preOpen(this);
498      }
499    });
500  }
501
502
503  /**
504   * Invoked after a region open
505   */
506  public void postOpen() {
507    if (coprocEnvironments.isEmpty()) {
508      return;
509    }
510    try {
511      execOperation(new RegionObserverOperationWithoutResult() {
512        @Override
513        public void call(RegionObserver observer) throws IOException {
514          observer.postOpen(this);
515        }
516      });
517    } catch (IOException e) {
518      LOG.warn(e.toString(), e);
519    }
520  }
521
522  /**
523   * Invoked before a region is closed
524   * @param abortRequested true if the server is aborting
525   */
526  public void preClose(final boolean abortRequested) throws IOException {
527    execOperation(new RegionObserverOperationWithoutResult() {
528      @Override
529      public void call(RegionObserver observer) throws IOException {
530        observer.preClose(this, abortRequested);
531      }
532    });
533  }
534
535  /**
536   * Invoked after a region is closed
537   * @param abortRequested true if the server is aborting
538   */
539  public void postClose(final boolean abortRequested) {
540    try {
541      execOperation(new RegionObserverOperationWithoutResult() {
542        @Override
543        public void call(RegionObserver observer) throws IOException {
544          observer.postClose(this, abortRequested);
545        }
546
547        @Override
548        public void postEnvCall() {
549          shutdown(this.getEnvironment());
550        }
551      });
552    } catch (IOException e) {
553      LOG.warn(e.toString(), e);
554    }
555  }
556
557  /**
558   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
559   * available candidates.
560   * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
561   * the passed in <code>candidates</code>.
562   * @param store The store where compaction is being requested
563   * @param candidates The currently available store files
564   * @param tracker used to track the life cycle of a compaction
565   * @param user the user
566   * @throws IOException
567   */
568  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
569      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
570    if (coprocEnvironments.isEmpty()) {
571      return false;
572    }
573    boolean bypassable = true;
574    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
575      @Override
576      public void call(RegionObserver observer) throws IOException {
577        observer.preCompactSelection(this, store, candidates, tracker);
578      }
579    });
580  }
581
582  /**
583   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
584   * candidates.
585   * @param store The store where compaction is being requested
586   * @param selected The store files selected to compact
587   * @param tracker used to track the life cycle of a compaction
588   * @param request the compaction request
589   * @param user the user
590   */
591  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
592      final CompactionLifeCycleTracker tracker, final CompactionRequest request,
593      final User user) throws IOException {
594    if (coprocEnvironments.isEmpty()) {
595      return;
596    }
597    execOperation(new RegionObserverOperationWithoutResult(user) {
598      @Override
599      public void call(RegionObserver observer) throws IOException {
600        observer.postCompactSelection(this, store, selected, tracker, request);
601      }
602    });
603  }
604
605  /**
606   * Called prior to opening store scanner for compaction.
607   */
608  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
609      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
610    if (coprocEnvironments.isEmpty()) {
611      return store.getScanInfo();
612    }
613    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
614    execOperation(new RegionObserverOperationWithoutResult(user) {
615      @Override
616      public void call(RegionObserver observer) throws IOException {
617        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
618      }
619    });
620    return builder.build();
621  }
622
623  /**
624   * Called prior to rewriting the store files selected for compaction
625   * @param store the store being compacted
626   * @param scanner the scanner used to read store data during compaction
627   * @param scanType type of Scan
628   * @param tracker used to track the life cycle of a compaction
629   * @param request the compaction request
630   * @param user the user
631   * @return Scanner to use (cannot be null!)
632   * @throws IOException
633   */
634  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
635      final ScanType scanType, final CompactionLifeCycleTracker tracker,
636      final CompactionRequest request, final User user) throws IOException {
637    InternalScanner defaultResult = scanner;
638    if (coprocEnvironments.isEmpty()) {
639      return defaultResult;
640    }
641    return execOperationWithResult(
642        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter,
643            defaultResult, user) {
644          @Override
645          public InternalScanner call(RegionObserver observer) throws IOException {
646            InternalScanner scanner =
647                observer.preCompact(this, store, getResult(), scanType, tracker, request);
648            if (scanner == null) {
649              throw new CoprocessorException("Null Scanner return disallowed!");
650            }
651            return scanner;
652          }
653        });
654  }
655
656  /**
657   * Called after the store compaction has completed.
658   * @param store the store being compacted
659   * @param resultFile the new store file written during compaction
660   * @param tracker used to track the life cycle of a compaction
661   * @param request the compaction request
662   * @param user the user
663   * @throws IOException
664   */
665  public void postCompact(final HStore store, final HStoreFile resultFile,
666      final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
667      throws IOException {
668    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) {
669      @Override
670      public void call(RegionObserver observer) throws IOException {
671        observer.postCompact(this, store, resultFile, tracker, request);
672      }
673    });
674  }
675
676  /**
677   * Invoked before create StoreScanner for flush.
678   */
679  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
680      throws IOException {
681    if (coprocEnvironments.isEmpty()) {
682      return store.getScanInfo();
683    }
684    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
685    execOperation(new RegionObserverOperationWithoutResult() {
686      @Override
687      public void call(RegionObserver observer) throws IOException {
688        observer.preFlushScannerOpen(this, store, builder, tracker);
689      }
690    });
691    return builder.build();
692  }
693
694  /**
695   * Invoked before a memstore flush
696   * @return Scanner to use (cannot be null!)
697   * @throws IOException
698   */
699  public InternalScanner preFlush(HStore store, InternalScanner scanner,
700      FlushLifeCycleTracker tracker) throws IOException {
701    if (coprocEnvironments.isEmpty()) {
702      return scanner;
703    }
704    return execOperationWithResult(
705        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
706          @Override
707          public InternalScanner call(RegionObserver observer) throws IOException {
708            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
709            if (scanner == null) {
710              throw new CoprocessorException("Null Scanner return disallowed!");
711            }
712            return scanner;
713          }
714        });
715  }
716
717  /**
718   * Invoked before a memstore flush
719   * @throws IOException
720   */
721  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
722    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
723      @Override
724      public void call(RegionObserver observer) throws IOException {
725        observer.preFlush(this, tracker);
726      }
727    });
728  }
729
730  /**
731   * Invoked after a memstore flush
732   * @throws IOException
733   */
734  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
735    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
736      @Override
737      public void call(RegionObserver observer) throws IOException {
738        observer.postFlush(this, tracker);
739      }
740    });
741  }
742
743  /**
744   * Invoked before in memory compaction.
745   */
746  public void preMemStoreCompaction(HStore store) throws IOException {
747    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
748      @Override
749      public void call(RegionObserver observer) throws IOException {
750        observer.preMemStoreCompaction(this, store);
751      }
752    });
753  }
754
755  /**
756   * Invoked before create StoreScanner for in memory compaction.
757   */
758  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
759    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
760    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
761      @Override
762      public void call(RegionObserver observer) throws IOException {
763        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
764      }
765    });
766    return builder.build();
767  }
768
769  /**
770   * Invoked before compacting memstore.
771   */
772  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
773      throws IOException {
774    if (coprocEnvironments.isEmpty()) {
775      return scanner;
776    }
777    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
778        regionObserverGetter, scanner) {
779      @Override
780      public InternalScanner call(RegionObserver observer) throws IOException {
781        return observer.preMemStoreCompactionCompact(this, store, getResult());
782      }
783    });
784  }
785
786  /**
787   * Invoked after in memory compaction.
788   */
789  public void postMemStoreCompaction(HStore store) throws IOException {
790    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
791      @Override
792      public void call(RegionObserver observer) throws IOException {
793        observer.postMemStoreCompaction(this, store);
794      }
795    });
796  }
797
798  /**
799   * Invoked after a memstore flush
800   * @throws IOException
801   */
802  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
803      throws IOException {
804    if (coprocEnvironments.isEmpty()) {
805      return;
806    }
807    execOperation(new RegionObserverOperationWithoutResult() {
808      @Override
809      public void call(RegionObserver observer) throws IOException {
810        observer.postFlush(this, store, storeFile, tracker);
811      }
812    });
813  }
814
815  // RegionObserver support
816  /**
817   * Supports Coprocessor 'bypass'.
818   * @param get the Get request
819   * @param results What to return if return is true/'bypass'.
820   * @return true if default processing should be bypassed.
821   * @exception IOException Exception
822   */
823  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
824    if (coprocEnvironments.isEmpty()) {
825      return false;
826    }
827    boolean bypassable = true;
828    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
829      @Override
830      public void call(RegionObserver observer) throws IOException {
831        observer.preGetOp(this, get, results);
832      }
833    });
834  }
835
836  /**
837   * @param get the Get request
838   * @param results the result set
839   * @exception IOException Exception
840   */
841  public void postGet(final Get get, final List<Cell> results)
842      throws IOException {
843    if (coprocEnvironments.isEmpty()) {
844      return;
845    }
846    execOperation(new RegionObserverOperationWithoutResult() {
847      @Override
848      public void call(RegionObserver observer) throws IOException {
849        observer.postGetOp(this, get, results);
850      }
851    });
852  }
853
854  /**
855   * Supports Coprocessor 'bypass'.
856   * @param get the Get request
857   * @return true or false to return to client if bypassing normal operation, or null otherwise
858   * @exception IOException Exception
859   */
860  public Boolean preExists(final Get get) throws IOException {
861    boolean bypassable = true;
862    boolean defaultResult = false;
863    if (coprocEnvironments.isEmpty()) {
864      return null;
865    }
866    return execOperationWithResult(
867        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
868            defaultResult, bypassable) {
869          @Override
870          public Boolean call(RegionObserver observer) throws IOException {
871            return observer.preExists(this, get, getResult());
872          }
873        });
874  }
875
876  /**
877   * @param get the Get request
878   * @param result the result returned by the region server
879   * @return the result to return to the client
880   * @exception IOException Exception
881   */
882  public boolean postExists(final Get get, boolean result)
883      throws IOException {
884    if (this.coprocEnvironments.isEmpty()) {
885      return result;
886    }
887    return execOperationWithResult(
888        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
889          @Override
890          public Boolean call(RegionObserver observer) throws IOException {
891            return observer.postExists(this, get, getResult());
892          }
893        });
894  }
895
896  /**
897   * Supports Coprocessor 'bypass'.
898   * @param put The Put object
899   * @param edit The WALEdit object.
900   * @param durability The durability used
901   * @return true if default processing should be bypassed
902   * @exception IOException Exception
903   */
904  public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
905      throws IOException {
906    if (coprocEnvironments.isEmpty()) {
907      return false;
908    }
909    boolean bypassable = true;
910    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
911      @Override
912      public void call(RegionObserver observer) throws IOException {
913        observer.prePut(this, put, edit, durability);
914      }
915    });
916  }
917
918  /**
919   * Supports Coprocessor 'bypass'.
920   * @param mutation - the current mutation
921   * @param kv - the current cell
922   * @param byteNow - current timestamp in bytes
923   * @param get - the get that could be used
924   * Note that the get only does not specify the family and qualifier that should be used
925   * @return true if default processing should be bypassed
926   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
927   * Coprocessor for its needs only. Will be removed.
928   */
929  @Deprecated
930  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
931      final Cell kv, final byte[] byteNow, final Get get) throws IOException {
932    if (coprocEnvironments.isEmpty()) {
933      return false;
934    }
935    boolean bypassable = true;
936    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
937      @Override
938      public void call(RegionObserver observer) throws IOException {
939          observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
940      }
941    });
942  }
943
944  /**
945   * @param put The Put object
946   * @param edit The WALEdit object.
947   * @param durability The durability used
948   * @exception IOException Exception
949   */
950  public void postPut(final Put put, final WALEdit edit, final Durability durability)
951      throws IOException {
952    if (coprocEnvironments.isEmpty()) {
953      return;
954    }
955    execOperation(new RegionObserverOperationWithoutResult() {
956      @Override
957      public void call(RegionObserver observer) throws IOException {
958        observer.postPut(this, put, edit, durability);
959      }
960    });
961  }
962
963  /**
964   * Supports Coprocessor 'bypass'.
965   * @param delete The Delete object
966   * @param edit The WALEdit object.
967   * @param durability The durability used
968   * @return true if default processing should be bypassed
969   * @exception IOException Exception
970   */
971  public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
972      throws IOException {
973    if (this.coprocEnvironments.isEmpty()) {
974      return false;
975    }
976    boolean bypassable = true;
977    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
978      @Override
979      public void call(RegionObserver observer) throws IOException {
980         observer.preDelete(this, delete, edit, durability);
981      }
982    });
983  }
984
985  /**
986   * @param delete The Delete object
987   * @param edit The WALEdit object.
988   * @param durability The durability used
989   * @exception IOException Exception
990   */
991  public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
992      throws IOException {
993    execOperation(coprocEnvironments.isEmpty()? null:
994        new RegionObserverOperationWithoutResult() {
995      @Override
996      public void call(RegionObserver observer) throws IOException {
997        observer.postDelete(this, delete, edit, durability);
998      }
999    });
1000  }
1001
1002  public void preBatchMutate(
1003      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1004    if(this.coprocEnvironments.isEmpty()) {
1005      return;
1006    }
1007    execOperation(new RegionObserverOperationWithoutResult() {
1008      @Override
1009      public void call(RegionObserver observer) throws IOException {
1010        observer.preBatchMutate(this, miniBatchOp);
1011      }
1012    });
1013  }
1014
1015  public void postBatchMutate(
1016      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1017    if (this.coprocEnvironments.isEmpty()) {
1018      return;
1019    }
1020    execOperation(new RegionObserverOperationWithoutResult() {
1021      @Override
1022      public void call(RegionObserver observer) throws IOException {
1023        observer.postBatchMutate(this, miniBatchOp);
1024      }
1025    });
1026  }
1027
1028  public void postBatchMutateIndispensably(
1029      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1030      throws IOException {
1031    if (this.coprocEnvironments.isEmpty()) {
1032      return;
1033    }
1034    execOperation(new RegionObserverOperationWithoutResult() {
1035      @Override
1036      public void call(RegionObserver observer) throws IOException {
1037        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1038      }
1039    });
1040  }
1041
1042  /**
1043   * Supports Coprocessor 'bypass'.
1044   * @param row row to check
1045   * @param family column family
1046   * @param qualifier column qualifier
1047   * @param op the comparison operation
1048   * @param comparator the comparator
1049   * @param put data to put if check succeeds
1050   * @return true or false to return to client if default processing should be bypassed, or null
1051   * otherwise
1052   */
1053  public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1054      final byte [] qualifier, final CompareOperator op,
1055      final ByteArrayComparable comparator, final Put put)
1056      throws IOException {
1057    boolean bypassable = true;
1058    boolean defaultResult = false;
1059    if (coprocEnvironments.isEmpty()) {
1060      return null;
1061    }
1062    return execOperationWithResult(
1063        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1064            defaultResult,  bypassable) {
1065          @Override
1066          public Boolean call(RegionObserver observer) throws IOException {
1067            return observer.preCheckAndPut(this, row, family, qualifier,
1068                op, comparator, put, getResult());
1069          }
1070        });
1071  }
1072
1073  /**
1074   * Supports Coprocessor 'bypass'.
1075   * @param row row to check
1076   * @param family column family
1077   * @param qualifier column qualifier
1078   * @param op the comparison operation
1079   * @param comparator the comparator
1080   * @param put data to put if check succeeds
1081   * @return true or false to return to client if default processing should be bypassed, or null
1082   * otherwise
1083   */
1084  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
1085      justification="Null is legit")
1086  public Boolean preCheckAndPutAfterRowLock(
1087      final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op,
1088      final ByteArrayComparable comparator, final Put put) throws IOException {
1089    boolean bypassable = true;
1090    boolean defaultResult = false;
1091    if (coprocEnvironments.isEmpty()) {
1092      return null;
1093    }
1094    return execOperationWithResult(
1095        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1096            defaultResult, bypassable) {
1097          @Override
1098          public Boolean call(RegionObserver observer) throws IOException {
1099            return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier,
1100                op, comparator, put, getResult());
1101          }
1102        });
1103  }
1104
1105  /**
1106   * @param row row to check
1107   * @param family column family
1108   * @param qualifier column qualifier
1109   * @param op the comparison operation
1110   * @param comparator the comparator
1111   * @param put data to put if check succeeds
1112   * @throws IOException e
1113   */
1114  public boolean postCheckAndPut(final byte [] row, final byte [] family,
1115      final byte [] qualifier, final CompareOperator op,
1116      final ByteArrayComparable comparator, final Put put,
1117      boolean result) throws IOException {
1118    if (this.coprocEnvironments.isEmpty()) {
1119      return result;
1120    }
1121    return execOperationWithResult(
1122        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
1123          @Override
1124          public Boolean call(RegionObserver observer) throws IOException {
1125            return observer.postCheckAndPut(this, row, family, qualifier,
1126                op, comparator, put, getResult());
1127          }
1128        });
1129  }
1130
1131  /**
1132   * Supports Coprocessor 'bypass'.
1133   * @param row row to check
1134   * @param family column family
1135   * @param qualifier column qualifier
1136   * @param op the comparison operation
1137   * @param comparator the comparator
1138   * @param delete delete to commit if check succeeds
1139   * @return true or false to return to client if default processing should be bypassed,
1140   * or null otherwise
1141   */
1142  public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1143      final byte [] qualifier, final CompareOperator op,
1144      final ByteArrayComparable comparator, final Delete delete)
1145      throws IOException {
1146    boolean bypassable = true;
1147    boolean defaultResult = false;
1148    if (coprocEnvironments.isEmpty()) {
1149      return null;
1150    }
1151    return execOperationWithResult(
1152        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1153            defaultResult, bypassable) {
1154          @Override
1155          public Boolean call(RegionObserver observer) throws IOException {
1156            return observer.preCheckAndDelete(this, row, family,
1157                qualifier, op, comparator, delete, getResult());
1158          }
1159        });
1160  }
1161
1162  /**
1163   * Supports Coprocessor 'bypass'.
1164   * @param row row to check
1165   * @param family column family
1166   * @param qualifier column qualifier
1167   * @param op the comparison operation
1168   * @param comparator the comparator
1169   * @param delete delete to commit if check succeeds
1170   * @return true or false to return to client if default processing should be bypassed,
1171   * or null otherwise
1172   */
1173  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
1174      justification="Null is legit")
1175  public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1176      final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
1177      final Delete delete) throws IOException {
1178    boolean bypassable = true;
1179    boolean defaultResult = false;
1180    if (coprocEnvironments.isEmpty()) {
1181      return null;
1182    }
1183    return execOperationWithResult(
1184        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1185            defaultResult, bypassable) {
1186          @Override
1187          public Boolean call(RegionObserver observer) throws IOException {
1188            return observer.preCheckAndDeleteAfterRowLock(this, row,
1189                family, qualifier, op, comparator, delete, getResult());
1190          }
1191        });
1192  }
1193
1194  /**
1195   * @param row row to check
1196   * @param family column family
1197   * @param qualifier column qualifier
1198   * @param op the comparison operation
1199   * @param comparator the comparator
1200   * @param delete delete to commit if check succeeds
1201   * @throws IOException e
1202   */
1203  public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1204      final byte [] qualifier, final CompareOperator op,
1205      final ByteArrayComparable comparator, final Delete delete,
1206      boolean result) throws IOException {
1207    if (this.coprocEnvironments.isEmpty()) {
1208      return result;
1209    }
1210    return execOperationWithResult(
1211        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
1212          @Override
1213          public Boolean call(RegionObserver observer) throws IOException {
1214            return observer.postCheckAndDelete(this, row, family,
1215                qualifier, op, comparator, delete, getResult());
1216          }
1217        });
1218  }
1219
1220  /**
1221   * Supports Coprocessor 'bypass'.
1222   * @param append append object
1223   * @return result to return to client if default operation should be bypassed, null otherwise
1224   * @throws IOException if an error occurred on the coprocessor
1225   */
1226  public Result preAppend(final Append append) throws IOException {
1227    boolean bypassable = true;
1228    Result defaultResult = null;
1229    if (this.coprocEnvironments.isEmpty()) {
1230      return defaultResult;
1231    }
1232    return execOperationWithResult(
1233      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1234            bypassable) {
1235          @Override
1236          public Result call(RegionObserver observer) throws IOException {
1237            return observer.preAppend(this, append);
1238          }
1239        });
1240  }
1241
1242  /**
1243   * Supports Coprocessor 'bypass'.
1244   * @param append append object
1245   * @return result to return to client if default operation should be bypassed, null otherwise
1246   * @throws IOException if an error occurred on the coprocessor
1247   */
1248  public Result preAppendAfterRowLock(final Append append) throws IOException {
1249    boolean bypassable = true;
1250    Result defaultResult = null;
1251    if (this.coprocEnvironments.isEmpty()) {
1252      return defaultResult;
1253    }
1254    return execOperationWithResult(
1255        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter,
1256            defaultResult, bypassable) {
1257          @Override
1258          public Result call(RegionObserver observer) throws IOException {
1259            return observer.preAppendAfterRowLock(this, append);
1260          }
1261        });
1262  }
1263
1264  /**
1265   * Supports Coprocessor 'bypass'.
1266   * @param increment increment object
1267   * @return result to return to client if default operation should be bypassed, null otherwise
1268   * @throws IOException if an error occurred on the coprocessor
1269   */
1270  public Result preIncrement(final Increment increment) throws IOException {
1271    boolean bypassable = true;
1272    Result defaultResult = null;
1273    if (coprocEnvironments.isEmpty()) {
1274      return defaultResult;
1275    }
1276    return execOperationWithResult(
1277        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1278            bypassable) {
1279          @Override
1280          public Result call(RegionObserver observer) throws IOException {
1281            return observer.preIncrement(this, increment);
1282          }
1283        });
1284  }
1285
1286  /**
1287   * Supports Coprocessor 'bypass'.
1288   * @param increment increment object
1289   * @return result to return to client if default operation should be bypassed, null otherwise
1290   * @throws IOException if an error occurred on the coprocessor
1291   */
1292  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1293    boolean bypassable = true;
1294    Result defaultResult = null;
1295    if (coprocEnvironments.isEmpty()) {
1296      return defaultResult;
1297    }
1298    return execOperationWithResult(
1299        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1300            bypassable) {
1301          @Override
1302          public Result call(RegionObserver observer) throws IOException {
1303            return observer.preIncrementAfterRowLock(this, increment);
1304          }
1305        });
1306  }
1307
1308  /**
1309   * @param append Append object
1310   * @param result the result returned by the append
1311   * @throws IOException if an error occurred on the coprocessor
1312   */
1313  public Result postAppend(final Append append, final Result result) throws IOException {
1314    if (this.coprocEnvironments.isEmpty()) {
1315      return result;
1316    }
1317    return execOperationWithResult(
1318        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1319          @Override
1320          public Result call(RegionObserver observer) throws IOException {
1321            return observer.postAppend(this, append, result);
1322          }
1323        });
1324  }
1325
1326  /**
1327   * @param increment increment object
1328   * @param result the result returned by postIncrement
1329   * @throws IOException if an error occurred on the coprocessor
1330   */
1331  public Result postIncrement(final Increment increment, Result result) throws IOException {
1332    if (this.coprocEnvironments.isEmpty()) {
1333      return result;
1334    }
1335    return execOperationWithResult(
1336        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1337          @Override
1338          public Result call(RegionObserver observer) throws IOException {
1339            return observer.postIncrement(this, increment, getResult());
1340          }
1341        });
1342  }
1343
1344  /**
1345   * @param scan the Scan specification
1346   * @exception IOException Exception
1347   */
1348  public void preScannerOpen(final Scan scan) throws IOException {
1349    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1350      @Override
1351      public void call(RegionObserver observer) throws IOException {
1352        observer.preScannerOpen(this, scan);
1353      }
1354    });
1355  }
1356
1357  /**
1358   * @param scan the Scan specification
1359   * @param s the scanner
1360   * @return the scanner instance to use
1361   * @exception IOException Exception
1362   */
1363  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1364    if (this.coprocEnvironments.isEmpty()) {
1365      return s;
1366    }
1367    return execOperationWithResult(
1368        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1369          @Override
1370          public RegionScanner call(RegionObserver observer) throws IOException {
1371            return observer.postScannerOpen(this, scan, getResult());
1372          }
1373        });
1374  }
1375
1376  /**
1377   * @param s the scanner
1378   * @param results the result set returned by the region server
1379   * @param limit the maximum number of results to return
1380   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1381   * @exception IOException Exception
1382   */
1383  public Boolean preScannerNext(final InternalScanner s,
1384      final List<Result> results, final int limit) throws IOException {
1385    boolean bypassable = true;
1386    boolean defaultResult = false;
1387    if (coprocEnvironments.isEmpty()) {
1388      return null;
1389    }
1390    return execOperationWithResult(
1391        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1392            defaultResult, bypassable) {
1393          @Override
1394          public Boolean call(RegionObserver observer) throws IOException {
1395            return observer.preScannerNext(this, s, results, limit, getResult());
1396          }
1397        });
1398  }
1399
1400  /**
1401   * @param s the scanner
1402   * @param results the result set returned by the region server
1403   * @param limit the maximum number of results to return
1404   * @param hasMore
1405   * @return 'has more' indication to give to client
1406   * @exception IOException Exception
1407   */
1408  public boolean postScannerNext(final InternalScanner s,
1409      final List<Result> results, final int limit, boolean hasMore)
1410      throws IOException {
1411    if (this.coprocEnvironments.isEmpty()) {
1412      return hasMore;
1413    }
1414    return execOperationWithResult(
1415        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1416          @Override
1417          public Boolean call(RegionObserver observer) throws IOException {
1418            return observer.postScannerNext(this, s, results, limit, getResult());
1419          }
1420        });
1421  }
1422
1423  /**
1424   * This will be called by the scan flow when the current scanned row is being filtered out by the
1425   * filter.
1426   * @param s the scanner
1427   * @param curRowCell The cell in the current row which got filtered out
1428   * @return whether more rows are available for the scanner or not
1429   * @throws IOException
1430   */
1431  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1432      throws IOException {
1433    // short circuit for performance
1434    boolean defaultResult = true;
1435    if (!hasCustomPostScannerFilterRow) {
1436      return defaultResult;
1437    }
1438    if (this.coprocEnvironments.isEmpty()) {
1439      return defaultResult;
1440    }
1441    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1442        regionObserverGetter, defaultResult) {
1443      @Override
1444      public Boolean call(RegionObserver observer) throws IOException {
1445        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1446      }
1447    });
1448  }
1449
1450  /**
1451   * Supports Coprocessor 'bypass'.
1452   * @param s the scanner
1453   * @return true if default behavior should be bypassed, false otherwise
1454   * @exception IOException Exception
1455   */
1456  // Should this be bypassable?
1457  public boolean preScannerClose(final InternalScanner s) throws IOException {
1458    return execOperation(coprocEnvironments.isEmpty()? null:
1459        new RegionObserverOperationWithoutResult(true) {
1460      @Override
1461      public void call(RegionObserver observer) throws IOException {
1462        observer.preScannerClose(this, s);
1463      }
1464    });
1465  }
1466
1467  /**
1468   * @exception IOException Exception
1469   */
1470  public void postScannerClose(final InternalScanner s) throws IOException {
1471    execOperation(coprocEnvironments.isEmpty()? null:
1472        new RegionObserverOperationWithoutResult() {
1473      @Override
1474      public void call(RegionObserver observer) throws IOException {
1475        observer.postScannerClose(this, s);
1476      }
1477    });
1478  }
1479
1480  /**
1481   * Called before open store scanner for user scan.
1482   */
1483  public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
1484    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1485    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
1486    execOperation(new RegionObserverOperationWithoutResult() {
1487      @Override
1488      public void call(RegionObserver observer) throws IOException {
1489        observer.preStoreScannerOpen(this, store, builder);
1490      }
1491    });
1492    return builder.build();
1493  }
1494
1495  /**
1496   * @param info the RegionInfo for this region
1497   * @param edits the file of recovered edits
1498   */
1499  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1500    execOperation(coprocEnvironments.isEmpty()? null:
1501        new RegionObserverOperationWithoutResult(true) {
1502      @Override
1503      public void call(RegionObserver observer) throws IOException {
1504        observer.preReplayWALs(this, info, edits);
1505      }
1506    });
1507  }
1508
1509  /**
1510   * @param info the RegionInfo for this region
1511   * @param edits the file of recovered edits
1512   * @throws IOException Exception
1513   */
1514  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1515    execOperation(coprocEnvironments.isEmpty()? null:
1516        new RegionObserverOperationWithoutResult() {
1517      @Override
1518      public void call(RegionObserver observer) throws IOException {
1519        observer.postReplayWALs(this, info, edits);
1520      }
1521    });
1522  }
1523
1524  /**
1525   * Supports Coprocessor 'bypass'.
1526   * @return true if default behavior should be bypassed, false otherwise
1527   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1528   * with something that doesn't expose IntefaceAudience.Private classes.
1529   */
1530  @Deprecated
1531  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1532      throws IOException {
1533    return execOperation(coprocEnvironments.isEmpty()? null:
1534        new RegionObserverOperationWithoutResult(true) {
1535      @Override
1536      public void call(RegionObserver observer) throws IOException {
1537        observer.preWALRestore(this, info, logKey, logEdit);
1538      }
1539    });
1540  }
1541
1542  /**
1543   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1544   * with something that doesn't expose IntefaceAudience.Private classes.
1545   */
1546  @Deprecated
1547  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1548      throws IOException {
1549    execOperation(coprocEnvironments.isEmpty()? null:
1550        new RegionObserverOperationWithoutResult() {
1551      @Override
1552      public void call(RegionObserver observer) throws IOException {
1553        observer.postWALRestore(this, info, logKey, logEdit);
1554      }
1555    });
1556  }
1557
1558  /**
1559   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1560   */
1561  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1562    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1563      @Override
1564      public void call(RegionObserver observer) throws IOException {
1565        observer.preBulkLoadHFile(this, familyPaths);
1566      }
1567    });
1568  }
1569
1570  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1571      throws IOException {
1572    return execOperation(coprocEnvironments.isEmpty()? null:
1573        new RegionObserverOperationWithoutResult() {
1574      @Override
1575      public void call(RegionObserver observer) throws IOException {
1576        observer.preCommitStoreFile(this, family, pairs);
1577      }
1578    });
1579  }
1580
1581  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
1582    execOperation(coprocEnvironments.isEmpty()? null:
1583        new RegionObserverOperationWithoutResult() {
1584      @Override
1585      public void call(RegionObserver observer) throws IOException {
1586        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1587      }
1588    });
1589  }
1590
1591  /**
1592   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1593   * @param map Map of CF to List of file paths for the final loaded files
1594   * @throws IOException
1595   */
1596  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1597      Map<byte[], List<Path>> map) throws IOException {
1598    if (this.coprocEnvironments.isEmpty()) {
1599      return;
1600    }
1601    execOperation(coprocEnvironments.isEmpty()? null:
1602        new RegionObserverOperationWithoutResult() {
1603          @Override
1604          public void call(RegionObserver observer) throws IOException {
1605            observer.postBulkLoadHFile(this, familyPaths, map);
1606          }
1607        });
1608  }
1609
1610  public void postStartRegionOperation(final Operation op) throws IOException {
1611    execOperation(coprocEnvironments.isEmpty()? null:
1612        new RegionObserverOperationWithoutResult() {
1613      @Override
1614      public void call(RegionObserver observer) throws IOException {
1615        observer.postStartRegionOperation(this, op);
1616      }
1617    });
1618  }
1619
1620  public void postCloseRegionOperation(final Operation op) throws IOException {
1621    execOperation(coprocEnvironments.isEmpty()? null:
1622        new RegionObserverOperationWithoutResult() {
1623      @Override
1624      public void call(RegionObserver observer) throws IOException {
1625        observer.postCloseRegionOperation(this, op);
1626      }
1627    });
1628  }
1629
1630  /**
1631   * @param fs fileystem to read from
1632   * @param p path to the file
1633   * @param in {@link FSDataInputStreamWrapper}
1634   * @param size Full size of the file
1635   * @param cacheConf
1636   * @param r original reference file. This will be not null only when reading a split file.
1637   * @return a Reader instance to use instead of the base reader if overriding
1638   * default behavior, null otherwise
1639   * @throws IOException
1640   */
1641  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1642      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1643      final Reference r) throws IOException {
1644    if (coprocEnvironments.isEmpty()) {
1645      return null;
1646    }
1647    return execOperationWithResult(
1648        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1649          @Override
1650          public StoreFileReader call(RegionObserver observer) throws IOException {
1651            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1652                getResult());
1653          }
1654        });
1655  }
1656
1657  /**
1658   * @param fs fileystem to read from
1659   * @param p path to the file
1660   * @param in {@link FSDataInputStreamWrapper}
1661   * @param size Full size of the file
1662   * @param cacheConf
1663   * @param r original reference file. This will be not null only when reading a split file.
1664   * @param reader the base reader instance
1665   * @return The reader to use
1666   * @throws IOException
1667   */
1668  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1669      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1670      final Reference r, final StoreFileReader reader) throws IOException {
1671    if (this.coprocEnvironments.isEmpty()) {
1672      return reader;
1673    }
1674    return execOperationWithResult(
1675        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
1676          @Override
1677          public StoreFileReader call(RegionObserver observer) throws IOException {
1678            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1679                getResult());
1680          }
1681        });
1682  }
1683
1684  public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1685      final Cell oldCell, Cell newCell) throws IOException {
1686    if (this.coprocEnvironments.isEmpty()) {
1687      return newCell;
1688    }
1689    return execOperationWithResult(
1690        new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) {
1691          @Override
1692          public Cell call(RegionObserver observer) throws IOException {
1693            return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult());
1694          }
1695        });
1696  }
1697
1698  public Message preEndpointInvocation(final Service service, final String methodName,
1699      Message request) throws IOException {
1700    if (coprocEnvironments.isEmpty()) {
1701      return request;
1702    }
1703    return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver,
1704        Message>(endpointObserverGetter, request) {
1705      @Override
1706      public Message call(EndpointObserver observer) throws IOException {
1707        return observer.preEndpointInvocation(this, service, methodName, getResult());
1708      }
1709    });
1710  }
1711
1712  public void postEndpointInvocation(final Service service, final String methodName,
1713      final Message request, final Message.Builder responseBuilder) throws IOException {
1714    execOperation(coprocEnvironments.isEmpty() ? null :
1715        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1716          @Override
1717          public void call(EndpointObserver observer) throws IOException {
1718            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1719          }
1720        });
1721  }
1722
1723  /**
1724   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1725   */
1726  @Deprecated
1727  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1728    if (this.coprocEnvironments.isEmpty()) {
1729      return result;
1730    }
1731    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>(
1732        regionObserverGetter, result) {
1733      @Override
1734      public DeleteTracker call(RegionObserver observer) throws IOException {
1735        return observer.postInstantiateDeleteTracker(this, getResult());
1736      }
1737    });
1738  }
1739
1740  /////////////////////////////////////////////////////////////////////////////////////////////////
1741  // BulkLoadObserver hooks
1742  /////////////////////////////////////////////////////////////////////////////////////////////////
1743  public void prePrepareBulkLoad(User user) throws IOException {
1744    execOperation(coprocEnvironments.isEmpty() ? null :
1745        new BulkLoadObserverOperation(user) {
1746          @Override protected void call(BulkLoadObserver observer) throws IOException {
1747            observer.prePrepareBulkLoad(this);
1748          }
1749        });
1750  }
1751
1752  public void preCleanupBulkLoad(User user) throws IOException {
1753    execOperation(coprocEnvironments.isEmpty() ? null :
1754        new BulkLoadObserverOperation(user) {
1755          @Override protected void call(BulkLoadObserver observer) throws IOException {
1756            observer.preCleanupBulkLoad(this);
1757          }
1758        });
1759  }
1760}