001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.UUID; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.RawCellBuilder; 036import org.apache.hadoop.hbase.RawCellBuilderFactory; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.CheckAndMutate; 040import org.apache.hadoop.hbase.client.CheckAndMutateResult; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.SharedConnection; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 057import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 060import org.apache.hadoop.hbase.coprocessor.ObserverContext; 061import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext; 062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 064import org.apache.hadoop.hbase.coprocessor.RegionObserver; 065import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 066import org.apache.hadoop.hbase.io.Reference; 067import org.apache.hadoop.hbase.io.hfile.CacheConfig; 068import org.apache.hadoop.hbase.metrics.MetricRegistry; 069import org.apache.hadoop.hbase.quotas.OperationQuota; 070import org.apache.hadoop.hbase.quotas.RpcQuotaManager; 071import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 072import org.apache.hadoop.hbase.regionserver.Region.Operation; 073import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 074import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 075import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 076import org.apache.hadoop.hbase.security.User; 077import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 078import org.apache.hadoop.hbase.util.Pair; 079import org.apache.hadoop.hbase.wal.WALEdit; 080import org.apache.hadoop.hbase.wal.WALKey; 081import org.apache.yetus.audience.InterfaceAudience; 082import org.slf4j.Logger; 083import org.slf4j.LoggerFactory; 084 085import org.apache.hbase.thirdparty.com.google.protobuf.Message; 086import org.apache.hbase.thirdparty.com.google.protobuf.Service; 087import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 089 090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 092 093/** 094 * Implements the coprocessor environment and runtime support for coprocessors loaded within a 095 * {@link Region}. 096 */ 097@InterfaceAudience.Private 098public class RegionCoprocessorHost 099 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 100 101 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 102 // The shared data map 103 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 104 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 105 AbstractReferenceMap.ReferenceStrength.WEAK); 106 107 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 108 private final boolean hasCustomPostScannerFilterRow; 109 110 /* 111 * Whether any configured CPs override postScannerFilterRow hook 112 */ 113 public boolean hasCustomPostScannerFilterRow() { 114 return hasCustomPostScannerFilterRow; 115 } 116 117 /** 118 * Encapsulation of the environment of each coprocessor 119 */ 120 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 121 implements RegionCoprocessorEnvironment { 122 private Region region; 123 ConcurrentMap<String, Object> sharedData; 124 private final MetricRegistry metricRegistry; 125 private final RegionServerServices services; 126 private final RpcQuotaManager rpcQuotaManager; 127 128 /** 129 * Constructor 130 * @param impl the coprocessor instance 131 * @param priority chaining priority 132 */ 133 public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq, 134 final Configuration conf, final Region region, final RegionServerServices services, 135 final ConcurrentMap<String, Object> sharedData) { 136 super(impl, priority, seq, conf); 137 this.region = region; 138 this.sharedData = sharedData; 139 this.services = services; 140 this.metricRegistry = 141 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 142 // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager 143 // being null. Let these unit tests succeed. This should not happen in real usage. 144 if (services != null) { 145 this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); 146 } else { 147 this.rpcQuotaManager = null; 148 } 149 } 150 151 /** Returns the region */ 152 @Override 153 public Region getRegion() { 154 return region; 155 } 156 157 @Override 158 public OnlineRegions getOnlineRegions() { 159 return this.services; 160 } 161 162 @Override 163 public Connection getConnection() { 164 // Mocks may have services as null at test time. 165 return services != null ? new SharedConnection(services.getConnection()) : null; 166 } 167 168 @Override 169 public Connection createConnection(Configuration conf) throws IOException { 170 return services != null ? this.services.createConnection(conf) : null; 171 } 172 173 @Override 174 public ServerName getServerName() { 175 return services != null ? services.getServerName() : null; 176 } 177 178 @Override 179 public void shutdown() { 180 super.shutdown(); 181 MetricsCoprocessor.removeRegistry(this.metricRegistry); 182 } 183 184 @Override 185 public ConcurrentMap<String, Object> getSharedData() { 186 return sharedData; 187 } 188 189 @Override 190 public RegionInfo getRegionInfo() { 191 return region.getRegionInfo(); 192 } 193 194 @Override 195 public MetricRegistry getMetricRegistryForRegionServer() { 196 return metricRegistry; 197 } 198 199 @Override 200 public RawCellBuilder getCellBuilder() { 201 // We always do a DEEP_COPY only 202 return RawCellBuilderFactory.create(); 203 } 204 205 @Override 206 public RpcQuotaManager getRpcQuotaManager() { 207 return rpcQuotaManager; 208 } 209 210 @Override 211 public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned, 212 long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException { 213 ClientProtos.ScanRequest scanRequest = RequestConverter 214 .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false); 215 long maxScannerResultSize = 216 services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 217 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 218 return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize, 219 maxBlockBytesScanned, prevBlockBytesScannedDifference); 220 } 221 222 @Override 223 public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type) 224 throws IOException, RpcThrottlingException { 225 return rpcQuotaManager.checkBatchQuota(region, type); 226 } 227 228 @Override 229 public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) 230 throws IOException, RpcThrottlingException { 231 return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads, false); 232 } 233 } 234 235 /** 236 * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors 237 * only. Temporary hack until Core Coprocessors are integrated into Core. 238 */ 239 private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment 240 implements HasRegionServerServices { 241 private final RegionServerServices rsServices; 242 243 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 244 final int seq, final Configuration conf, final Region region, 245 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 246 super(impl, priority, seq, conf, region, services, sharedData); 247 this.rsServices = services; 248 } 249 250 /** 251 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 252 * consumption. 253 */ 254 @Override 255 public RegionServerServices getRegionServerServices() { 256 return this.rsServices; 257 } 258 } 259 260 static class TableCoprocessorAttribute { 261 private Path path; 262 private String className; 263 private int priority; 264 private Configuration conf; 265 266 public TableCoprocessorAttribute(Path path, String className, int priority, 267 Configuration conf) { 268 this.path = path; 269 this.className = className; 270 this.priority = priority; 271 this.conf = conf; 272 } 273 274 public Path getPath() { 275 return path; 276 } 277 278 public String getClassName() { 279 return className; 280 } 281 282 public int getPriority() { 283 return priority; 284 } 285 286 public Configuration getConf() { 287 return conf; 288 } 289 } 290 291 /** The region server services */ 292 RegionServerServices rsServices; 293 /** The region */ 294 HRegion region; 295 296 /** 297 * Constructor 298 * @param region the region 299 * @param rsServices interface to available region server functionality 300 * @param conf the configuration 301 */ 302 @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization 303 public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices, 304 final Configuration conf) { 305 super(rsServices); 306 this.conf = conf; 307 this.rsServices = rsServices; 308 this.region = region; 309 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 310 311 // load system default cp's from configuration. 312 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 313 314 // load system default cp's for user tables from configuration. 315 if (!region.getRegionInfo().getTable().isSystemTable()) { 316 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 317 } 318 319 // load Coprocessor From HDFS 320 loadTableCoprocessors(conf); 321 322 // now check whether any coprocessor implements postScannerFilterRow 323 boolean hasCustomPostScannerFilterRow = false; 324 out: for (RegionCoprocessorEnvironment env : coprocEnvironments) { 325 if (env.getInstance() instanceof RegionObserver) { 326 Class<?> clazz = env.getInstance().getClass(); 327 for (;;) { 328 if (clazz == Object.class) { 329 // we dont need to look postScannerFilterRow into Object class 330 break; // break the inner loop 331 } 332 try { 333 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 334 InternalScanner.class, Cell.class, boolean.class); 335 // this coprocessor has a custom version of postScannerFilterRow 336 hasCustomPostScannerFilterRow = true; 337 break out; 338 } catch (NoSuchMethodException ignore) { 339 } 340 // the deprecated signature still exists 341 try { 342 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 343 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 344 // this coprocessor has a custom version of postScannerFilterRow 345 hasCustomPostScannerFilterRow = true; 346 break out; 347 } catch (NoSuchMethodException ignore) { 348 } 349 clazz = clazz.getSuperclass(); 350 } 351 } 352 } 353 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 354 } 355 356 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 357 TableDescriptor htd) { 358 return htd.getCoprocessorDescriptors().stream().map(cp -> { 359 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 360 Configuration ourConf; 361 if (!cp.getProperties().isEmpty()) { 362 // do an explicit deep copy of the passed configuration 363 ourConf = new Configuration(false); 364 HBaseConfiguration.merge(ourConf, conf); 365 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 366 } else { 367 ourConf = conf; 368 } 369 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 370 }).collect(Collectors.toList()); 371 } 372 373 /** 374 * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception 375 * if there is a problem. 376 */ 377 public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd) 378 throws IOException { 379 String pathPrefix = UUID.randomUUID().toString(); 380 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) { 381 if (attr.getPriority() < 0) { 382 throw new IOException( 383 "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0"); 384 } 385 ClassLoader old = Thread.currentThread().getContextClassLoader(); 386 try { 387 ClassLoader cl; 388 if (attr.getPath() != null) { 389 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 390 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 391 } else { 392 cl = CoprocessorHost.class.getClassLoader(); 393 } 394 Thread.currentThread().setContextClassLoader(cl); 395 if (cl instanceof CoprocessorClassLoader) { 396 String[] includedClassPrefixes = null; 397 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 398 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 399 includedClassPrefixes = prefixes.split(";"); 400 } 401 ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes); 402 } else { 403 cl.loadClass(attr.getClassName()); 404 } 405 } catch (ClassNotFoundException e) { 406 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 407 } finally { 408 Thread.currentThread().setContextClassLoader(old); 409 } 410 } 411 } 412 413 void loadTableCoprocessors(final Configuration conf) { 414 boolean coprocessorsEnabled = 415 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 416 boolean tableCoprocessorsEnabled = 417 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 418 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 419 return; 420 } 421 422 // scan the table attributes for coprocessor load specifications 423 // initialize the coprocessors 424 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 425 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, 426 region.getTableDescriptor())) { 427 // Load encompasses classloading and coprocessor initialization 428 try { 429 RegionCoprocessorEnvironment env = 430 load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf()); 431 if (env == null) { 432 continue; 433 } 434 configured.add(env); 435 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " 436 + region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 437 } catch (Throwable t) { 438 // Coprocessor failed to load, do we abort on error? 439 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 440 abortServer(attr.getClassName(), t); 441 } else { 442 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 443 } 444 } 445 } 446 // add together to coprocessor set for COW efficiency 447 coprocEnvironments.addAll(configured); 448 } 449 450 @Override 451 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 452 Configuration conf) { 453 // If coprocessor exposes any services, register them. 454 for (Service service : instance.getServices()) { 455 region.registerService(service); 456 } 457 ConcurrentMap<String, Object> classData; 458 // make sure only one thread can add maps 459 synchronized (SHARED_DATA_MAP) { 460 // as long as at least one RegionEnvironment holds on to its classData it will 461 // remain in this map 462 classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 463 k -> new ConcurrentHashMap<>()); 464 } 465 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 466 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 467 ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices, 468 classData) 469 : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 470 } 471 472 @Override 473 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 474 throws InstantiationException, IllegalAccessException { 475 try { 476 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 477 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 478 } else { 479 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 480 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 481 return null; 482 } 483 } catch (NoSuchMethodException | InvocationTargetException e) { 484 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 485 } 486 } 487 488 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 489 RegionCoprocessor::getRegionObserver; 490 491 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 492 RegionCoprocessor::getEndpointObserver; 493 494 abstract class RegionObserverOperationWithoutResult 495 extends ObserverOperationWithoutResult<RegionObserver> { 496 public RegionObserverOperationWithoutResult() { 497 super(regionObserverGetter); 498 } 499 500 public RegionObserverOperationWithoutResult(User user) { 501 super(regionObserverGetter, user); 502 } 503 504 public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext) { 505 super(regionObserverGetter, rpcCallContext); 506 } 507 508 public RegionObserverOperationWithoutResult(boolean bypassable) { 509 super(regionObserverGetter, (ObserverRpcCallContext) null, bypassable); 510 } 511 512 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 513 super(regionObserverGetter, user, bypassable); 514 } 515 516 public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext, 517 boolean bypassable) { 518 super(regionObserverGetter, rpcCallContext, bypassable); 519 } 520 } 521 522 abstract class BulkLoadObserverOperation 523 extends ObserverOperationWithoutResult<BulkLoadObserver> { 524 public BulkLoadObserverOperation(User user) { 525 super(RegionCoprocessor::getBulkLoadObserver, user); 526 } 527 } 528 529 ////////////////////////////////////////////////////////////////////////////////////////////////// 530 // Observer operations 531 ////////////////////////////////////////////////////////////////////////////////////////////////// 532 533 ////////////////////////////////////////////////////////////////////////////////////////////////// 534 // Observer operations 535 ////////////////////////////////////////////////////////////////////////////////////////////////// 536 537 /** 538 * Invoked before a region open. 539 * @throws IOException Signals that an I/O exception has occurred. 540 */ 541 public void preOpen() throws IOException { 542 if (coprocEnvironments.isEmpty()) { 543 return; 544 } 545 execOperation(new RegionObserverOperationWithoutResult() { 546 @Override 547 public void call(RegionObserver observer) throws IOException { 548 observer.preOpen(this); 549 } 550 }); 551 } 552 553 /** 554 * Invoked after a region open 555 */ 556 public void postOpen() { 557 if (coprocEnvironments.isEmpty()) { 558 return; 559 } 560 try { 561 execOperation(new RegionObserverOperationWithoutResult() { 562 @Override 563 public void call(RegionObserver observer) throws IOException { 564 observer.postOpen(this); 565 } 566 }); 567 } catch (IOException e) { 568 LOG.warn(e.toString(), e); 569 } 570 } 571 572 /** 573 * Invoked before a region is closed 574 * @param abortRequested true if the server is aborting 575 */ 576 public void preClose(final boolean abortRequested) throws IOException { 577 execOperation(new RegionObserverOperationWithoutResult() { 578 @Override 579 public void call(RegionObserver observer) throws IOException { 580 observer.preClose(this, abortRequested); 581 } 582 }); 583 } 584 585 /** 586 * Invoked after a region is closed 587 * @param abortRequested true if the server is aborting 588 */ 589 public void postClose(final boolean abortRequested) { 590 try { 591 execOperation(new RegionObserverOperationWithoutResult() { 592 @Override 593 public void call(RegionObserver observer) throws IOException { 594 observer.postClose(this, abortRequested); 595 } 596 597 @Override 598 public void postEnvCall() { 599 shutdown(this.getEnvironment()); 600 } 601 }); 602 } catch (IOException e) { 603 LOG.warn(e.toString(), e); 604 } 605 } 606 607 /** 608 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 609 * available candidates. 610 * <p> 611 * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the 612 * passed in <code>candidates</code>. 613 * @param store The store where compaction is being requested 614 * @param candidates The currently available store files 615 * @param tracker used to track the life cycle of a compaction 616 * @param user the user 617 */ 618 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 619 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 620 if (coprocEnvironments.isEmpty()) { 621 return false; 622 } 623 boolean bypassable = true; 624 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 625 @Override 626 public void call(RegionObserver observer) throws IOException { 627 observer.preCompactSelection(this, store, candidates, tracker); 628 } 629 }); 630 } 631 632 /** 633 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 634 * candidates. 635 * @param store The store where compaction is being requested 636 * @param selected The store files selected to compact 637 * @param tracker used to track the life cycle of a compaction 638 * @param request the compaction request 639 * @param user the user 640 */ 641 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 642 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 643 throws IOException { 644 if (coprocEnvironments.isEmpty()) { 645 return; 646 } 647 execOperation(new RegionObserverOperationWithoutResult(user) { 648 @Override 649 public void call(RegionObserver observer) throws IOException { 650 observer.postCompactSelection(this, store, selected, tracker, request); 651 } 652 }); 653 } 654 655 /** 656 * Called prior to opening store scanner for compaction. 657 */ 658 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 659 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 660 if (coprocEnvironments.isEmpty()) { 661 return store.getScanInfo(); 662 } 663 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 664 execOperation(new RegionObserverOperationWithoutResult(user) { 665 @Override 666 public void call(RegionObserver observer) throws IOException { 667 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 668 } 669 }); 670 return builder.build(); 671 } 672 673 /** 674 * Called prior to rewriting the store files selected for compaction 675 * @param store the store being compacted 676 * @param scanner the scanner used to read store data during compaction 677 * @param scanType type of Scan 678 * @param tracker used to track the life cycle of a compaction 679 * @param request the compaction request 680 * @param user the user 681 * @return Scanner to use (cannot be null!) 682 */ 683 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 684 final ScanType scanType, final CompactionLifeCycleTracker tracker, 685 final CompactionRequest request, final User user) throws IOException { 686 InternalScanner defaultResult = scanner; 687 if (coprocEnvironments.isEmpty()) { 688 return defaultResult; 689 } 690 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 691 regionObserverGetter, defaultResult, user) { 692 @Override 693 public InternalScanner call(RegionObserver observer) throws IOException { 694 InternalScanner scanner = 695 observer.preCompact(this, store, getResult(), scanType, tracker, request); 696 if (scanner == null) { 697 throw new CoprocessorException("Null Scanner return disallowed!"); 698 } 699 return scanner; 700 } 701 }); 702 } 703 704 /** 705 * Called after the store compaction has completed. 706 * @param store the store being compacted 707 * @param resultFile the new store file written during compaction 708 * @param tracker used to track the life cycle of a compaction 709 * @param request the compaction request 710 * @param user the user 711 */ 712 public void postCompact(final HStore store, final HStoreFile resultFile, 713 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 714 throws IOException { 715 execOperation( 716 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) { 717 @Override 718 public void call(RegionObserver observer) throws IOException { 719 observer.postCompact(this, store, resultFile, tracker, request); 720 } 721 }); 722 } 723 724 /** 725 * Invoked before create StoreScanner for flush. 726 */ 727 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 728 throws IOException { 729 if (coprocEnvironments.isEmpty()) { 730 return store.getScanInfo(); 731 } 732 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 733 execOperation(new RegionObserverOperationWithoutResult() { 734 @Override 735 public void call(RegionObserver observer) throws IOException { 736 observer.preFlushScannerOpen(this, store, builder, tracker); 737 } 738 }); 739 return builder.build(); 740 } 741 742 /** 743 * Invoked before a memstore flush 744 * @return Scanner to use (cannot be null!) 745 */ 746 public InternalScanner preFlush(HStore store, InternalScanner scanner, 747 FlushLifeCycleTracker tracker) throws IOException { 748 if (coprocEnvironments.isEmpty()) { 749 return scanner; 750 } 751 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 752 regionObserverGetter, scanner) { 753 @Override 754 public InternalScanner call(RegionObserver observer) throws IOException { 755 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 756 if (scanner == null) { 757 throw new CoprocessorException("Null Scanner return disallowed!"); 758 } 759 return scanner; 760 } 761 }); 762 } 763 764 /** 765 * Invoked before a memstore flush 766 */ 767 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 768 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 769 @Override 770 public void call(RegionObserver observer) throws IOException { 771 observer.preFlush(this, tracker); 772 } 773 }); 774 } 775 776 /** 777 * Invoked after a memstore flush 778 */ 779 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 780 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 781 @Override 782 public void call(RegionObserver observer) throws IOException { 783 observer.postFlush(this, tracker); 784 } 785 }); 786 } 787 788 /** 789 * Invoked before in memory compaction. 790 */ 791 public void preMemStoreCompaction(HStore store) throws IOException { 792 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 793 @Override 794 public void call(RegionObserver observer) throws IOException { 795 observer.preMemStoreCompaction(this, store); 796 } 797 }); 798 } 799 800 /** 801 * Invoked before create StoreScanner for in memory compaction. 802 */ 803 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 804 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 805 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 806 @Override 807 public void call(RegionObserver observer) throws IOException { 808 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 809 } 810 }); 811 return builder.build(); 812 } 813 814 /** 815 * Invoked before compacting memstore. 816 */ 817 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 818 throws IOException { 819 if (coprocEnvironments.isEmpty()) { 820 return scanner; 821 } 822 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 823 regionObserverGetter, scanner) { 824 @Override 825 public InternalScanner call(RegionObserver observer) throws IOException { 826 return observer.preMemStoreCompactionCompact(this, store, getResult()); 827 } 828 }); 829 } 830 831 /** 832 * Invoked after in memory compaction. 833 */ 834 public void postMemStoreCompaction(HStore store) throws IOException { 835 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 836 @Override 837 public void call(RegionObserver observer) throws IOException { 838 observer.postMemStoreCompaction(this, store); 839 } 840 }); 841 } 842 843 /** 844 * Invoked after a memstore flush 845 */ 846 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 847 throws IOException { 848 if (coprocEnvironments.isEmpty()) { 849 return; 850 } 851 execOperation(new RegionObserverOperationWithoutResult() { 852 @Override 853 public void call(RegionObserver observer) throws IOException { 854 observer.postFlush(this, store, storeFile, tracker); 855 } 856 }); 857 } 858 859 // RegionObserver support 860 /** 861 * Supports Coprocessor 'bypass'. 862 * @param get the Get request 863 * @param results What to return if return is true/'bypass'. 864 * @return true if default processing should be bypassed. 865 * @exception IOException Exception 866 */ 867 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 868 if (coprocEnvironments.isEmpty()) { 869 return false; 870 } 871 boolean bypassable = true; 872 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 873 @Override 874 public void call(RegionObserver observer) throws IOException { 875 observer.preGetOp(this, get, results); 876 } 877 }); 878 } 879 880 /** 881 * @param get the Get request 882 * @param results the result set 883 * @exception IOException Exception 884 */ 885 public void postGet(final Get get, final List<Cell> results) throws IOException { 886 if (coprocEnvironments.isEmpty()) { 887 return; 888 } 889 execOperation(new RegionObserverOperationWithoutResult() { 890 @Override 891 public void call(RegionObserver observer) throws IOException { 892 observer.postGetOp(this, get, results); 893 } 894 }); 895 } 896 897 /** 898 * Supports Coprocessor 'bypass'. 899 * @param get the Get request 900 * @return true or false to return to client if bypassing normal operation, or null otherwise 901 * @exception IOException Exception 902 */ 903 public Boolean preExists(final Get get) throws IOException { 904 boolean bypassable = true; 905 boolean defaultResult = false; 906 if (coprocEnvironments.isEmpty()) { 907 return null; 908 } 909 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 910 regionObserverGetter, defaultResult, bypassable) { 911 @Override 912 public Boolean call(RegionObserver observer) throws IOException { 913 return observer.preExists(this, get, getResult()); 914 } 915 }); 916 } 917 918 /** 919 * @param get the Get request 920 * @param result the result returned by the region server 921 * @return the result to return to the client 922 * @exception IOException Exception 923 */ 924 public boolean postExists(final Get get, boolean result) throws IOException { 925 if (this.coprocEnvironments.isEmpty()) { 926 return result; 927 } 928 return execOperationWithResult( 929 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 930 @Override 931 public Boolean call(RegionObserver observer) throws IOException { 932 return observer.postExists(this, get, getResult()); 933 } 934 }); 935 } 936 937 /** 938 * Supports Coprocessor 'bypass'. 939 * @param put The Put object 940 * @param edit The WALEdit object. 941 * @return true if default processing should be bypassed 942 * @exception IOException Exception 943 */ 944 public boolean prePut(final Put put, final WALEdit edit) throws IOException { 945 if (coprocEnvironments.isEmpty()) { 946 return false; 947 } 948 boolean bypassable = true; 949 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 950 @Override 951 public void call(RegionObserver observer) throws IOException { 952 observer.prePut(this, put, edit); 953 } 954 }); 955 } 956 957 /** 958 * Supports Coprocessor 'bypass'. 959 * @param mutation - the current mutation 960 * @param kv - the current cell 961 * @param byteNow - current timestamp in bytes 962 * @param get - the get that could be used Note that the get only does not specify the family 963 * and qualifier that should be used 964 * @return true if default processing should be bypassed 965 */ 966 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, 967 final byte[] byteNow, final Get get) throws IOException { 968 if (coprocEnvironments.isEmpty()) { 969 return false; 970 } 971 boolean bypassable = true; 972 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 973 @Override 974 public void call(RegionObserver observer) throws IOException { 975 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 976 } 977 }); 978 } 979 980 /** 981 * @param put The Put object 982 * @param edit The WALEdit object. 983 * @exception IOException Exception 984 */ 985 public void postPut(final Put put, final WALEdit edit) throws IOException { 986 if (coprocEnvironments.isEmpty()) { 987 return; 988 } 989 execOperation(new RegionObserverOperationWithoutResult() { 990 @Override 991 public void call(RegionObserver observer) throws IOException { 992 observer.postPut(this, put, edit); 993 } 994 }); 995 } 996 997 /** 998 * Supports Coprocessor 'bypass'. 999 * @param delete The Delete object 1000 * @param edit The WALEdit object. 1001 * @return true if default processing should be bypassed 1002 * @exception IOException Exception 1003 */ 1004 public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException { 1005 if (this.coprocEnvironments.isEmpty()) { 1006 return false; 1007 } 1008 boolean bypassable = true; 1009 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 1010 @Override 1011 public void call(RegionObserver observer) throws IOException { 1012 observer.preDelete(this, delete, edit); 1013 } 1014 }); 1015 } 1016 1017 /** 1018 * @param delete The Delete object 1019 * @param edit The WALEdit object. 1020 * @exception IOException Exception 1021 */ 1022 public void postDelete(final Delete delete, final WALEdit edit) throws IOException { 1023 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1024 @Override 1025 public void call(RegionObserver observer) throws IOException { 1026 observer.postDelete(this, delete, edit); 1027 } 1028 }); 1029 } 1030 1031 public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 1032 throws IOException { 1033 if (this.coprocEnvironments.isEmpty()) { 1034 return; 1035 } 1036 execOperation(new RegionObserverOperationWithoutResult() { 1037 @Override 1038 public void call(RegionObserver observer) throws IOException { 1039 observer.preBatchMutate(this, miniBatchOp); 1040 } 1041 }); 1042 } 1043 1044 public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 1045 throws IOException { 1046 if (this.coprocEnvironments.isEmpty()) { 1047 return; 1048 } 1049 execOperation(new RegionObserverOperationWithoutResult() { 1050 @Override 1051 public void call(RegionObserver observer) throws IOException { 1052 observer.postBatchMutate(this, miniBatchOp); 1053 } 1054 }); 1055 } 1056 1057 public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp, 1058 final boolean success) throws IOException { 1059 if (this.coprocEnvironments.isEmpty()) { 1060 return; 1061 } 1062 execOperation(new RegionObserverOperationWithoutResult() { 1063 @Override 1064 public void call(RegionObserver observer) throws IOException { 1065 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1066 } 1067 }); 1068 } 1069 1070 /** 1071 * Supports Coprocessor 'bypass'. 1072 * @param checkAndMutate the CheckAndMutate object 1073 * @return true or false to return to client if default processing should be bypassed, or null 1074 * otherwise 1075 * @throws IOException if an error occurred on the coprocessor 1076 */ 1077 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException { 1078 boolean bypassable = true; 1079 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1080 if (coprocEnvironments.isEmpty()) { 1081 return null; 1082 } 1083 return execOperationWithResult( 1084 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1085 defaultResult, bypassable) { 1086 @Override 1087 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1088 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1089 } 1090 }); 1091 } 1092 1093 /** 1094 * Supports Coprocessor 'bypass'. 1095 * @param checkAndMutate the CheckAndMutate object 1096 * @return true or false to return to client if default processing should be bypassed, or null 1097 * otherwise 1098 * @throws IOException if an error occurred on the coprocessor 1099 */ 1100 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1101 throws IOException { 1102 boolean bypassable = true; 1103 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1104 if (coprocEnvironments.isEmpty()) { 1105 return null; 1106 } 1107 return execOperationWithResult( 1108 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1109 defaultResult, bypassable) { 1110 @Override 1111 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1112 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1113 } 1114 }); 1115 } 1116 1117 /** 1118 * @param checkAndMutate the CheckAndMutate object 1119 * @param result the result returned by the checkAndMutate 1120 * @return true or false to return to client if default processing should be bypassed, or null 1121 * otherwise 1122 * @throws IOException if an error occurred on the coprocessor 1123 */ 1124 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1125 CheckAndMutateResult result) throws IOException { 1126 if (this.coprocEnvironments.isEmpty()) { 1127 return result; 1128 } 1129 return execOperationWithResult( 1130 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1131 result) { 1132 @Override 1133 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1134 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1135 } 1136 }); 1137 } 1138 1139 /** 1140 * Supports Coprocessor 'bypass'. 1141 * @param append append object 1142 * @param edit The WALEdit object. 1143 * @return result to return to client if default operation should be bypassed, null otherwise 1144 * @throws IOException if an error occurred on the coprocessor 1145 */ 1146 public Result preAppend(final Append append, final WALEdit edit) throws IOException { 1147 boolean bypassable = true; 1148 Result defaultResult = null; 1149 if (this.coprocEnvironments.isEmpty()) { 1150 return defaultResult; 1151 } 1152 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1153 regionObserverGetter, defaultResult, bypassable) { 1154 @Override 1155 public Result call(RegionObserver observer) throws IOException { 1156 return observer.preAppend(this, append, edit); 1157 } 1158 }); 1159 } 1160 1161 /** 1162 * Supports Coprocessor 'bypass'. 1163 * @param append append object 1164 * @return result to return to client if default operation should be bypassed, null otherwise 1165 * @throws IOException if an error occurred on the coprocessor 1166 */ 1167 public Result preAppendAfterRowLock(final Append append) throws IOException { 1168 boolean bypassable = true; 1169 Result defaultResult = null; 1170 if (this.coprocEnvironments.isEmpty()) { 1171 return defaultResult; 1172 } 1173 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1174 regionObserverGetter, defaultResult, bypassable) { 1175 @Override 1176 public Result call(RegionObserver observer) throws IOException { 1177 return observer.preAppendAfterRowLock(this, append); 1178 } 1179 }); 1180 } 1181 1182 /** 1183 * Supports Coprocessor 'bypass'. 1184 * @param increment increment object 1185 * @param edit The WALEdit object. 1186 * @return result to return to client if default operation should be bypassed, null otherwise 1187 * @throws IOException if an error occurred on the coprocessor 1188 */ 1189 public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException { 1190 boolean bypassable = true; 1191 Result defaultResult = null; 1192 if (coprocEnvironments.isEmpty()) { 1193 return defaultResult; 1194 } 1195 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1196 regionObserverGetter, defaultResult, bypassable) { 1197 @Override 1198 public Result call(RegionObserver observer) throws IOException { 1199 return observer.preIncrement(this, increment, edit); 1200 } 1201 }); 1202 } 1203 1204 /** 1205 * Supports Coprocessor 'bypass'. 1206 * @param increment increment object 1207 * @return result to return to client if default operation should be bypassed, null otherwise 1208 * @throws IOException if an error occurred on the coprocessor 1209 */ 1210 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1211 boolean bypassable = true; 1212 Result defaultResult = null; 1213 if (coprocEnvironments.isEmpty()) { 1214 return defaultResult; 1215 } 1216 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1217 regionObserverGetter, defaultResult, bypassable) { 1218 @Override 1219 public Result call(RegionObserver observer) throws IOException { 1220 return observer.preIncrementAfterRowLock(this, increment); 1221 } 1222 }); 1223 } 1224 1225 /** 1226 * @param append Append object 1227 * @param result the result returned by the append 1228 * @param edit The WALEdit object. 1229 * @throws IOException if an error occurred on the coprocessor 1230 */ 1231 public Result postAppend(final Append append, final Result result, final WALEdit edit) 1232 throws IOException { 1233 if (this.coprocEnvironments.isEmpty()) { 1234 return result; 1235 } 1236 return execOperationWithResult( 1237 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1238 @Override 1239 public Result call(RegionObserver observer) throws IOException { 1240 return observer.postAppend(this, append, result, edit); 1241 } 1242 }); 1243 } 1244 1245 /** 1246 * @param increment increment object 1247 * @param result the result returned by postIncrement 1248 * @param edit The WALEdit object. 1249 * @throws IOException if an error occurred on the coprocessor 1250 */ 1251 public Result postIncrement(final Increment increment, Result result, final WALEdit edit) 1252 throws IOException { 1253 if (this.coprocEnvironments.isEmpty()) { 1254 return result; 1255 } 1256 return execOperationWithResult( 1257 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1258 @Override 1259 public Result call(RegionObserver observer) throws IOException { 1260 return observer.postIncrement(this, increment, getResult(), edit); 1261 } 1262 }); 1263 } 1264 1265 /** 1266 * @param scan the Scan specification 1267 * @exception IOException Exception 1268 */ 1269 public void preScannerOpen(final Scan scan) throws IOException { 1270 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1271 @Override 1272 public void call(RegionObserver observer) throws IOException { 1273 observer.preScannerOpen(this, scan); 1274 } 1275 }); 1276 } 1277 1278 /** 1279 * @param scan the Scan specification 1280 * @param s the scanner 1281 * @return the scanner instance to use 1282 * @exception IOException Exception 1283 */ 1284 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1285 if (this.coprocEnvironments.isEmpty()) { 1286 return s; 1287 } 1288 return execOperationWithResult( 1289 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1290 @Override 1291 public RegionScanner call(RegionObserver observer) throws IOException { 1292 return observer.postScannerOpen(this, scan, getResult()); 1293 } 1294 }); 1295 } 1296 1297 /** 1298 * @param s the scanner 1299 * @param results the result set returned by the region server 1300 * @param limit the maximum number of results to return 1301 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1302 * @exception IOException Exception 1303 */ 1304 public Boolean preScannerNext(final InternalScanner s, final List<Result> results, 1305 final int limit) throws IOException { 1306 boolean bypassable = true; 1307 boolean defaultResult = false; 1308 if (coprocEnvironments.isEmpty()) { 1309 return null; 1310 } 1311 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1312 regionObserverGetter, defaultResult, bypassable) { 1313 @Override 1314 public Boolean call(RegionObserver observer) throws IOException { 1315 return observer.preScannerNext(this, s, results, limit, getResult()); 1316 } 1317 }); 1318 } 1319 1320 /** 1321 * @param s the scanner 1322 * @param results the result set returned by the region server 1323 * @param limit the maximum number of results to return 1324 * @return 'has more' indication to give to client 1325 * @exception IOException Exception 1326 */ 1327 public boolean postScannerNext(final InternalScanner s, final List<Result> results, 1328 final int limit, boolean hasMore) throws IOException { 1329 if (this.coprocEnvironments.isEmpty()) { 1330 return hasMore; 1331 } 1332 return execOperationWithResult( 1333 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1334 @Override 1335 public Boolean call(RegionObserver observer) throws IOException { 1336 return observer.postScannerNext(this, s, results, limit, getResult()); 1337 } 1338 }); 1339 } 1340 1341 /** 1342 * This will be called by the scan flow when the current scanned row is being filtered out by the 1343 * filter. 1344 * @param s the scanner 1345 * @param curRowCell The cell in the current row which got filtered out 1346 * @return whether more rows are available for the scanner or not 1347 */ 1348 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1349 throws IOException { 1350 // short circuit for performance 1351 boolean defaultResult = true; 1352 if (!hasCustomPostScannerFilterRow) { 1353 return defaultResult; 1354 } 1355 if (this.coprocEnvironments.isEmpty()) { 1356 return defaultResult; 1357 } 1358 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1359 regionObserverGetter, defaultResult) { 1360 @Override 1361 public Boolean call(RegionObserver observer) throws IOException { 1362 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1363 } 1364 }); 1365 } 1366 1367 /** 1368 * Supports Coprocessor 'bypass'. 1369 * @param s the scanner 1370 * @return true if default behavior should be bypassed, false otherwise 1371 * @exception IOException Exception 1372 */ 1373 // Should this be bypassable? 1374 public boolean preScannerClose(final InternalScanner s) throws IOException { 1375 return execOperation( 1376 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1377 @Override 1378 public void call(RegionObserver observer) throws IOException { 1379 observer.preScannerClose(this, s); 1380 } 1381 }); 1382 } 1383 1384 /** 1385 * @exception IOException Exception 1386 */ 1387 public void postScannerClose(final InternalScanner s) throws IOException { 1388 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1389 @Override 1390 public void call(RegionObserver observer) throws IOException { 1391 observer.postScannerClose(this, s); 1392 } 1393 }); 1394 } 1395 1396 /** 1397 * Called before open store scanner for user scan. 1398 */ 1399 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1400 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1401 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1402 execOperation(new RegionObserverOperationWithoutResult() { 1403 @Override 1404 public void call(RegionObserver observer) throws IOException { 1405 observer.preStoreScannerOpen(this, store, builder); 1406 } 1407 }); 1408 return builder.build(); 1409 } 1410 1411 /** 1412 * @param info the RegionInfo for this region 1413 * @param edits the file of recovered edits 1414 */ 1415 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1416 execOperation( 1417 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1418 @Override 1419 public void call(RegionObserver observer) throws IOException { 1420 observer.preReplayWALs(this, info, edits); 1421 } 1422 }); 1423 } 1424 1425 /** 1426 * @param info the RegionInfo for this region 1427 * @param edits the file of recovered edits 1428 * @throws IOException Exception 1429 */ 1430 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1431 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1432 @Override 1433 public void call(RegionObserver observer) throws IOException { 1434 observer.postReplayWALs(this, info, edits); 1435 } 1436 }); 1437 } 1438 1439 /** 1440 * Supports Coprocessor 'bypass'. 1441 * @return true if default behavior should be bypassed, false otherwise 1442 */ 1443 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1444 throws IOException { 1445 return execOperation( 1446 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1447 @Override 1448 public void call(RegionObserver observer) throws IOException { 1449 observer.preWALRestore(this, info, logKey, logEdit); 1450 } 1451 }); 1452 } 1453 1454 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1455 throws IOException { 1456 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1457 @Override 1458 public void call(RegionObserver observer) throws IOException { 1459 observer.postWALRestore(this, info, logKey, logEdit); 1460 } 1461 }); 1462 } 1463 1464 /** 1465 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1466 */ 1467 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1468 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1469 @Override 1470 public void call(RegionObserver observer) throws IOException { 1471 observer.preBulkLoadHFile(this, familyPaths); 1472 } 1473 }); 1474 } 1475 1476 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1477 throws IOException { 1478 return execOperation( 1479 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1480 @Override 1481 public void call(RegionObserver observer) throws IOException { 1482 observer.preCommitStoreFile(this, family, pairs); 1483 } 1484 }); 1485 } 1486 1487 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) 1488 throws IOException { 1489 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1490 @Override 1491 public void call(RegionObserver observer) throws IOException { 1492 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1493 } 1494 }); 1495 } 1496 1497 /** 1498 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1499 * @param map Map of CF to List of file paths for the final loaded files 1500 */ 1501 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1502 Map<byte[], List<Path>> map) throws IOException { 1503 if (this.coprocEnvironments.isEmpty()) { 1504 return; 1505 } 1506 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1507 @Override 1508 public void call(RegionObserver observer) throws IOException { 1509 observer.postBulkLoadHFile(this, familyPaths, map); 1510 } 1511 }); 1512 } 1513 1514 public void postStartRegionOperation(final Operation op) throws IOException { 1515 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1516 @Override 1517 public void call(RegionObserver observer) throws IOException { 1518 observer.postStartRegionOperation(this, op); 1519 } 1520 }); 1521 } 1522 1523 public void postCloseRegionOperation(final Operation op) throws IOException { 1524 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1525 @Override 1526 public void call(RegionObserver observer) throws IOException { 1527 observer.postCloseRegionOperation(this, op); 1528 } 1529 }); 1530 } 1531 1532 /** 1533 * @param fs fileystem to read from 1534 * @param p path to the file 1535 * @param in {@link FSDataInputStreamWrapper} 1536 * @param size Full size of the file 1537 * @param r original reference file. This will be not null only when reading a split file. 1538 * @return a Reader instance to use instead of the base reader if overriding default behavior, 1539 * null otherwise 1540 */ 1541 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1542 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1543 final Reference r) throws IOException { 1544 if (coprocEnvironments.isEmpty()) { 1545 return null; 1546 } 1547 return execOperationWithResult( 1548 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1549 @Override 1550 public StoreFileReader call(RegionObserver observer) throws IOException { 1551 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1552 } 1553 }); 1554 } 1555 1556 /** 1557 * @param fs fileystem to read from 1558 * @param p path to the file 1559 * @param in {@link FSDataInputStreamWrapper} 1560 * @param size Full size of the file 1561 * @param r original reference file. This will be not null only when reading a split file. 1562 * @param reader the base reader instance 1563 * @return The reader to use 1564 */ 1565 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1566 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1567 final Reference r, final StoreFileReader reader) throws IOException { 1568 if (this.coprocEnvironments.isEmpty()) { 1569 return reader; 1570 } 1571 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, StoreFileReader>( 1572 regionObserverGetter, reader) { 1573 @Override 1574 public StoreFileReader call(RegionObserver observer) throws IOException { 1575 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1576 } 1577 }); 1578 } 1579 1580 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1581 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1582 if (this.coprocEnvironments.isEmpty()) { 1583 return cellPairs; 1584 } 1585 return execOperationWithResult( 1586 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter, 1587 cellPairs) { 1588 @Override 1589 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1590 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1591 } 1592 }); 1593 } 1594 1595 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1596 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1597 if (this.coprocEnvironments.isEmpty()) { 1598 return cellPairs; 1599 } 1600 return execOperationWithResult( 1601 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter, 1602 cellPairs) { 1603 @Override 1604 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1605 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1606 } 1607 }); 1608 } 1609 1610 public void preWALAppend(WALKey key, WALEdit edit) throws IOException { 1611 if (this.coprocEnvironments.isEmpty()) { 1612 return; 1613 } 1614 execOperation(new RegionObserverOperationWithoutResult() { 1615 @Override 1616 public void call(RegionObserver observer) throws IOException { 1617 observer.preWALAppend(this, key, edit); 1618 } 1619 }); 1620 } 1621 1622 public Message preEndpointInvocation(final Service service, final String methodName, 1623 Message request) throws IOException { 1624 if (coprocEnvironments.isEmpty()) { 1625 return request; 1626 } 1627 return execOperationWithResult( 1628 new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) { 1629 @Override 1630 public Message call(EndpointObserver observer) throws IOException { 1631 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1632 } 1633 }); 1634 } 1635 1636 public void postEndpointInvocation(final Service service, final String methodName, 1637 final Message request, final Message.Builder responseBuilder) throws IOException { 1638 execOperation(coprocEnvironments.isEmpty() 1639 ? null 1640 : new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1641 @Override 1642 public void call(EndpointObserver observer) throws IOException { 1643 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1644 } 1645 }); 1646 } 1647 1648 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1649 if (this.coprocEnvironments.isEmpty()) { 1650 return result; 1651 } 1652 return execOperationWithResult( 1653 new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter, result) { 1654 @Override 1655 public DeleteTracker call(RegionObserver observer) throws IOException { 1656 return observer.postInstantiateDeleteTracker(this, getResult()); 1657 } 1658 }); 1659 } 1660 1661 ///////////////////////////////////////////////////////////////////////////////////////////////// 1662 // BulkLoadObserver hooks 1663 ///////////////////////////////////////////////////////////////////////////////////////////////// 1664 public void prePrepareBulkLoad(User user) throws IOException { 1665 execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) { 1666 @Override 1667 protected void call(BulkLoadObserver observer) throws IOException { 1668 observer.prePrepareBulkLoad(this); 1669 } 1670 }); 1671 } 1672 1673 public void preCleanupBulkLoad(User user) throws IOException { 1674 execOperation(coprocEnvironments.isEmpty() ? null : new BulkLoadObserverOperation(user) { 1675 @Override 1676 protected void call(BulkLoadObserver observer) throws IOException { 1677 observer.preCleanupBulkLoad(this); 1678 } 1679 }); 1680 } 1681}