001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.UUID; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.RawCellBuilder; 036import org.apache.hadoop.hbase.RawCellBuilderFactory; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.CheckAndMutate; 040import org.apache.hadoop.hbase.client.CheckAndMutateResult; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import 
org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.SharedConnection; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 057import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 060import org.apache.hadoop.hbase.coprocessor.ObserverContext; 061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 063import org.apache.hadoop.hbase.coprocessor.RegionObserver; 064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 065import org.apache.hadoop.hbase.io.Reference; 066import org.apache.hadoop.hbase.io.hfile.CacheConfig; 067import org.apache.hadoop.hbase.metrics.MetricRegistry; 068import org.apache.hadoop.hbase.quotas.OperationQuota; 069import org.apache.hadoop.hbase.quotas.RpcQuotaManager; 070import org.apache.hadoop.hbase.quotas.RpcThrottlingException; 071import org.apache.hadoop.hbase.regionserver.Region.Operation; 072import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 073import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 074import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 075import org.apache.hadoop.hbase.security.User; 076import 
org.apache.hadoop.hbase.util.CoprocessorClassLoader; 077import org.apache.hadoop.hbase.util.Pair; 078import org.apache.hadoop.hbase.wal.WALEdit; 079import org.apache.hadoop.hbase.wal.WALKey; 080import org.apache.yetus.audience.InterfaceAudience; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.protobuf.Message; 085import org.apache.hbase.thirdparty.com.google.protobuf.Service; 086import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 087import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 088 089import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 091 092/** 093 * Implements the coprocessor environment and runtime support for coprocessors loaded within a 094 * {@link Region}. 095 */ 096@InterfaceAudience.Private 097public class RegionCoprocessorHost 098 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 099 100 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 101 // The shared data map 102 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 103 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 104 AbstractReferenceMap.ReferenceStrength.WEAK); 105 106 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 107 private final boolean hasCustomPostScannerFilterRow; 108 109 /* 110 * Whether any configured CPs override postScannerFilterRow hook 111 */ 112 public boolean hasCustomPostScannerFilterRow() { 113 return hasCustomPostScannerFilterRow; 114 } 115 116 /** 117 * Encapsulation of the environment of each coprocessor 118 */ 119 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 120 implements RegionCoprocessorEnvironment { 121 private Region region; 122 ConcurrentMap<String, 
Object> sharedData; 123 private final MetricRegistry metricRegistry; 124 private final RegionServerServices services; 125 private final RpcQuotaManager rpcQuotaManager; 126 127 /** 128 * Constructor 129 * @param impl the coprocessor instance 130 * @param priority chaining priority 131 */ 132 public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq, 133 final Configuration conf, final Region region, final RegionServerServices services, 134 final ConcurrentMap<String, Object> sharedData) { 135 super(impl, priority, seq, conf); 136 this.region = region; 137 this.sharedData = sharedData; 138 this.services = services; 139 this.metricRegistry = 140 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 141 // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager 142 // being null. Let these unit tests succeed. This should not happen in real usage. 143 if (services != null) { 144 this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); 145 } else { 146 this.rpcQuotaManager = null; 147 } 148 } 149 150 /** Returns the region */ 151 @Override 152 public Region getRegion() { 153 return region; 154 } 155 156 @Override 157 public OnlineRegions getOnlineRegions() { 158 return this.services; 159 } 160 161 @Override 162 public Connection getConnection() { 163 // Mocks may have services as null at test time. 164 return services != null ? new SharedConnection(services.getConnection()) : null; 165 } 166 167 @Override 168 public Connection createConnection(Configuration conf) throws IOException { 169 return services != null ? this.services.createConnection(conf) : null; 170 } 171 172 @Override 173 public ServerName getServerName() { 174 return services != null ? 
services.getServerName() : null; 175 } 176 177 @Override 178 public void shutdown() { 179 super.shutdown(); 180 MetricsCoprocessor.removeRegistry(this.metricRegistry); 181 } 182 183 @Override 184 public ConcurrentMap<String, Object> getSharedData() { 185 return sharedData; 186 } 187 188 @Override 189 public RegionInfo getRegionInfo() { 190 return region.getRegionInfo(); 191 } 192 193 @Override 194 public MetricRegistry getMetricRegistryForRegionServer() { 195 return metricRegistry; 196 } 197 198 @Override 199 public RawCellBuilder getCellBuilder() { 200 // We always do a DEEP_COPY only 201 return RawCellBuilderFactory.create(); 202 } 203 204 @Override 205 public RpcQuotaManager getRpcQuotaManager() { 206 return rpcQuotaManager; 207 } 208 209 @Override 210 public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned, 211 long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException { 212 ClientProtos.ScanRequest scanRequest = RequestConverter 213 .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false); 214 long maxScannerResultSize = 215 services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 216 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); 217 return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize, 218 maxBlockBytesScanned, prevBlockBytesScannedDifference); 219 } 220 221 @Override 222 public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type) 223 throws IOException, RpcThrottlingException { 224 return rpcQuotaManager.checkBatchQuota(region, type); 225 } 226 227 @Override 228 public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) 229 throws IOException, RpcThrottlingException { 230 return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads); 231 } 232 } 233 234 /** 235 * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors 
236 * only. Temporary hack until Core Coprocessors are integrated into Core. 237 */ 238 private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment 239 implements HasRegionServerServices { 240 private final RegionServerServices rsServices; 241 242 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 243 final int seq, final Configuration conf, final Region region, 244 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 245 super(impl, priority, seq, conf, region, services, sharedData); 246 this.rsServices = services; 247 } 248 249 /** 250 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 251 * consumption. 252 */ 253 @Override 254 public RegionServerServices getRegionServerServices() { 255 return this.rsServices; 256 } 257 } 258 259 static class TableCoprocessorAttribute { 260 private Path path; 261 private String className; 262 private int priority; 263 private Configuration conf; 264 265 public TableCoprocessorAttribute(Path path, String className, int priority, 266 Configuration conf) { 267 this.path = path; 268 this.className = className; 269 this.priority = priority; 270 this.conf = conf; 271 } 272 273 public Path getPath() { 274 return path; 275 } 276 277 public String getClassName() { 278 return className; 279 } 280 281 public int getPriority() { 282 return priority; 283 } 284 285 public Configuration getConf() { 286 return conf; 287 } 288 } 289 290 /** The region server services */ 291 RegionServerServices rsServices; 292 /** The region */ 293 HRegion region; 294 295 /** 296 * Constructor 297 * @param region the region 298 * @param rsServices interface to available region server functionality 299 * @param conf the configuration 300 */ 301 @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization 302 public RegionCoprocessorHost(final HRegion region, final RegionServerServices 
rsServices, 303 final Configuration conf) { 304 super(rsServices); 305 this.conf = conf; 306 this.rsServices = rsServices; 307 this.region = region; 308 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 309 310 // load system default cp's from configuration. 311 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 312 313 // load system default cp's for user tables from configuration. 314 if (!region.getRegionInfo().getTable().isSystemTable()) { 315 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 316 } 317 318 // load Coprocessor From HDFS 319 loadTableCoprocessors(conf); 320 321 // now check whether any coprocessor implements postScannerFilterRow 322 boolean hasCustomPostScannerFilterRow = false; 323 out: for (RegionCoprocessorEnvironment env : coprocEnvironments) { 324 if (env.getInstance() instanceof RegionObserver) { 325 Class<?> clazz = env.getInstance().getClass(); 326 for (;;) { 327 if (clazz == Object.class) { 328 // we dont need to look postScannerFilterRow into Object class 329 break; // break the inner loop 330 } 331 try { 332 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 333 InternalScanner.class, Cell.class, boolean.class); 334 // this coprocessor has a custom version of postScannerFilterRow 335 hasCustomPostScannerFilterRow = true; 336 break out; 337 } catch (NoSuchMethodException ignore) { 338 } 339 // the deprecated signature still exists 340 try { 341 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 342 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 343 // this coprocessor has a custom version of postScannerFilterRow 344 hasCustomPostScannerFilterRow = true; 345 break out; 346 } catch (NoSuchMethodException ignore) { 347 } 348 clazz = clazz.getSuperclass(); 349 } 350 } 351 } 352 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 353 } 354 355 static List<TableCoprocessorAttribute> 
getTableCoprocessorAttrsFromSchema(Configuration conf, 356 TableDescriptor htd) { 357 return htd.getCoprocessorDescriptors().stream().map(cp -> { 358 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 359 Configuration ourConf; 360 if (!cp.getProperties().isEmpty()) { 361 // do an explicit deep copy of the passed configuration 362 ourConf = new Configuration(false); 363 HBaseConfiguration.merge(ourConf, conf); 364 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 365 } else { 366 ourConf = conf; 367 } 368 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 369 }).collect(Collectors.toList()); 370 } 371 372 /** 373 * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception 374 * if there is a problem. 375 */ 376 public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd) 377 throws IOException { 378 String pathPrefix = UUID.randomUUID().toString(); 379 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) { 380 if (attr.getPriority() < 0) { 381 throw new IOException( 382 "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0"); 383 } 384 ClassLoader old = Thread.currentThread().getContextClassLoader(); 385 try { 386 ClassLoader cl; 387 if (attr.getPath() != null) { 388 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 389 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 390 } else { 391 cl = CoprocessorHost.class.getClassLoader(); 392 } 393 Thread.currentThread().setContextClassLoader(cl); 394 if (cl instanceof CoprocessorClassLoader) { 395 String[] includedClassPrefixes = null; 396 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 397 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 398 includedClassPrefixes = prefixes.split(";"); 399 } 400 ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes); 401 } 
else { 402 cl.loadClass(attr.getClassName()); 403 } 404 } catch (ClassNotFoundException e) { 405 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 406 } finally { 407 Thread.currentThread().setContextClassLoader(old); 408 } 409 } 410 } 411 412 void loadTableCoprocessors(final Configuration conf) { 413 boolean coprocessorsEnabled = 414 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 415 boolean tableCoprocessorsEnabled = 416 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 417 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 418 return; 419 } 420 421 // scan the table attributes for coprocessor load specifications 422 // initialize the coprocessors 423 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 424 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, 425 region.getTableDescriptor())) { 426 // Load encompasses classloading and coprocessor initialization 427 try { 428 RegionCoprocessorEnvironment env = 429 load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf()); 430 if (env == null) { 431 continue; 432 } 433 configured.add(env); 434 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " 435 + region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 436 } catch (Throwable t) { 437 // Coprocessor failed to load, do we abort on error? 438 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 439 abortServer(attr.getClassName(), t); 440 } else { 441 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 442 } 443 } 444 } 445 // add together to coprocessor set for COW efficiency 446 coprocEnvironments.addAll(configured); 447 } 448 449 @Override 450 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 451 Configuration conf) { 452 // If coprocessor exposes any services, register them. 
453 for (Service service : instance.getServices()) { 454 region.registerService(service); 455 } 456 ConcurrentMap<String, Object> classData; 457 // make sure only one thread can add maps 458 synchronized (SHARED_DATA_MAP) { 459 // as long as at least one RegionEnvironment holds on to its classData it will 460 // remain in this map 461 classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 462 k -> new ConcurrentHashMap<>()); 463 } 464 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 465 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 466 ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices, 467 classData) 468 : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 469 } 470 471 @Override 472 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 473 throws InstantiationException, IllegalAccessException { 474 try { 475 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 476 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 477 } else { 478 LOG.error("{} is not of type RegionCoprocessor. 
Check the configuration of {}", 479 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 480 return null; 481 } 482 } catch (NoSuchMethodException | InvocationTargetException e) { 483 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 484 } 485 } 486 487 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 488 RegionCoprocessor::getRegionObserver; 489 490 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 491 RegionCoprocessor::getEndpointObserver; 492 493 abstract class RegionObserverOperationWithoutResult 494 extends ObserverOperationWithoutResult<RegionObserver> { 495 public RegionObserverOperationWithoutResult() { 496 super(regionObserverGetter); 497 } 498 499 public RegionObserverOperationWithoutResult(User user) { 500 super(regionObserverGetter, user); 501 } 502 503 public RegionObserverOperationWithoutResult(boolean bypassable) { 504 super(regionObserverGetter, null, bypassable); 505 } 506 507 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 508 super(regionObserverGetter, user, bypassable); 509 } 510 } 511 512 abstract class BulkLoadObserverOperation 513 extends ObserverOperationWithoutResult<BulkLoadObserver> { 514 public BulkLoadObserverOperation(User user) { 515 super(RegionCoprocessor::getBulkLoadObserver, user); 516 } 517 } 518 519 ////////////////////////////////////////////////////////////////////////////////////////////////// 520 // Observer operations 521 ////////////////////////////////////////////////////////////////////////////////////////////////// 522 523 ////////////////////////////////////////////////////////////////////////////////////////////////// 524 // Observer operations 525 ////////////////////////////////////////////////////////////////////////////////////////////////// 526 527 /** 528 * Invoked before a region open. 529 * @throws IOException Signals that an I/O exception has occurred. 
530 */ 531 public void preOpen() throws IOException { 532 if (coprocEnvironments.isEmpty()) { 533 return; 534 } 535 execOperation(new RegionObserverOperationWithoutResult() { 536 @Override 537 public void call(RegionObserver observer) throws IOException { 538 observer.preOpen(this); 539 } 540 }); 541 } 542 543 /** 544 * Invoked after a region open 545 */ 546 public void postOpen() { 547 if (coprocEnvironments.isEmpty()) { 548 return; 549 } 550 try { 551 execOperation(new RegionObserverOperationWithoutResult() { 552 @Override 553 public void call(RegionObserver observer) throws IOException { 554 observer.postOpen(this); 555 } 556 }); 557 } catch (IOException e) { 558 LOG.warn(e.toString(), e); 559 } 560 } 561 562 /** 563 * Invoked before a region is closed 564 * @param abortRequested true if the server is aborting 565 */ 566 public void preClose(final boolean abortRequested) throws IOException { 567 execOperation(new RegionObserverOperationWithoutResult() { 568 @Override 569 public void call(RegionObserver observer) throws IOException { 570 observer.preClose(this, abortRequested); 571 } 572 }); 573 } 574 575 /** 576 * Invoked after a region is closed 577 * @param abortRequested true if the server is aborting 578 */ 579 public void postClose(final boolean abortRequested) { 580 try { 581 execOperation(new RegionObserverOperationWithoutResult() { 582 @Override 583 public void call(RegionObserver observer) throws IOException { 584 observer.postClose(this, abortRequested); 585 } 586 587 @Override 588 public void postEnvCall() { 589 shutdown(this.getEnvironment()); 590 } 591 }); 592 } catch (IOException e) { 593 LOG.warn(e.toString(), e); 594 } 595 } 596 597 /** 598 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 599 * available candidates. 600 * <p> 601 * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the 602 * passed in <code>candidates</code>. 
603 * @param store The store where compaction is being requested 604 * @param candidates The currently available store files 605 * @param tracker used to track the life cycle of a compaction 606 * @param user the user 607 */ 608 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 609 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 610 if (coprocEnvironments.isEmpty()) { 611 return false; 612 } 613 boolean bypassable = true; 614 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 615 @Override 616 public void call(RegionObserver observer) throws IOException { 617 observer.preCompactSelection(this, store, candidates, tracker); 618 } 619 }); 620 } 621 622 /** 623 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 624 * candidates. 625 * @param store The store where compaction is being requested 626 * @param selected The store files selected to compact 627 * @param tracker used to track the life cycle of a compaction 628 * @param request the compaction request 629 * @param user the user 630 */ 631 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 632 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 633 throws IOException { 634 if (coprocEnvironments.isEmpty()) { 635 return; 636 } 637 execOperation(new RegionObserverOperationWithoutResult(user) { 638 @Override 639 public void call(RegionObserver observer) throws IOException { 640 observer.postCompactSelection(this, store, selected, tracker, request); 641 } 642 }); 643 } 644 645 /** 646 * Called prior to opening store scanner for compaction. 
647 */ 648 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 649 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 650 if (coprocEnvironments.isEmpty()) { 651 return store.getScanInfo(); 652 } 653 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 654 execOperation(new RegionObserverOperationWithoutResult(user) { 655 @Override 656 public void call(RegionObserver observer) throws IOException { 657 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 658 } 659 }); 660 return builder.build(); 661 } 662 663 /** 664 * Called prior to rewriting the store files selected for compaction 665 * @param store the store being compacted 666 * @param scanner the scanner used to read store data during compaction 667 * @param scanType type of Scan 668 * @param tracker used to track the life cycle of a compaction 669 * @param request the compaction request 670 * @param user the user 671 * @return Scanner to use (cannot be null!) 672 */ 673 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 674 final ScanType scanType, final CompactionLifeCycleTracker tracker, 675 final CompactionRequest request, final User user) throws IOException { 676 InternalScanner defaultResult = scanner; 677 if (coprocEnvironments.isEmpty()) { 678 return defaultResult; 679 } 680 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 681 regionObserverGetter, defaultResult, user) { 682 @Override 683 public InternalScanner call(RegionObserver observer) throws IOException { 684 InternalScanner scanner = 685 observer.preCompact(this, store, getResult(), scanType, tracker, request); 686 if (scanner == null) { 687 throw new CoprocessorException("Null Scanner return disallowed!"); 688 } 689 return scanner; 690 } 691 }); 692 } 693 694 /** 695 * Called after the store compaction has completed. 
696 * @param store the store being compacted 697 * @param resultFile the new store file written during compaction 698 * @param tracker used to track the life cycle of a compaction 699 * @param request the compaction request 700 * @param user the user 701 */ 702 public void postCompact(final HStore store, final HStoreFile resultFile, 703 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 704 throws IOException { 705 execOperation( 706 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) { 707 @Override 708 public void call(RegionObserver observer) throws IOException { 709 observer.postCompact(this, store, resultFile, tracker, request); 710 } 711 }); 712 } 713 714 /** 715 * Invoked before create StoreScanner for flush. 716 */ 717 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 718 throws IOException { 719 if (coprocEnvironments.isEmpty()) { 720 return store.getScanInfo(); 721 } 722 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 723 execOperation(new RegionObserverOperationWithoutResult() { 724 @Override 725 public void call(RegionObserver observer) throws IOException { 726 observer.preFlushScannerOpen(this, store, builder, tracker); 727 } 728 }); 729 return builder.build(); 730 } 731 732 /** 733 * Invoked before a memstore flush 734 * @return Scanner to use (cannot be null!) 
735 */ 736 public InternalScanner preFlush(HStore store, InternalScanner scanner, 737 FlushLifeCycleTracker tracker) throws IOException { 738 if (coprocEnvironments.isEmpty()) { 739 return scanner; 740 } 741 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 742 regionObserverGetter, scanner) { 743 @Override 744 public InternalScanner call(RegionObserver observer) throws IOException { 745 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 746 if (scanner == null) { 747 throw new CoprocessorException("Null Scanner return disallowed!"); 748 } 749 return scanner; 750 } 751 }); 752 } 753 754 /** 755 * Invoked before a memstore flush 756 */ 757 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 758 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 759 @Override 760 public void call(RegionObserver observer) throws IOException { 761 observer.preFlush(this, tracker); 762 } 763 }); 764 } 765 766 /** 767 * Invoked after a memstore flush 768 */ 769 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 770 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 771 @Override 772 public void call(RegionObserver observer) throws IOException { 773 observer.postFlush(this, tracker); 774 } 775 }); 776 } 777 778 /** 779 * Invoked before in memory compaction. 780 */ 781 public void preMemStoreCompaction(HStore store) throws IOException { 782 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 783 @Override 784 public void call(RegionObserver observer) throws IOException { 785 observer.preMemStoreCompaction(this, store); 786 } 787 }); 788 } 789 790 /** 791 * Invoked before create StoreScanner for in memory compaction. 
792 */ 793 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 794 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 795 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 796 @Override 797 public void call(RegionObserver observer) throws IOException { 798 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 799 } 800 }); 801 return builder.build(); 802 } 803 804 /** 805 * Invoked before compacting memstore. 806 */ 807 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 808 throws IOException { 809 if (coprocEnvironments.isEmpty()) { 810 return scanner; 811 } 812 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 813 regionObserverGetter, scanner) { 814 @Override 815 public InternalScanner call(RegionObserver observer) throws IOException { 816 return observer.preMemStoreCompactionCompact(this, store, getResult()); 817 } 818 }); 819 } 820 821 /** 822 * Invoked after in memory compaction. 823 */ 824 public void postMemStoreCompaction(HStore store) throws IOException { 825 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 826 @Override 827 public void call(RegionObserver observer) throws IOException { 828 observer.postMemStoreCompaction(this, store); 829 } 830 }); 831 } 832 833 /** 834 * Invoked after a memstore flush 835 */ 836 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 837 throws IOException { 838 if (coprocEnvironments.isEmpty()) { 839 return; 840 } 841 execOperation(new RegionObserverOperationWithoutResult() { 842 @Override 843 public void call(RegionObserver observer) throws IOException { 844 observer.postFlush(this, store, storeFile, tracker); 845 } 846 }); 847 } 848 849 // RegionObserver support 850 /** 851 * Supports Coprocessor 'bypass'. 
852 * @param get the Get request 853 * @param results What to return if return is true/'bypass'. 854 * @return true if default processing should be bypassed. 855 * @exception IOException Exception 856 */ 857 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 858 if (coprocEnvironments.isEmpty()) { 859 return false; 860 } 861 boolean bypassable = true; 862 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 863 @Override 864 public void call(RegionObserver observer) throws IOException { 865 observer.preGetOp(this, get, results); 866 } 867 }); 868 } 869 870 /** 871 * @param get the Get request 872 * @param results the result set 873 * @exception IOException Exception 874 */ 875 public void postGet(final Get get, final List<Cell> results) throws IOException { 876 if (coprocEnvironments.isEmpty()) { 877 return; 878 } 879 execOperation(new RegionObserverOperationWithoutResult() { 880 @Override 881 public void call(RegionObserver observer) throws IOException { 882 observer.postGetOp(this, get, results); 883 } 884 }); 885 } 886 887 /** 888 * Supports Coprocessor 'bypass'. 
889 * @param get the Get request 890 * @return true or false to return to client if bypassing normal operation, or null otherwise 891 * @exception IOException Exception 892 */ 893 public Boolean preExists(final Get get) throws IOException { 894 boolean bypassable = true; 895 boolean defaultResult = false; 896 if (coprocEnvironments.isEmpty()) { 897 return null; 898 } 899 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 900 regionObserverGetter, defaultResult, bypassable) { 901 @Override 902 public Boolean call(RegionObserver observer) throws IOException { 903 return observer.preExists(this, get, getResult()); 904 } 905 }); 906 } 907 908 /** 909 * @param get the Get request 910 * @param result the result returned by the region server 911 * @return the result to return to the client 912 * @exception IOException Exception 913 */ 914 public boolean postExists(final Get get, boolean result) throws IOException { 915 if (this.coprocEnvironments.isEmpty()) { 916 return result; 917 } 918 return execOperationWithResult( 919 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 920 @Override 921 public Boolean call(RegionObserver observer) throws IOException { 922 return observer.postExists(this, get, getResult()); 923 } 924 }); 925 } 926 927 /** 928 * Supports Coprocessor 'bypass'. 929 * @param put The Put object 930 * @param edit The WALEdit object. 931 * @return true if default processing should be bypassed 932 * @exception IOException Exception 933 */ 934 public boolean prePut(final Put put, final WALEdit edit) throws IOException { 935 if (coprocEnvironments.isEmpty()) { 936 return false; 937 } 938 boolean bypassable = true; 939 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 940 @Override 941 public void call(RegionObserver observer) throws IOException { 942 observer.prePut(this, put, edit); 943 } 944 }); 945 } 946 947 /** 948 * Supports Coprocessor 'bypass'. 
949 * @param mutation - the current mutation 950 * @param kv - the current cell 951 * @param byteNow - current timestamp in bytes 952 * @param get - the get that could be used Note that the get only does not specify the family 953 * and qualifier that should be used 954 * @return true if default processing should be bypassed 955 */ 956 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, 957 final byte[] byteNow, final Get get) throws IOException { 958 if (coprocEnvironments.isEmpty()) { 959 return false; 960 } 961 boolean bypassable = true; 962 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 963 @Override 964 public void call(RegionObserver observer) throws IOException { 965 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 966 } 967 }); 968 } 969 970 /** 971 * @param put The Put object 972 * @param edit The WALEdit object. 973 * @exception IOException Exception 974 */ 975 public void postPut(final Put put, final WALEdit edit) throws IOException { 976 if (coprocEnvironments.isEmpty()) { 977 return; 978 } 979 execOperation(new RegionObserverOperationWithoutResult() { 980 @Override 981 public void call(RegionObserver observer) throws IOException { 982 observer.postPut(this, put, edit); 983 } 984 }); 985 } 986 987 /** 988 * Supports Coprocessor 'bypass'. 989 * @param delete The Delete object 990 * @param edit The WALEdit object. 
991 * @return true if default processing should be bypassed 992 * @exception IOException Exception 993 */ 994 public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException { 995 if (this.coprocEnvironments.isEmpty()) { 996 return false; 997 } 998 boolean bypassable = true; 999 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 1000 @Override 1001 public void call(RegionObserver observer) throws IOException { 1002 observer.preDelete(this, delete, edit); 1003 } 1004 }); 1005 } 1006 1007 /** 1008 * @param delete The Delete object 1009 * @param edit The WALEdit object. 1010 * @exception IOException Exception 1011 */ 1012 public void postDelete(final Delete delete, final WALEdit edit) throws IOException { 1013 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1014 @Override 1015 public void call(RegionObserver observer) throws IOException { 1016 observer.postDelete(this, delete, edit); 1017 } 1018 }); 1019 } 1020 1021 public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 1022 throws IOException { 1023 if (this.coprocEnvironments.isEmpty()) { 1024 return; 1025 } 1026 execOperation(new RegionObserverOperationWithoutResult() { 1027 @Override 1028 public void call(RegionObserver observer) throws IOException { 1029 observer.preBatchMutate(this, miniBatchOp); 1030 } 1031 }); 1032 } 1033 1034 public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 1035 throws IOException { 1036 if (this.coprocEnvironments.isEmpty()) { 1037 return; 1038 } 1039 execOperation(new RegionObserverOperationWithoutResult() { 1040 @Override 1041 public void call(RegionObserver observer) throws IOException { 1042 observer.postBatchMutate(this, miniBatchOp); 1043 } 1044 }); 1045 } 1046 1047 public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp, 1048 final boolean success) throws IOException { 1049 if 
(this.coprocEnvironments.isEmpty()) { 1050 return; 1051 } 1052 execOperation(new RegionObserverOperationWithoutResult() { 1053 @Override 1054 public void call(RegionObserver observer) throws IOException { 1055 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1056 } 1057 }); 1058 } 1059 1060 /** 1061 * Supports Coprocessor 'bypass'. 1062 * @param checkAndMutate the CheckAndMutate object 1063 * @return true or false to return to client if default processing should be bypassed, or null 1064 * otherwise 1065 * @throws IOException if an error occurred on the coprocessor 1066 */ 1067 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException { 1068 boolean bypassable = true; 1069 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1070 if (coprocEnvironments.isEmpty()) { 1071 return null; 1072 } 1073 return execOperationWithResult( 1074 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1075 defaultResult, bypassable) { 1076 @Override 1077 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1078 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1079 } 1080 }); 1081 } 1082 1083 /** 1084 * Supports Coprocessor 'bypass'. 
1085 * @param checkAndMutate the CheckAndMutate object 1086 * @return true or false to return to client if default processing should be bypassed, or null 1087 * otherwise 1088 * @throws IOException if an error occurred on the coprocessor 1089 */ 1090 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1091 throws IOException { 1092 boolean bypassable = true; 1093 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1094 if (coprocEnvironments.isEmpty()) { 1095 return null; 1096 } 1097 return execOperationWithResult( 1098 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1099 defaultResult, bypassable) { 1100 @Override 1101 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1102 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1103 } 1104 }); 1105 } 1106 1107 /** 1108 * @param checkAndMutate the CheckAndMutate object 1109 * @param result the result returned by the checkAndMutate 1110 * @return true or false to return to client if default processing should be bypassed, or null 1111 * otherwise 1112 * @throws IOException if an error occurred on the coprocessor 1113 */ 1114 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1115 CheckAndMutateResult result) throws IOException { 1116 if (this.coprocEnvironments.isEmpty()) { 1117 return result; 1118 } 1119 return execOperationWithResult( 1120 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1121 result) { 1122 @Override 1123 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1124 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1125 } 1126 }); 1127 } 1128 1129 /** 1130 * Supports Coprocessor 'bypass'. 1131 * @param append append object 1132 * @param edit The WALEdit object. 
1133 * @return result to return to client if default operation should be bypassed, null otherwise 1134 * @throws IOException if an error occurred on the coprocessor 1135 */ 1136 public Result preAppend(final Append append, final WALEdit edit) throws IOException { 1137 boolean bypassable = true; 1138 Result defaultResult = null; 1139 if (this.coprocEnvironments.isEmpty()) { 1140 return defaultResult; 1141 } 1142 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1143 regionObserverGetter, defaultResult, bypassable) { 1144 @Override 1145 public Result call(RegionObserver observer) throws IOException { 1146 return observer.preAppend(this, append, edit); 1147 } 1148 }); 1149 } 1150 1151 /** 1152 * Supports Coprocessor 'bypass'. 1153 * @param append append object 1154 * @return result to return to client if default operation should be bypassed, null otherwise 1155 * @throws IOException if an error occurred on the coprocessor 1156 */ 1157 public Result preAppendAfterRowLock(final Append append) throws IOException { 1158 boolean bypassable = true; 1159 Result defaultResult = null; 1160 if (this.coprocEnvironments.isEmpty()) { 1161 return defaultResult; 1162 } 1163 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1164 regionObserverGetter, defaultResult, bypassable) { 1165 @Override 1166 public Result call(RegionObserver observer) throws IOException { 1167 return observer.preAppendAfterRowLock(this, append); 1168 } 1169 }); 1170 } 1171 1172 /** 1173 * Supports Coprocessor 'bypass'. 1174 * @param increment increment object 1175 * @param edit The WALEdit object. 
1176 * @return result to return to client if default operation should be bypassed, null otherwise 1177 * @throws IOException if an error occurred on the coprocessor 1178 */ 1179 public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException { 1180 boolean bypassable = true; 1181 Result defaultResult = null; 1182 if (coprocEnvironments.isEmpty()) { 1183 return defaultResult; 1184 } 1185 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1186 regionObserverGetter, defaultResult, bypassable) { 1187 @Override 1188 public Result call(RegionObserver observer) throws IOException { 1189 return observer.preIncrement(this, increment, edit); 1190 } 1191 }); 1192 } 1193 1194 /** 1195 * Supports Coprocessor 'bypass'. 1196 * @param increment increment object 1197 * @return result to return to client if default operation should be bypassed, null otherwise 1198 * @throws IOException if an error occurred on the coprocessor 1199 */ 1200 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1201 boolean bypassable = true; 1202 Result defaultResult = null; 1203 if (coprocEnvironments.isEmpty()) { 1204 return defaultResult; 1205 } 1206 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1207 regionObserverGetter, defaultResult, bypassable) { 1208 @Override 1209 public Result call(RegionObserver observer) throws IOException { 1210 return observer.preIncrementAfterRowLock(this, increment); 1211 } 1212 }); 1213 } 1214 1215 /** 1216 * @param append Append object 1217 * @param result the result returned by the append 1218 * @param edit The WALEdit object. 
1219 * @throws IOException if an error occurred on the coprocessor 1220 */ 1221 public Result postAppend(final Append append, final Result result, final WALEdit edit) 1222 throws IOException { 1223 if (this.coprocEnvironments.isEmpty()) { 1224 return result; 1225 } 1226 return execOperationWithResult( 1227 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1228 @Override 1229 public Result call(RegionObserver observer) throws IOException { 1230 return observer.postAppend(this, append, result, edit); 1231 } 1232 }); 1233 } 1234 1235 /** 1236 * @param increment increment object 1237 * @param result the result returned by postIncrement 1238 * @param edit The WALEdit object. 1239 * @throws IOException if an error occurred on the coprocessor 1240 */ 1241 public Result postIncrement(final Increment increment, Result result, final WALEdit edit) 1242 throws IOException { 1243 if (this.coprocEnvironments.isEmpty()) { 1244 return result; 1245 } 1246 return execOperationWithResult( 1247 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1248 @Override 1249 public Result call(RegionObserver observer) throws IOException { 1250 return observer.postIncrement(this, increment, getResult(), edit); 1251 } 1252 }); 1253 } 1254 1255 /** 1256 * @param scan the Scan specification 1257 * @exception IOException Exception 1258 */ 1259 public void preScannerOpen(final Scan scan) throws IOException { 1260 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1261 @Override 1262 public void call(RegionObserver observer) throws IOException { 1263 observer.preScannerOpen(this, scan); 1264 } 1265 }); 1266 } 1267 1268 /** 1269 * @param scan the Scan specification 1270 * @param s the scanner 1271 * @return the scanner instance to use 1272 * @exception IOException Exception 1273 */ 1274 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1275 if (this.coprocEnvironments.isEmpty()) { 1276 return s; 1277 } 1278 return execOperationWithResult( 1279 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1280 @Override 1281 public RegionScanner call(RegionObserver observer) throws IOException { 1282 return observer.postScannerOpen(this, scan, getResult()); 1283 } 1284 }); 1285 } 1286 1287 /** 1288 * @param s the scanner 1289 * @param results the result set returned by the region server 1290 * @param limit the maximum number of results to return 1291 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1292 * @exception IOException Exception 1293 */ 1294 public Boolean preScannerNext(final InternalScanner s, final List<Result> results, 1295 final int limit) throws IOException { 1296 boolean bypassable = true; 1297 boolean defaultResult = false; 1298 if (coprocEnvironments.isEmpty()) { 1299 return null; 1300 } 1301 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1302 regionObserverGetter, defaultResult, bypassable) { 1303 @Override 1304 public Boolean call(RegionObserver observer) throws IOException { 1305 return observer.preScannerNext(this, s, results, limit, getResult()); 1306 } 1307 }); 1308 } 1309 1310 /** 1311 * @param s the scanner 1312 * @param results the result set returned by the region server 1313 * @param limit the maximum number of results to return 1314 * @return 'has more' indication to give to client 1315 * 
@exception IOException Exception 1316 */ 1317 public boolean postScannerNext(final InternalScanner s, final List<Result> results, 1318 final int limit, boolean hasMore) throws IOException { 1319 if (this.coprocEnvironments.isEmpty()) { 1320 return hasMore; 1321 } 1322 return execOperationWithResult( 1323 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1324 @Override 1325 public Boolean call(RegionObserver observer) throws IOException { 1326 return observer.postScannerNext(this, s, results, limit, getResult()); 1327 } 1328 }); 1329 } 1330 1331 /** 1332 * This will be called by the scan flow when the current scanned row is being filtered out by the 1333 * filter. 1334 * @param s the scanner 1335 * @param curRowCell The cell in the current row which got filtered out 1336 * @return whether more rows are available for the scanner or not 1337 */ 1338 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1339 throws IOException { 1340 // short circuit for performance 1341 boolean defaultResult = true; 1342 if (!hasCustomPostScannerFilterRow) { 1343 return defaultResult; 1344 } 1345 if (this.coprocEnvironments.isEmpty()) { 1346 return defaultResult; 1347 } 1348 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1349 regionObserverGetter, defaultResult) { 1350 @Override 1351 public Boolean call(RegionObserver observer) throws IOException { 1352 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1353 } 1354 }); 1355 } 1356 1357 /** 1358 * Supports Coprocessor 'bypass'. 1359 * @param s the scanner 1360 * @return true if default behavior should be bypassed, false otherwise 1361 * @exception IOException Exception 1362 */ 1363 // Should this be bypassable? 1364 public boolean preScannerClose(final InternalScanner s) throws IOException { 1365 return execOperation( 1366 coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult(true) { 1367 @Override 1368 public void call(RegionObserver observer) throws IOException { 1369 observer.preScannerClose(this, s); 1370 } 1371 }); 1372 } 1373 1374 /** 1375 * @exception IOException Exception 1376 */ 1377 public void postScannerClose(final InternalScanner s) throws IOException { 1378 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1379 @Override 1380 public void call(RegionObserver observer) throws IOException { 1381 observer.postScannerClose(this, s); 1382 } 1383 }); 1384 } 1385 1386 /** 1387 * Called before open store scanner for user scan. 1388 */ 1389 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1390 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1391 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1392 execOperation(new RegionObserverOperationWithoutResult() { 1393 @Override 1394 public void call(RegionObserver observer) throws IOException { 1395 observer.preStoreScannerOpen(this, store, builder); 1396 } 1397 }); 1398 return builder.build(); 1399 } 1400 1401 /** 1402 * @param info the RegionInfo for this region 1403 * @param edits the file of recovered edits 1404 */ 1405 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1406 execOperation( 1407 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1408 @Override 1409 public void call(RegionObserver observer) throws IOException { 1410 observer.preReplayWALs(this, info, edits); 1411 } 1412 }); 1413 } 1414 1415 /** 1416 * @param info the RegionInfo for this region 1417 * @param edits the file of recovered edits 1418 * @throws IOException Exception 1419 */ 1420 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1421 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1422 @Override 1423 public void call(RegionObserver observer) throws IOException { 1424 observer.postReplayWALs(this, info, edits); 1425 } 1426 }); 1427 } 1428 1429 /** 1430 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1431 */ 1432 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1433 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1434 @Override 1435 public void call(RegionObserver observer) throws IOException { 1436 observer.preBulkLoadHFile(this, familyPaths); 1437 } 1438 }); 1439 } 1440 1441 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1442 throws IOException { 1443 return execOperation( 1444 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1445 @Override 1446 public void call(RegionObserver observer) throws IOException { 1447 observer.preCommitStoreFile(this, family, pairs); 1448 } 1449 }); 1450 } 1451 1452 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) 1453 throws IOException { 1454 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1455 @Override 1456 public void call(RegionObserver observer) throws IOException { 1457 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1458 } 1459 }); 1460 } 1461 1462 /** 1463 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1464 * @param map Map of CF to List of file paths for the final loaded files 1465 */ 1466 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1467 Map<byte[], List<Path>> map) throws IOException { 1468 if (this.coprocEnvironments.isEmpty()) { 1469 return; 1470 } 1471 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1472 @Override 1473 public void call(RegionObserver observer) throws IOException { 1474 observer.postBulkLoadHFile(this, familyPaths, map); 1475 } 1476 }); 1477 } 1478 1479 public void postStartRegionOperation(final Operation op) throws IOException { 1480 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1481 @Override 1482 public void call(RegionObserver observer) throws IOException { 1483 observer.postStartRegionOperation(this, op); 1484 } 1485 }); 1486 } 1487 1488 public void postCloseRegionOperation(final Operation op) throws IOException { 1489 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1490 @Override 1491 public void call(RegionObserver observer) throws IOException { 1492 observer.postCloseRegionOperation(this, op); 1493 } 1494 }); 1495 } 1496 1497 /** 1498 * @param fs fileystem to read from 1499 * @param p path to the file 1500 * @param in {@link FSDataInputStreamWrapper} 1501 * @param size Full size of the file 1502 * @param r original reference file. This will be not null only when reading a split file. 
1503 * @return a Reader instance to use instead of the base reader if overriding default behavior, 1504 * null otherwise 1505 */ 1506 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1507 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1508 final Reference r) throws IOException { 1509 if (coprocEnvironments.isEmpty()) { 1510 return null; 1511 } 1512 return execOperationWithResult( 1513 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1514 @Override 1515 public StoreFileReader call(RegionObserver observer) throws IOException { 1516 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1517 } 1518 }); 1519 } 1520 1521 /** 1522 * @param fs fileystem to read from 1523 * @param p path to the file 1524 * @param in {@link FSDataInputStreamWrapper} 1525 * @param size Full size of the file 1526 * @param r original reference file. This will be not null only when reading a split file. 
* @param reader the base reader instance
   * @return The reader to use
   */
  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
    final Reference r, final StoreFileReader reader) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return reader;
    }
    final ObserverOperationWithResult<RegionObserver, StoreFileReader> operation =
      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter,
        reader) {
        @Override
        public StoreFileReader call(RegionObserver observer) throws IOException {
          return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
            getResult());
        }
      };
    return execOperationWithResult(operation);
  }

  /**
   * Runs the {@code postIncrementBeforeWAL} hook of the region observers.
   * @param mutation  the mutation handed to each observer
   * @param cellPairs the cell pairs used as the initial result of the observer operation
   * @return {@code cellPairs} unchanged when no coprocessor environment is loaded, otherwise the
   *         value produced by the observer operation
   */
  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return cellPairs;
    }
    final ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>> operation =
      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
        regionObserverGetter, cellPairs) {
        @Override
        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
          return observer.postIncrementBeforeWAL(this, mutation, getResult());
        }
      };
    return execOperationWithResult(operation);
  }

  /**
   * Runs the {@code postAppendBeforeWAL} hook of the region observers.
   * @param mutation  the mutation handed to each observer
   * @param cellPairs the cell pairs used as the initial result of the observer operation
   * @return {@code cellPairs} unchanged when no coprocessor environment is loaded, otherwise the
   *         value produced by the observer operation
   */
  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return cellPairs;
    }
    final ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>> operation =
      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
        regionObserverGetter, cellPairs) {
        @Override
        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
          return observer.postAppendBeforeWAL(this, mutation, getResult());
        }
      };
    return execOperationWithResult(operation);
  }

  /**
   * Runs the {@code preWALAppend} hook of the region observers; does nothing when no coprocessor
   * environment is loaded.
   * @param key  the WAL key handed to each observer
   * @param edit the WAL edit handed to each observer
   */
  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return;
    }
    final RegionObserverOperationWithoutResult operation =
      new RegionObserverOperationWithoutResult() {
        @Override
        public void call(RegionObserver observer) throws IOException {
          observer.preWALAppend(this, key, edit);
        }
      };
    execOperation(operation);
  }

  /**
   * Runs the {@code preEndpointInvocation} hook of the endpoint observers.
   * @param service    the service handed to each observer
   * @param methodName the method name handed to each observer
   * @param request    the request used as the initial result of the observer operation
   * @return {@code request} unchanged when no coprocessor environment is loaded, otherwise the
   *         value produced by the observer operation
   */
  public Message preEndpointInvocation(final Service service, final String methodName,
    Message request) throws IOException {
    if (coprocEnvironments.isEmpty()) {
      return request;
    }
    final ObserverOperationWithResult<EndpointObserver, Message> operation =
      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter,
        request) {
        @Override
        public Message call(EndpointObserver observer) throws IOException {
          return observer.preEndpointInvocation(this, service, methodName, getResult());
        }
      };
    return execOperationWithResult(operation);
  }

  /**
   * Runs the {@code postEndpointInvocation} hook of the endpoint observers.
   * @param service         the service handed to each observer
   * @param methodName      the method name handed to each observer
   * @param request         the request handed to each observer
   * @param responseBuilder the response builder handed to each observer
   */
  public void postEndpointInvocation(final Service service, final String methodName,
    final Message request, final Message.Builder responseBuilder) throws IOException {
    ObserverOperationWithoutResult<EndpointObserver> operation = null;
    if (!coprocEnvironments.isEmpty()) {
      operation = new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) {
        @Override
        public void call(EndpointObserver observer) throws IOException {
          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
        }
      };
    }
    // When no coprocessor environment is loaded, execOperation receives null.
    execOperation(operation);
  }

  /**
   * Runs the {@code postInstantiateDeleteTracker} hook of the region observers.
   * @param result the delete tracker used as the initial result of the observer operation
   * @return {@code result} unchanged when no coprocessor environment is loaded, otherwise the
   *         value produced by the observer operation
   */
  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return result;
    }
    final ObserverOperationWithResult<RegionObserver, DeleteTracker> operation =
      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter,
        result) {
        @Override
        public DeleteTracker call(RegionObserver observer) throws IOException {
          return observer.postInstantiateDeleteTracker(this, getResult());
        }
      };
    return execOperationWithResult(operation);
  }

  /////////////////////////////////////////////////////////////////////////////////////////////////
  // BulkLoadObserver hooks
  /////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Runs the {@code prePrepareBulkLoad} hook of the bulk load observers.
   * @param user the user the observer operation is created with
   */
  public void prePrepareBulkLoad(User user) throws IOException {
    BulkLoadObserverOperation operation = null;
    if (!coprocEnvironments.isEmpty()) {
      operation = new BulkLoadObserverOperation(user) {
        @Override
        protected void call(BulkLoadObserver observer) throws IOException {
          observer.prePrepareBulkLoad(this);
        }
      };
    }
    execOperation(operation);
  }

  /**
   * Runs the {@code preCleanupBulkLoad} hook of the bulk load observers.
   * @param user the user the observer operation is created with
   */
  public void preCleanupBulkLoad(User user) throws IOException {
    BulkLoadObserverOperation operation = null;
    if (!coprocEnvironments.isEmpty()) {
      operation = new BulkLoadObserverOperation(user) {
        @Override
        protected void call(BulkLoadObserver observer) throws IOException {
          observer.preCleanupBulkLoad(this);
        }
      };
    }
    execOperation(operation);
  }
}