001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.lang.reflect.InvocationTargetException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.UUID;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.RawCellBuilder;
037import org.apache.hadoop.hbase.RawCellBuilderFactory;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.client.Append;
040import org.apache.hadoop.hbase.client.CheckAndMutate;
041import org.apache.hadoop.hbase.client.CheckAndMutateResult;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Mutation;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.Result;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.SharedConnection;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
055import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
056import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
058import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
059import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
060import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
061import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
062import org.apache.hadoop.hbase.coprocessor.ObserverContext;
063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
065import org.apache.hadoop.hbase.coprocessor.RegionObserver;
066import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
067import org.apache.hadoop.hbase.io.Reference;
068import org.apache.hadoop.hbase.io.hfile.CacheConfig;
069import org.apache.hadoop.hbase.metrics.MetricRegistry;
070import org.apache.hadoop.hbase.regionserver.Region.Operation;
071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
073import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
074import org.apache.hadoop.hbase.security.User;
075import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
076import org.apache.hadoop.hbase.util.Pair;
077import org.apache.hadoop.hbase.wal.WALEdit;
078import org.apache.hadoop.hbase.wal.WALKey;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.protobuf.Message;
084import org.apache.hbase.thirdparty.com.google.protobuf.Service;
085import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
086import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;
087
088/**
089 * Implements the coprocessor environment and runtime support for coprocessors
090 * loaded within a {@link Region}.
091 */
092@InterfaceAudience.Private
093public class RegionCoprocessorHost
094    extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> {
095
  private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class);
  // Per-coprocessor-class shared state, keyed by the coprocessor class name (see
  // createEnvironment). Keys are held with HARD references and values with WEAK references, so
  // an entry stays alive only while at least one environment still references its value map.
  // Mutations are made while synchronizing on the map itself.
  private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP =
      new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD,
          AbstractReferenceMap.ReferenceStrength.WEAK);

  // optimization: no need to call postScannerFilterRow, if no coprocessor implements it.
  // Computed once in the constructor, after all coprocessors have been loaded.
  private final boolean hasCustomPostScannerFilterRow;
104
105  /**
106   *
107   * Encapsulation of the environment of each coprocessor
108   */
109  private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor>
110      implements RegionCoprocessorEnvironment {
111    private Region region;
112    ConcurrentMap<String, Object> sharedData;
113    private final MetricRegistry metricRegistry;
114    private final RegionServerServices services;
115
116    /**
117     * Constructor
118     * @param impl the coprocessor instance
119     * @param priority chaining priority
120     */
121    public RegionEnvironment(final RegionCoprocessor impl, final int priority,
122        final int seq, final Configuration conf, final Region region,
123        final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
124      super(impl, priority, seq, conf);
125      this.region = region;
126      this.sharedData = sharedData;
127      this.services = services;
128      this.metricRegistry =
129          MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
130    }
131
132    /** @return the region */
133    @Override
134    public Region getRegion() {
135      return region;
136    }
137
138    @Override
139    public OnlineRegions getOnlineRegions() {
140      return this.services;
141    }
142
143    @Override
144    public Connection getConnection() {
145      // Mocks may have services as null at test time.
146      return services != null ? new SharedConnection(services.getConnection()) : null;
147    }
148
149    @Override
150    public Connection createConnection(Configuration conf) throws IOException {
151      return services != null ? this.services.createConnection(conf) : null;
152    }
153
154    @Override
155    public ServerName getServerName() {
156      return services != null? services.getServerName(): null;
157    }
158
159    @Override
160    public void shutdown() {
161      super.shutdown();
162      MetricsCoprocessor.removeRegistry(this.metricRegistry);
163    }
164
165    @Override
166    public ConcurrentMap<String, Object> getSharedData() {
167      return sharedData;
168    }
169
170    @Override
171    public RegionInfo getRegionInfo() {
172      return region.getRegionInfo();
173    }
174
175    @Override
176    public MetricRegistry getMetricRegistryForRegionServer() {
177      return metricRegistry;
178    }
179
180    @Override
181    public RawCellBuilder getCellBuilder() {
182      // We always do a DEEP_COPY only
183      return RawCellBuilderFactory.create();
184    }
185  }
186
187  /**
188   * Special version of RegionEnvironment that exposes RegionServerServices for Core
189   * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core.
190   */
191  private static class RegionEnvironmentForCoreCoprocessors extends
192      RegionEnvironment implements HasRegionServerServices {
193    private final RegionServerServices rsServices;
194
195    public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority,
196      final int seq, final Configuration conf, final Region region,
197      final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
198      super(impl, priority, seq, conf, region, services, sharedData);
199      this.rsServices = services;
200    }
201
202    /**
203     * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
204     * consumption.
205     */
206    @Override
207    public RegionServerServices getRegionServerServices() {
208      return this.rsServices;
209    }
210  }
211
212  static class TableCoprocessorAttribute {
213    private Path path;
214    private String className;
215    private int priority;
216    private Configuration conf;
217
218    public TableCoprocessorAttribute(Path path, String className, int priority,
219        Configuration conf) {
220      this.path = path;
221      this.className = className;
222      this.priority = priority;
223      this.conf = conf;
224    }
225
226    public Path getPath() {
227      return path;
228    }
229
230    public String getClassName() {
231      return className;
232    }
233
234    public int getPriority() {
235      return priority;
236    }
237
238    public Configuration getConf() {
239      return conf;
240    }
241  }
242
  /**
   * The region server services supplied to the constructor; passed on to every environment
   * created by {@code createEnvironment}.
   */
  RegionServerServices rsServices;
  /** The region this host manages coprocessors for. */
  HRegion region;
247
248  /**
249   * Constructor
250   * @param region the region
251   * @param rsServices interface to available region server functionality
252   * @param conf the configuration
253   */
254  public RegionCoprocessorHost(final HRegion region,
255      final RegionServerServices rsServices, final Configuration conf) {
256    super(rsServices);
257    this.conf = conf;
258    this.rsServices = rsServices;
259    this.region = region;
260    this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
261
262    // load system default cp's from configuration.
263    loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
264
265    // load system default cp's for user tables from configuration.
266    if (!region.getRegionInfo().getTable().isSystemTable()) {
267      loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
268    }
269
270    // load Coprocessor From HDFS
271    loadTableCoprocessors(conf);
272
273    // now check whether any coprocessor implements postScannerFilterRow
274    boolean hasCustomPostScannerFilterRow = false;
275    out: for (RegionCoprocessorEnvironment env: coprocEnvironments) {
276      if (env.getInstance() instanceof RegionObserver) {
277        Class<?> clazz = env.getInstance().getClass();
278        for(;;) {
279          if (clazz == null) {
280            // we must have directly implemented RegionObserver
281            hasCustomPostScannerFilterRow = true;
282            break out;
283          }
284          try {
285            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
286              InternalScanner.class, Cell.class, boolean.class);
287            // this coprocessor has a custom version of postScannerFilterRow
288            hasCustomPostScannerFilterRow = true;
289            break out;
290          } catch (NoSuchMethodException ignore) {
291          }
292          // the deprecated signature still exists
293          try {
294            clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class,
295              InternalScanner.class, byte[].class, int.class, short.class, boolean.class);
296            // this coprocessor has a custom version of postScannerFilterRow
297            hasCustomPostScannerFilterRow = true;
298            break out;
299          } catch (NoSuchMethodException ignore) {
300          }
301          clazz = clazz.getSuperclass();
302        }
303      }
304    }
305    this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow;
306  }
307
308  static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
309      TableDescriptor htd) {
310    return htd.getCoprocessorDescriptors().stream().map(cp -> {
311      Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null);
312      Configuration ourConf;
313      if (!cp.getProperties().isEmpty()) {
314        // do an explicit deep copy of the passed configuration
315        ourConf = new Configuration(false);
316        HBaseConfiguration.merge(ourConf, conf);
317        cp.getProperties().forEach((k, v) -> ourConf.set(k, v));
318      } else {
319        ourConf = conf;
320      }
321      return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf);
322    }).collect(Collectors.toList());
323  }
324
325  /**
326   * Sanity check the table coprocessor attributes of the supplied schema. Will
327   * throw an exception if there is a problem.
328   * @param conf
329   * @param htd
330   * @throws IOException
331   */
332  public static void testTableCoprocessorAttrs(final Configuration conf,
333      final TableDescriptor htd) throws IOException {
334    String pathPrefix = UUID.randomUUID().toString();
335    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
336      if (attr.getPriority() < 0) {
337        throw new IOException("Priority for coprocessor " + attr.getClassName() +
338          " cannot be less than 0");
339      }
340      ClassLoader old = Thread.currentThread().getContextClassLoader();
341      try {
342        ClassLoader cl;
343        if (attr.getPath() != null) {
344          cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
345            CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
346        } else {
347          cl = CoprocessorHost.class.getClassLoader();
348        }
349        Thread.currentThread().setContextClassLoader(cl);
350        if (cl instanceof CoprocessorClassLoader) {
351          String[] includedClassPrefixes = null;
352          if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) {
353            String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY);
354            includedClassPrefixes = prefixes.split(";");
355          }
356          ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes);
357        } else {
358          cl.loadClass(attr.getClassName());
359        }
360      } catch (ClassNotFoundException e) {
361        throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
362      } finally {
363        Thread.currentThread().setContextClassLoader(old);
364      }
365    }
366  }
367
368  void loadTableCoprocessors(final Configuration conf) {
369    boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
370      DEFAULT_COPROCESSORS_ENABLED);
371    boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
372      DEFAULT_USER_COPROCESSORS_ENABLED);
373    if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
374      return;
375    }
376
377    // scan the table attributes for coprocessor load specifications
378    // initialize the coprocessors
379    List<RegionCoprocessorEnvironment> configured = new ArrayList<>();
380    for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
381        region.getTableDescriptor())) {
382      // Load encompasses classloading and coprocessor initialization
383      try {
384        RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(),
385            attr.getPriority(), attr.getConf());
386        if (env == null) {
387          continue;
388        }
389        configured.add(env);
390        LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
391            region.getTableDescriptor().getTableName().getNameAsString() + " successfully.");
392      } catch (Throwable t) {
393        // Coprocessor failed to load, do we abort on error?
394        if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
395          abortServer(attr.getClassName(), t);
396        } else {
397          LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
398        }
399      }
400    }
401    // add together to coprocessor set for COW efficiency
402    coprocEnvironments.addAll(configured);
403  }
404
405  @Override
406  public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq,
407      Configuration conf) {
408    // If coprocessor exposes any services, register them.
409    for (Service service : instance.getServices()) {
410      region.registerService(service);
411    }
412    ConcurrentMap<String, Object> classData;
413    // make sure only one thread can add maps
414    synchronized (SHARED_DATA_MAP) {
415      // as long as at least one RegionEnvironment holds on to its classData it will
416      // remain in this map
417      classData =
418          SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(),
419              k -> new ConcurrentHashMap<>());
420    }
421    // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices.
422    return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)?
423        new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region,
424            rsServices, classData):
425        new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData);
426  }
427
428  @Override
429  public RegionCoprocessor checkAndGetInstance(Class<?> implClass)
430      throws InstantiationException, IllegalAccessException {
431    try {
432      if (RegionCoprocessor.class.isAssignableFrom(implClass)) {
433        return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance();
434      } else {
435        LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}",
436            implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
437        return null;
438      }
439    } catch (NoSuchMethodException | InvocationTargetException e) {
440      throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e);
441    }
442  }
443
  // Extracts the RegionObserver from a RegionCoprocessor; shared by the region observer
  // operations defined in this class.
  private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter =
      RegionCoprocessor::getRegionObserver;

  // Extracts the EndpointObserver from a RegionCoprocessor.
  private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter =
      RegionCoprocessor::getEndpointObserver;
449
450  abstract class RegionObserverOperationWithoutResult extends
451      ObserverOperationWithoutResult<RegionObserver> {
452    public RegionObserverOperationWithoutResult() {
453      super(regionObserverGetter);
454    }
455
456    public RegionObserverOperationWithoutResult(User user) {
457      super(regionObserverGetter, user);
458    }
459
460    public RegionObserverOperationWithoutResult(boolean bypassable) {
461      super(regionObserverGetter, null, bypassable);
462    }
463
464    public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
465      super(regionObserverGetter, user, bypassable);
466    }
467  }
468
469  abstract class BulkLoadObserverOperation extends
470      ObserverOperationWithoutResult<BulkLoadObserver> {
471    public BulkLoadObserverOperation(User user) {
472      super(RegionCoprocessor::getBulkLoadObserver, user);
473    }
474  }
475
476
  //////////////////////////////////////////////////////////////////////////////////////////////////
  // Observer operations
  //////////////////////////////////////////////////////////////////////////////////////////////////
484
485  /**
486   * Invoked before a region open.
487   *
488   * @throws IOException Signals that an I/O exception has occurred.
489   */
490  public void preOpen() throws IOException {
491    if (coprocEnvironments.isEmpty()) {
492      return;
493    }
494    execOperation(new RegionObserverOperationWithoutResult() {
495      @Override
496      public void call(RegionObserver observer) throws IOException {
497        observer.preOpen(this);
498      }
499    });
500  }
501
502
503  /**
504   * Invoked after a region open
505   */
506  public void postOpen() {
507    if (coprocEnvironments.isEmpty()) {
508      return;
509    }
510    try {
511      execOperation(new RegionObserverOperationWithoutResult() {
512        @Override
513        public void call(RegionObserver observer) throws IOException {
514          observer.postOpen(this);
515        }
516      });
517    } catch (IOException e) {
518      LOG.warn(e.toString(), e);
519    }
520  }
521
522  /**
523   * Invoked before a region is closed
524   * @param abortRequested true if the server is aborting
525   */
526  public void preClose(final boolean abortRequested) throws IOException {
527    execOperation(new RegionObserverOperationWithoutResult() {
528      @Override
529      public void call(RegionObserver observer) throws IOException {
530        observer.preClose(this, abortRequested);
531      }
532    });
533  }
534
535  /**
536   * Invoked after a region is closed
537   * @param abortRequested true if the server is aborting
538   */
539  public void postClose(final boolean abortRequested) {
540    try {
541      execOperation(new RegionObserverOperationWithoutResult() {
542        @Override
543        public void call(RegionObserver observer) throws IOException {
544          observer.postClose(this, abortRequested);
545        }
546
547        @Override
548        public void postEnvCall() {
549          shutdown(this.getEnvironment());
550        }
551      });
552    } catch (IOException e) {
553      LOG.warn(e.toString(), e);
554    }
555  }
556
557  /**
558   * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
559   * available candidates.
560   * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed
561   * the passed in <code>candidates</code>.
562   * @param store The store where compaction is being requested
563   * @param candidates The currently available store files
564   * @param tracker used to track the life cycle of a compaction
565   * @param user the user
566   * @throws IOException
567   */
568  public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates,
569      final CompactionLifeCycleTracker tracker, final User user) throws IOException {
570    if (coprocEnvironments.isEmpty()) {
571      return false;
572    }
573    boolean bypassable = true;
574    return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) {
575      @Override
576      public void call(RegionObserver observer) throws IOException {
577        observer.preCompactSelection(this, store, candidates, tracker);
578      }
579    });
580  }
581
582  /**
583   * Called after the {@link HStoreFile}s to be compacted have been selected from the available
584   * candidates.
585   * @param store The store where compaction is being requested
586   * @param selected The store files selected to compact
587   * @param tracker used to track the life cycle of a compaction
588   * @param request the compaction request
589   * @param user the user
590   */
591  public void postCompactSelection(final HStore store, final List<HStoreFile> selected,
592      final CompactionLifeCycleTracker tracker, final CompactionRequest request,
593      final User user) throws IOException {
594    if (coprocEnvironments.isEmpty()) {
595      return;
596    }
597    execOperation(new RegionObserverOperationWithoutResult(user) {
598      @Override
599      public void call(RegionObserver observer) throws IOException {
600        observer.postCompactSelection(this, store, selected, tracker, request);
601      }
602    });
603  }
604
605  /**
606   * Called prior to opening store scanner for compaction.
607   */
608  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
609      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
610    if (coprocEnvironments.isEmpty()) {
611      return store.getScanInfo();
612    }
613    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
614    execOperation(new RegionObserverOperationWithoutResult(user) {
615      @Override
616      public void call(RegionObserver observer) throws IOException {
617        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
618      }
619    });
620    return builder.build();
621  }
622
623  /**
624   * Called prior to rewriting the store files selected for compaction
625   * @param store the store being compacted
626   * @param scanner the scanner used to read store data during compaction
627   * @param scanType type of Scan
628   * @param tracker used to track the life cycle of a compaction
629   * @param request the compaction request
630   * @param user the user
631   * @return Scanner to use (cannot be null!)
632   * @throws IOException
633   */
634  public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
635      final ScanType scanType, final CompactionLifeCycleTracker tracker,
636      final CompactionRequest request, final User user) throws IOException {
637    InternalScanner defaultResult = scanner;
638    if (coprocEnvironments.isEmpty()) {
639      return defaultResult;
640    }
641    return execOperationWithResult(
642        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter,
643            defaultResult, user) {
644          @Override
645          public InternalScanner call(RegionObserver observer) throws IOException {
646            InternalScanner scanner =
647                observer.preCompact(this, store, getResult(), scanType, tracker, request);
648            if (scanner == null) {
649              throw new CoprocessorException("Null Scanner return disallowed!");
650            }
651            return scanner;
652          }
653        });
654  }
655
656  /**
657   * Called after the store compaction has completed.
658   * @param store the store being compacted
659   * @param resultFile the new store file written during compaction
660   * @param tracker used to track the life cycle of a compaction
661   * @param request the compaction request
662   * @param user the user
663   * @throws IOException
664   */
665  public void postCompact(final HStore store, final HStoreFile resultFile,
666      final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user)
667      throws IOException {
668    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) {
669      @Override
670      public void call(RegionObserver observer) throws IOException {
671        observer.postCompact(this, store, resultFile, tracker, request);
672      }
673    });
674  }
675
676  /**
677   * Invoked before create StoreScanner for flush.
678   */
679  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
680      throws IOException {
681    if (coprocEnvironments.isEmpty()) {
682      return store.getScanInfo();
683    }
684    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
685    execOperation(new RegionObserverOperationWithoutResult() {
686      @Override
687      public void call(RegionObserver observer) throws IOException {
688        observer.preFlushScannerOpen(this, store, builder, tracker);
689      }
690    });
691    return builder.build();
692  }
693
694  /**
695   * Invoked before a memstore flush
696   * @return Scanner to use (cannot be null!)
697   * @throws IOException
698   */
699  public InternalScanner preFlush(HStore store, InternalScanner scanner,
700      FlushLifeCycleTracker tracker) throws IOException {
701    if (coprocEnvironments.isEmpty()) {
702      return scanner;
703    }
704    return execOperationWithResult(
705        new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
706          @Override
707          public InternalScanner call(RegionObserver observer) throws IOException {
708            InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
709            if (scanner == null) {
710              throw new CoprocessorException("Null Scanner return disallowed!");
711            }
712            return scanner;
713          }
714        });
715  }
716
717  /**
718   * Invoked before a memstore flush
719   * @throws IOException
720   */
721  public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
722    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
723      @Override
724      public void call(RegionObserver observer) throws IOException {
725        observer.preFlush(this, tracker);
726      }
727    });
728  }
729
730  /**
731   * Invoked after a memstore flush
732   * @throws IOException
733   */
734  public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
735    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
736      @Override
737      public void call(RegionObserver observer) throws IOException {
738        observer.postFlush(this, tracker);
739      }
740    });
741  }
742
743  /**
744   * Invoked before in memory compaction.
745   */
746  public void preMemStoreCompaction(HStore store) throws IOException {
747    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
748      @Override
749      public void call(RegionObserver observer) throws IOException {
750        observer.preMemStoreCompaction(this, store);
751      }
752    });
753  }
754
755  /**
756   * Invoked before create StoreScanner for in memory compaction.
757   */
758  public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
759    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
760    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
761      @Override
762      public void call(RegionObserver observer) throws IOException {
763        observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
764      }
765    });
766    return builder.build();
767  }
768
769  /**
770   * Invoked before compacting memstore.
771   */
772  public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
773      throws IOException {
774    if (coprocEnvironments.isEmpty()) {
775      return scanner;
776    }
777    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
778        regionObserverGetter, scanner) {
779      @Override
780      public InternalScanner call(RegionObserver observer) throws IOException {
781        return observer.preMemStoreCompactionCompact(this, store, getResult());
782      }
783    });
784  }
785
786  /**
787   * Invoked after in memory compaction.
788   */
789  public void postMemStoreCompaction(HStore store) throws IOException {
790    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
791      @Override
792      public void call(RegionObserver observer) throws IOException {
793        observer.postMemStoreCompaction(this, store);
794      }
795    });
796  }
797
798  /**
799   * Invoked after a memstore flush
800   * @throws IOException
801   */
802  public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
803      throws IOException {
804    if (coprocEnvironments.isEmpty()) {
805      return;
806    }
807    execOperation(new RegionObserverOperationWithoutResult() {
808      @Override
809      public void call(RegionObserver observer) throws IOException {
810        observer.postFlush(this, store, storeFile, tracker);
811      }
812    });
813  }
814
815  // RegionObserver support
816  /**
817   * Supports Coprocessor 'bypass'.
818   * @param get the Get request
819   * @param results What to return if return is true/'bypass'.
820   * @return true if default processing should be bypassed.
821   * @exception IOException Exception
822   */
823  public boolean preGet(final Get get, final List<Cell> results) throws IOException {
824    if (coprocEnvironments.isEmpty()) {
825      return false;
826    }
827    boolean bypassable = true;
828    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
829      @Override
830      public void call(RegionObserver observer) throws IOException {
831        observer.preGetOp(this, get, results);
832      }
833    });
834  }
835
836  /**
837   * @param get the Get request
838   * @param results the result set
839   * @exception IOException Exception
840   */
841  public void postGet(final Get get, final List<Cell> results)
842      throws IOException {
843    if (coprocEnvironments.isEmpty()) {
844      return;
845    }
846    execOperation(new RegionObserverOperationWithoutResult() {
847      @Override
848      public void call(RegionObserver observer) throws IOException {
849        observer.postGetOp(this, get, results);
850      }
851    });
852  }
853
854  /**
855   * Supports Coprocessor 'bypass'.
856   * @param get the Get request
857   * @return true or false to return to client if bypassing normal operation, or null otherwise
858   * @exception IOException Exception
859   */
860  public Boolean preExists(final Get get) throws IOException {
861    boolean bypassable = true;
862    boolean defaultResult = false;
863    if (coprocEnvironments.isEmpty()) {
864      return null;
865    }
866    return execOperationWithResult(
867        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
868            defaultResult, bypassable) {
869          @Override
870          public Boolean call(RegionObserver observer) throws IOException {
871            return observer.preExists(this, get, getResult());
872          }
873        });
874  }
875
876  /**
877   * @param get the Get request
878   * @param result the result returned by the region server
879   * @return the result to return to the client
880   * @exception IOException Exception
881   */
882  public boolean postExists(final Get get, boolean result)
883      throws IOException {
884    if (this.coprocEnvironments.isEmpty()) {
885      return result;
886    }
887    return execOperationWithResult(
888        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) {
889          @Override
890          public Boolean call(RegionObserver observer) throws IOException {
891            return observer.postExists(this, get, getResult());
892          }
893        });
894  }
895
896  /**
897   * Supports Coprocessor 'bypass'.
898   * @param put The Put object
899   * @param edit The WALEdit object.
900   * @param durability The durability used
901   * @return true if default processing should be bypassed
902   * @exception IOException Exception
903   */
904  public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
905      throws IOException {
906    if (coprocEnvironments.isEmpty()) {
907      return false;
908    }
909    boolean bypassable = true;
910    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
911      @Override
912      public void call(RegionObserver observer) throws IOException {
913        observer.prePut(this, put, edit, durability);
914      }
915    });
916  }
917
918  /**
919   * Supports Coprocessor 'bypass'.
920   * @param mutation - the current mutation
921   * @param kv - the current cell
922   * @param byteNow - current timestamp in bytes
923   * @param get - the get that could be used
924   * Note that the get only does not specify the family and qualifier that should be used
925   * @return true if default processing should be bypassed
926   * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single
927   * Coprocessor for its needs only. Will be removed.
928   */
929  @Deprecated
930  public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
931      final Cell kv, final byte[] byteNow, final Get get) throws IOException {
932    if (coprocEnvironments.isEmpty()) {
933      return false;
934    }
935    boolean bypassable = true;
936    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
937      @Override
938      public void call(RegionObserver observer) throws IOException {
939          observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get);
940      }
941    });
942  }
943
944  /**
945   * @param put The Put object
946   * @param edit The WALEdit object.
947   * @param durability The durability used
948   * @exception IOException Exception
949   */
950  public void postPut(final Put put, final WALEdit edit, final Durability durability)
951      throws IOException {
952    if (coprocEnvironments.isEmpty()) {
953      return;
954    }
955    execOperation(new RegionObserverOperationWithoutResult() {
956      @Override
957      public void call(RegionObserver observer) throws IOException {
958        observer.postPut(this, put, edit, durability);
959      }
960    });
961  }
962
963  /**
964   * Supports Coprocessor 'bypass'.
965   * @param delete The Delete object
966   * @param edit The WALEdit object.
967   * @param durability The durability used
968   * @return true if default processing should be bypassed
969   * @exception IOException Exception
970   */
971  public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
972      throws IOException {
973    if (this.coprocEnvironments.isEmpty()) {
974      return false;
975    }
976    boolean bypassable = true;
977    return execOperation(new RegionObserverOperationWithoutResult(bypassable) {
978      @Override
979      public void call(RegionObserver observer) throws IOException {
980         observer.preDelete(this, delete, edit, durability);
981      }
982    });
983  }
984
985  /**
986   * @param delete The Delete object
987   * @param edit The WALEdit object.
988   * @param durability The durability used
989   * @exception IOException Exception
990   */
991  public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
992      throws IOException {
993    execOperation(coprocEnvironments.isEmpty()? null:
994        new RegionObserverOperationWithoutResult() {
995      @Override
996      public void call(RegionObserver observer) throws IOException {
997        observer.postDelete(this, delete, edit, durability);
998      }
999    });
1000  }
1001
1002  public void preBatchMutate(
1003      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1004    if(this.coprocEnvironments.isEmpty()) {
1005      return;
1006    }
1007    execOperation(new RegionObserverOperationWithoutResult() {
1008      @Override
1009      public void call(RegionObserver observer) throws IOException {
1010        observer.preBatchMutate(this, miniBatchOp);
1011      }
1012    });
1013  }
1014
1015  public void postBatchMutate(
1016      final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1017    if (this.coprocEnvironments.isEmpty()) {
1018      return;
1019    }
1020    execOperation(new RegionObserverOperationWithoutResult() {
1021      @Override
1022      public void call(RegionObserver observer) throws IOException {
1023        observer.postBatchMutate(this, miniBatchOp);
1024      }
1025    });
1026  }
1027
1028  public void postBatchMutateIndispensably(
1029      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
1030      throws IOException {
1031    if (this.coprocEnvironments.isEmpty()) {
1032      return;
1033    }
1034    execOperation(new RegionObserverOperationWithoutResult() {
1035      @Override
1036      public void call(RegionObserver observer) throws IOException {
1037        observer.postBatchMutateIndispensably(this, miniBatchOp, success);
1038      }
1039    });
1040  }
1041
1042  /**
1043   * Supports Coprocessor 'bypass'.
1044   * @param checkAndMutate the CheckAndMutate object
1045   * @return true or false to return to client if default processing should be bypassed, or null
1046   *   otherwise
1047   * @throws IOException if an error occurred on the coprocessor
1048   */
1049  public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate)
1050    throws IOException {
1051    boolean bypassable = true;
1052    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1053    if (coprocEnvironments.isEmpty()) {
1054      return null;
1055    }
1056    return execOperationWithResult(
1057      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1058        regionObserverGetter, defaultResult, bypassable) {
1059        @Override
1060        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1061          return observer.preCheckAndMutate(this, checkAndMutate, getResult());
1062        }
1063      });
1064  }
1065
1066  /**
1067   * Supports Coprocessor 'bypass'.
1068   * @param checkAndMutate the CheckAndMutate object
1069   * @return true or false to return to client if default processing should be bypassed, or null
1070   *   otherwise
1071   * @throws IOException if an error occurred on the coprocessor
1072   */
1073  public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate)
1074    throws IOException {
1075    boolean bypassable = true;
1076    CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null);
1077    if (coprocEnvironments.isEmpty()) {
1078      return null;
1079    }
1080    return execOperationWithResult(
1081      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1082        regionObserverGetter, defaultResult, bypassable) {
1083        @Override
1084        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1085          return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult());
1086        }
1087      });
1088  }
1089
1090  /**
1091   * @param checkAndMutate the CheckAndMutate object
1092   * @param result the result returned by the checkAndMutate
1093   * @return true or false to return to client if default processing should be bypassed, or null
1094   *   otherwise
1095   * @throws IOException if an error occurred on the coprocessor
1096   */
1097  public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate,
1098    CheckAndMutateResult result) throws IOException {
1099    if (this.coprocEnvironments.isEmpty()) {
1100      return result;
1101    }
1102    return execOperationWithResult(
1103      new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(
1104        regionObserverGetter, result) {
1105        @Override
1106        public CheckAndMutateResult call(RegionObserver observer) throws IOException {
1107          return observer.postCheckAndMutate(this, checkAndMutate, getResult());
1108        }
1109      });
1110  }
1111
1112  /**
1113   * Supports Coprocessor 'bypass'.
1114   * @param append append object
1115   * @return result to return to client if default operation should be bypassed, null otherwise
1116   * @throws IOException if an error occurred on the coprocessor
1117   */
1118  public Result preAppend(final Append append) throws IOException {
1119    boolean bypassable = true;
1120    Result defaultResult = null;
1121    if (this.coprocEnvironments.isEmpty()) {
1122      return defaultResult;
1123    }
1124    return execOperationWithResult(
1125      new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1126            bypassable) {
1127          @Override
1128          public Result call(RegionObserver observer) throws IOException {
1129            return observer.preAppend(this, append);
1130          }
1131        });
1132  }
1133
1134  /**
1135   * Supports Coprocessor 'bypass'.
1136   * @param append append object
1137   * @return result to return to client if default operation should be bypassed, null otherwise
1138   * @throws IOException if an error occurred on the coprocessor
1139   */
1140  public Result preAppendAfterRowLock(final Append append) throws IOException {
1141    boolean bypassable = true;
1142    Result defaultResult = null;
1143    if (this.coprocEnvironments.isEmpty()) {
1144      return defaultResult;
1145    }
1146    return execOperationWithResult(
1147        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter,
1148            defaultResult, bypassable) {
1149          @Override
1150          public Result call(RegionObserver observer) throws IOException {
1151            return observer.preAppendAfterRowLock(this, append);
1152          }
1153        });
1154  }
1155
1156  /**
1157   * Supports Coprocessor 'bypass'.
1158   * @param increment increment object
1159   * @return result to return to client if default operation should be bypassed, null otherwise
1160   * @throws IOException if an error occurred on the coprocessor
1161   */
1162  public Result preIncrement(final Increment increment) throws IOException {
1163    boolean bypassable = true;
1164    Result defaultResult = null;
1165    if (coprocEnvironments.isEmpty()) {
1166      return defaultResult;
1167    }
1168    return execOperationWithResult(
1169        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1170            bypassable) {
1171          @Override
1172          public Result call(RegionObserver observer) throws IOException {
1173            return observer.preIncrement(this, increment);
1174          }
1175        });
1176  }
1177
1178  /**
1179   * Supports Coprocessor 'bypass'.
1180   * @param increment increment object
1181   * @return result to return to client if default operation should be bypassed, null otherwise
1182   * @throws IOException if an error occurred on the coprocessor
1183   */
1184  public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1185    boolean bypassable = true;
1186    Result defaultResult = null;
1187    if (coprocEnvironments.isEmpty()) {
1188      return defaultResult;
1189    }
1190    return execOperationWithResult(
1191        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult,
1192            bypassable) {
1193          @Override
1194          public Result call(RegionObserver observer) throws IOException {
1195            return observer.preIncrementAfterRowLock(this, increment);
1196          }
1197        });
1198  }
1199
1200  /**
1201   * @param append Append object
1202   * @param result the result returned by the append
1203   * @throws IOException if an error occurred on the coprocessor
1204   */
1205  public Result postAppend(final Append append, final Result result) throws IOException {
1206    if (this.coprocEnvironments.isEmpty()) {
1207      return result;
1208    }
1209    return execOperationWithResult(
1210        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1211          @Override
1212          public Result call(RegionObserver observer) throws IOException {
1213            return observer.postAppend(this, append, result);
1214          }
1215        });
1216  }
1217
1218  /**
1219   * @param increment increment object
1220   * @param result the result returned by postIncrement
1221   * @throws IOException if an error occurred on the coprocessor
1222   */
1223  public Result postIncrement(final Increment increment, Result result) throws IOException {
1224    if (this.coprocEnvironments.isEmpty()) {
1225      return result;
1226    }
1227    return execOperationWithResult(
1228        new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) {
1229          @Override
1230          public Result call(RegionObserver observer) throws IOException {
1231            return observer.postIncrement(this, increment, getResult());
1232          }
1233        });
1234  }
1235
1236  /**
1237   * @param scan the Scan specification
1238   * @exception IOException Exception
1239   */
1240  public void preScannerOpen(final Scan scan) throws IOException {
1241    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1242      @Override
1243      public void call(RegionObserver observer) throws IOException {
1244        observer.preScannerOpen(this, scan);
1245      }
1246    });
1247  }
1248
1249  /**
1250   * @param scan the Scan specification
1251   * @param s the scanner
1252   * @return the scanner instance to use
1253   * @exception IOException Exception
1254   */
1255  public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1256    if (this.coprocEnvironments.isEmpty()) {
1257      return s;
1258    }
1259    return execOperationWithResult(
1260        new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) {
1261          @Override
1262          public RegionScanner call(RegionObserver observer) throws IOException {
1263            return observer.postScannerOpen(this, scan, getResult());
1264          }
1265        });
1266  }
1267
1268  /**
1269   * @param s the scanner
1270   * @param results the result set returned by the region server
1271   * @param limit the maximum number of results to return
1272   * @return 'has next' indication to client if bypassing default behavior, or null otherwise
1273   * @exception IOException Exception
1274   */
1275  public Boolean preScannerNext(final InternalScanner s,
1276      final List<Result> results, final int limit) throws IOException {
1277    boolean bypassable = true;
1278    boolean defaultResult = false;
1279    if (coprocEnvironments.isEmpty()) {
1280      return null;
1281    }
1282    return execOperationWithResult(
1283        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter,
1284            defaultResult, bypassable) {
1285          @Override
1286          public Boolean call(RegionObserver observer) throws IOException {
1287            return observer.preScannerNext(this, s, results, limit, getResult());
1288          }
1289        });
1290  }
1291
1292  /**
1293   * @param s the scanner
1294   * @param results the result set returned by the region server
1295   * @param limit the maximum number of results to return
1296   * @param hasMore
1297   * @return 'has more' indication to give to client
1298   * @exception IOException Exception
1299   */
1300  public boolean postScannerNext(final InternalScanner s,
1301      final List<Result> results, final int limit, boolean hasMore)
1302      throws IOException {
1303    if (this.coprocEnvironments.isEmpty()) {
1304      return hasMore;
1305    }
1306    return execOperationWithResult(
1307        new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) {
1308          @Override
1309          public Boolean call(RegionObserver observer) throws IOException {
1310            return observer.postScannerNext(this, s, results, limit, getResult());
1311          }
1312        });
1313  }
1314
1315  /**
1316   * This will be called by the scan flow when the current scanned row is being filtered out by the
1317   * filter.
1318   * @param s the scanner
1319   * @param curRowCell The cell in the current row which got filtered out
1320   * @return whether more rows are available for the scanner or not
1321   * @throws IOException
1322   */
1323  public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell)
1324      throws IOException {
1325    // short circuit for performance
1326    boolean defaultResult = true;
1327    if (!hasCustomPostScannerFilterRow) {
1328      return defaultResult;
1329    }
1330    if (this.coprocEnvironments.isEmpty()) {
1331      return defaultResult;
1332    }
1333    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>(
1334        regionObserverGetter, defaultResult) {
1335      @Override
1336      public Boolean call(RegionObserver observer) throws IOException {
1337        return observer.postScannerFilterRow(this, s, curRowCell, getResult());
1338      }
1339    });
1340  }
1341
1342  /**
1343   * Supports Coprocessor 'bypass'.
1344   * @param s the scanner
1345   * @return true if default behavior should be bypassed, false otherwise
1346   * @exception IOException Exception
1347   */
1348  // Should this be bypassable?
1349  public boolean preScannerClose(final InternalScanner s) throws IOException {
1350    return execOperation(coprocEnvironments.isEmpty()? null:
1351        new RegionObserverOperationWithoutResult(true) {
1352      @Override
1353      public void call(RegionObserver observer) throws IOException {
1354        observer.preScannerClose(this, s);
1355      }
1356    });
1357  }
1358
1359  /**
1360   * @exception IOException Exception
1361   */
1362  public void postScannerClose(final InternalScanner s) throws IOException {
1363    execOperation(coprocEnvironments.isEmpty()? null:
1364        new RegionObserverOperationWithoutResult() {
1365      @Override
1366      public void call(RegionObserver observer) throws IOException {
1367        observer.postScannerClose(this, s);
1368      }
1369    });
1370  }
1371
1372  /**
1373   * Called before open store scanner for user scan.
1374   */
1375  public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
1376    if (coprocEnvironments.isEmpty()) return store.getScanInfo();
1377    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
1378    execOperation(new RegionObserverOperationWithoutResult() {
1379      @Override
1380      public void call(RegionObserver observer) throws IOException {
1381        observer.preStoreScannerOpen(this, store, builder);
1382      }
1383    });
1384    return builder.build();
1385  }
1386
1387  /**
1388   * @param info the RegionInfo for this region
1389   * @param edits the file of recovered edits
1390   */
1391  public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1392    execOperation(coprocEnvironments.isEmpty()? null:
1393        new RegionObserverOperationWithoutResult(true) {
1394      @Override
1395      public void call(RegionObserver observer) throws IOException {
1396        observer.preReplayWALs(this, info, edits);
1397      }
1398    });
1399  }
1400
1401  /**
1402   * @param info the RegionInfo for this region
1403   * @param edits the file of recovered edits
1404   * @throws IOException Exception
1405   */
1406  public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException {
1407    execOperation(coprocEnvironments.isEmpty()? null:
1408        new RegionObserverOperationWithoutResult() {
1409      @Override
1410      public void call(RegionObserver observer) throws IOException {
1411        observer.postReplayWALs(this, info, edits);
1412      }
1413    });
1414  }
1415
1416  /**
1417   * Supports Coprocessor 'bypass'.
1418   * @return true if default behavior should be bypassed, false otherwise
1419   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1420   * with something that doesn't expose IntefaceAudience.Private classes.
1421   */
1422  @Deprecated
1423  public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1424      throws IOException {
1425    return execOperation(coprocEnvironments.isEmpty()? null:
1426        new RegionObserverOperationWithoutResult(true) {
1427      @Override
1428      public void call(RegionObserver observer) throws IOException {
1429        observer.preWALRestore(this, info, logKey, logEdit);
1430      }
1431    });
1432  }
1433
1434  /**
1435   * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
1436   * with something that doesn't expose IntefaceAudience.Private classes.
1437   */
1438  @Deprecated
1439  public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1440      throws IOException {
1441    execOperation(coprocEnvironments.isEmpty()? null:
1442        new RegionObserverOperationWithoutResult() {
1443      @Override
1444      public void call(RegionObserver observer) throws IOException {
1445        observer.postWALRestore(this, info, logKey, logEdit);
1446      }
1447    });
1448  }
1449
1450  /**
1451   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1452   */
1453  public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1454    execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() {
1455      @Override
1456      public void call(RegionObserver observer) throws IOException {
1457        observer.preBulkLoadHFile(this, familyPaths);
1458      }
1459    });
1460  }
1461
1462  public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1463      throws IOException {
1464    return execOperation(coprocEnvironments.isEmpty()? null:
1465        new RegionObserverOperationWithoutResult() {
1466      @Override
1467      public void call(RegionObserver observer) throws IOException {
1468        observer.preCommitStoreFile(this, family, pairs);
1469      }
1470    });
1471  }
1472
1473  public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException {
1474    execOperation(coprocEnvironments.isEmpty()? null:
1475        new RegionObserverOperationWithoutResult() {
1476      @Override
1477      public void call(RegionObserver observer) throws IOException {
1478        observer.postCommitStoreFile(this, family, srcPath, dstPath);
1479      }
1480    });
1481  }
1482
1483  /**
1484   * @param familyPaths pairs of { CF, file path } submitted for bulk load
1485   * @param map Map of CF to List of file paths for the final loaded files
1486   * @throws IOException
1487   */
1488  public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1489      Map<byte[], List<Path>> map) throws IOException {
1490    if (this.coprocEnvironments.isEmpty()) {
1491      return;
1492    }
1493    execOperation(coprocEnvironments.isEmpty()? null:
1494        new RegionObserverOperationWithoutResult() {
1495          @Override
1496          public void call(RegionObserver observer) throws IOException {
1497            observer.postBulkLoadHFile(this, familyPaths, map);
1498          }
1499        });
1500  }
1501
1502  public void postStartRegionOperation(final Operation op) throws IOException {
1503    execOperation(coprocEnvironments.isEmpty()? null:
1504        new RegionObserverOperationWithoutResult() {
1505      @Override
1506      public void call(RegionObserver observer) throws IOException {
1507        observer.postStartRegionOperation(this, op);
1508      }
1509    });
1510  }
1511
1512  public void postCloseRegionOperation(final Operation op) throws IOException {
1513    execOperation(coprocEnvironments.isEmpty()? null:
1514        new RegionObserverOperationWithoutResult() {
1515      @Override
1516      public void call(RegionObserver observer) throws IOException {
1517        observer.postCloseRegionOperation(this, op);
1518      }
1519    });
1520  }
1521
1522  /**
1523   * @param fs fileystem to read from
1524   * @param p path to the file
1525   * @param in {@link FSDataInputStreamWrapper}
1526   * @param size Full size of the file
1527   * @param cacheConf
1528   * @param r original reference file. This will be not null only when reading a split file.
1529   * @return a Reader instance to use instead of the base reader if overriding
1530   * default behavior, null otherwise
1531   * @throws IOException
1532   */
1533  public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1534      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1535      final Reference r) throws IOException {
1536    if (coprocEnvironments.isEmpty()) {
1537      return null;
1538    }
1539    return execOperationWithResult(
1540        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
1541          @Override
1542          public StoreFileReader call(RegionObserver observer) throws IOException {
1543            return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1544                getResult());
1545          }
1546        });
1547  }
1548
1549  /**
1550   * @param fs fileystem to read from
1551   * @param p path to the file
1552   * @param in {@link FSDataInputStreamWrapper}
1553   * @param size Full size of the file
1554   * @param cacheConf
1555   * @param r original reference file. This will be not null only when reading a split file.
1556   * @param reader the base reader instance
1557   * @return The reader to use
1558   * @throws IOException
1559   */
1560  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1561      final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1562      final Reference r, final StoreFileReader reader) throws IOException {
1563    if (this.coprocEnvironments.isEmpty()) {
1564      return reader;
1565    }
1566    return execOperationWithResult(
1567        new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
1568          @Override
1569          public StoreFileReader call(RegionObserver observer) throws IOException {
1570            return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
1571                getResult());
1572          }
1573        });
1574  }
1575
1576  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
1577      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1578    if (this.coprocEnvironments.isEmpty()) {
1579      return cellPairs;
1580    }
1581    return execOperationWithResult(
1582        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1583            regionObserverGetter, cellPairs) {
1584          @Override
1585          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1586            return observer.postIncrementBeforeWAL(this, mutation, getResult());
1587          }
1588        });
1589  }
1590
1591  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
1592      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
1593    if (this.coprocEnvironments.isEmpty()) {
1594      return cellPairs;
1595    }
1596    return execOperationWithResult(
1597        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
1598            regionObserverGetter, cellPairs) {
1599          @Override
1600          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
1601            return observer.postAppendBeforeWAL(this, mutation, getResult());
1602          }
1603        });
1604  }
1605
1606  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1607    if (this.coprocEnvironments.isEmpty()){
1608      return;
1609    }
1610    execOperation(new RegionObserverOperationWithoutResult() {
1611      @Override
1612      public void call(RegionObserver observer) throws IOException {
1613        observer.preWALAppend(this, key, edit);
1614      }
1615    });
1616  }
1617
1618  public Message preEndpointInvocation(final Service service, final String methodName,
1619      Message request) throws IOException {
1620    if (coprocEnvironments.isEmpty()) {
1621      return request;
1622    }
1623    return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver,
1624        Message>(endpointObserverGetter, request) {
1625      @Override
1626      public Message call(EndpointObserver observer) throws IOException {
1627        return observer.preEndpointInvocation(this, service, methodName, getResult());
1628      }
1629    });
1630  }
1631
1632  public void postEndpointInvocation(final Service service, final String methodName,
1633      final Message request, final Message.Builder responseBuilder) throws IOException {
1634    execOperation(coprocEnvironments.isEmpty() ? null :
1635        new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
1636          @Override
1637          public void call(EndpointObserver observer) throws IOException {
1638            observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
1639          }
1640        });
1641  }
1642
1643  /**
1644   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
1645   */
1646  @Deprecated
1647  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
1648    if (this.coprocEnvironments.isEmpty()) {
1649      return result;
1650    }
1651    return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>(
1652        regionObserverGetter, result) {
1653      @Override
1654      public DeleteTracker call(RegionObserver observer) throws IOException {
1655        return observer.postInstantiateDeleteTracker(this, getResult());
1656      }
1657    });
1658  }
1659
1660  /////////////////////////////////////////////////////////////////////////////////////////////////
1661  // BulkLoadObserver hooks
1662  /////////////////////////////////////////////////////////////////////////////////////////////////
1663  public void prePrepareBulkLoad(User user) throws IOException {
1664    execOperation(coprocEnvironments.isEmpty() ? null :
1665        new BulkLoadObserverOperation(user) {
1666          @Override protected void call(BulkLoadObserver observer) throws IOException {
1667            observer.prePrepareBulkLoad(this);
1668          }
1669        });
1670  }
1671
1672  public void preCleanupBulkLoad(User user) throws IOException {
1673    execOperation(coprocEnvironments.isEmpty() ? null :
1674        new BulkLoadObserverOperation(user) {
1675          @Override protected void call(BulkLoadObserver observer) throws IOException {
1676            observer.preCleanupBulkLoad(this);
1677          }
1678        });
1679  }
1680}