001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.lang.reflect.InvocationTargetException; 023import java.util.ArrayList; 024import java.util.List; 025import java.util.Map; 026import java.util.UUID; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.stream.Collectors; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.RawCellBuilder; 037import org.apache.hadoop.hbase.RawCellBuilderFactory; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.client.Append; 040import org.apache.hadoop.hbase.client.CheckAndMutate; 041import org.apache.hadoop.hbase.client.CheckAndMutateResult; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Mutation; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.Scan; 052import org.apache.hadoop.hbase.client.SharedConnection; 053import org.apache.hadoop.hbase.client.TableDescriptor; 054import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 055import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 056import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 057import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 058import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 059import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 060import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 061import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 062import org.apache.hadoop.hbase.coprocessor.ObserverContext; 063import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 064import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 065import org.apache.hadoop.hbase.coprocessor.RegionObserver; 066import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 067import org.apache.hadoop.hbase.io.Reference; 068import org.apache.hadoop.hbase.io.hfile.CacheConfig; 069import org.apache.hadoop.hbase.metrics.MetricRegistry; 070import org.apache.hadoop.hbase.regionserver.Region.Operation; 071import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 072import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 073import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 074import org.apache.hadoop.hbase.security.User; 075import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 076import org.apache.hadoop.hbase.util.Pair; 077import org.apache.hadoop.hbase.wal.WALEdit; 078import org.apache.hadoop.hbase.wal.WALKey; 079import org.apache.yetus.audience.InterfaceAudience; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082import org.apache.hbase.thirdparty.com.google.protobuf.Message; 083import org.apache.hbase.thirdparty.com.google.protobuf.Service; 084import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 085import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 086 087/** 088 * Implements the coprocessor environment and runtime support for coprocessors 089 * loaded within a {@link Region}. 090 */ 091@InterfaceAudience.Private 092public class RegionCoprocessorHost 093 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 094 095 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 096 // The shared data map 097 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 098 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 099 AbstractReferenceMap.ReferenceStrength.WEAK); 100 101 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 102 private final boolean hasCustomPostScannerFilterRow; 103 104 /* 105 * Whether any configured CPs override postScannerFilterRow hook 106 */ 107 public boolean hasCustomPostScannerFilterRow() { 108 return hasCustomPostScannerFilterRow; 109 } 110 111 /** 112 * 113 * Encapsulation of the environment of each coprocessor 114 */ 115 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 116 implements RegionCoprocessorEnvironment { 117 private Region region; 118 ConcurrentMap<String, Object> sharedData; 119 private final MetricRegistry metricRegistry; 120 private final RegionServerServices services; 121 122 /** 123 * Constructor 124 * @param impl the coprocessor instance 125 * @param priority chaining priority 126 */ 127 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 128 final int seq, final Configuration conf, final Region region, 129 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 130 super(impl, priority, seq, conf); 131 this.region = region; 132 this.sharedData = sharedData; 133 this.services = services; 134 this.metricRegistry = 135 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 136 } 137 138 /** @return the region */ 139 @Override 140 public Region getRegion() { 141 return region; 142 } 143 144 @Override 145 public OnlineRegions getOnlineRegions() { 146 return this.services; 147 } 148 149 @Override 150 public Connection getConnection() { 151 // Mocks may have services as null at test time. 152 return services != null ? new SharedConnection(services.getConnection()) : null; 153 } 154 155 @Override 156 public Connection createConnection(Configuration conf) throws IOException { 157 return services != null ? this.services.createConnection(conf) : null; 158 } 159 160 @Override 161 public ServerName getServerName() { 162 return services != null? services.getServerName(): null; 163 } 164 165 @Override 166 public void shutdown() { 167 super.shutdown(); 168 MetricsCoprocessor.removeRegistry(this.metricRegistry); 169 } 170 171 @Override 172 public ConcurrentMap<String, Object> getSharedData() { 173 return sharedData; 174 } 175 176 @Override 177 public RegionInfo getRegionInfo() { 178 return region.getRegionInfo(); 179 } 180 181 @Override 182 public MetricRegistry getMetricRegistryForRegionServer() { 183 return metricRegistry; 184 } 185 186 @Override 187 public RawCellBuilder getCellBuilder() { 188 // We always do a DEEP_COPY only 189 return RawCellBuilderFactory.create(); 190 } 191 } 192 193 /** 194 * Special version of RegionEnvironment that exposes RegionServerServices for Core 195 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 196 */ 197 private static class RegionEnvironmentForCoreCoprocessors extends 198 RegionEnvironment implements HasRegionServerServices { 199 private final RegionServerServices rsServices; 200 201 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 202 final int seq, final Configuration conf, final Region region, 203 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 204 super(impl, priority, seq, conf, region, services, sharedData); 205 this.rsServices = services; 206 } 207 208 /** 209 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 210 * consumption. 211 */ 212 @Override 213 public RegionServerServices getRegionServerServices() { 214 return this.rsServices; 215 } 216 } 217 218 static class TableCoprocessorAttribute { 219 private Path path; 220 private String className; 221 private int priority; 222 private Configuration conf; 223 224 public TableCoprocessorAttribute(Path path, String className, int priority, 225 Configuration conf) { 226 this.path = path; 227 this.className = className; 228 this.priority = priority; 229 this.conf = conf; 230 } 231 232 public Path getPath() { 233 return path; 234 } 235 236 public String getClassName() { 237 return className; 238 } 239 240 public int getPriority() { 241 return priority; 242 } 243 244 public Configuration getConf() { 245 return conf; 246 } 247 } 248 249 /** The region server services */ 250 RegionServerServices rsServices; 251 /** The region */ 252 HRegion region; 253 254 /** 255 * Constructor 256 * @param region the region 257 * @param rsServices interface to available region server functionality 258 * @param conf the configuration 259 */ 260 public RegionCoprocessorHost(final HRegion region, 261 final RegionServerServices rsServices, final Configuration conf) { 262 super(rsServices); 263 this.conf = conf; 264 this.rsServices = rsServices; 265 this.region = region; 266 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 267 268 // load system default cp's from configuration. 269 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 270 271 // load system default cp's for user tables from configuration. 272 if (!region.getRegionInfo().getTable().isSystemTable()) { 273 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 274 } 275 276 // load Coprocessor From HDFS 277 loadTableCoprocessors(conf); 278 279 // now check whether any coprocessor implements postScannerFilterRow 280 boolean hasCustomPostScannerFilterRow = false; 281 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 282 if (env.getInstance() instanceof RegionObserver) { 283 Class<?> clazz = env.getInstance().getClass(); 284 for (;;) { 285 if (clazz == Object.class) { 286 // we dont need to look postScannerFilterRow into Object class 287 break; // break the inner loop 288 } 289 try { 290 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 291 InternalScanner.class, Cell.class, boolean.class); 292 // this coprocessor has a custom version of postScannerFilterRow 293 hasCustomPostScannerFilterRow = true; 294 break out; 295 } catch (NoSuchMethodException ignore) { 296 } 297 // the deprecated signature still exists 298 try { 299 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 300 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 301 // this coprocessor has a custom version of postScannerFilterRow 302 hasCustomPostScannerFilterRow = true; 303 break out; 304 } catch (NoSuchMethodException ignore) { 305 } 306 clazz = clazz.getSuperclass(); 307 } 308 } 309 } 310 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 311 } 312 313 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 314 TableDescriptor htd) { 315 return htd.getCoprocessorDescriptors().stream().map(cp -> { 316 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 317 Configuration ourConf; 318 if (!cp.getProperties().isEmpty()) { 319 // do an explicit deep copy of the passed configuration 320 ourConf = new Configuration(false); 321 HBaseConfiguration.merge(ourConf, conf); 322 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 323 } else { 324 ourConf = conf; 325 } 326 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 327 }).collect(Collectors.toList()); 328 } 329 330 /** 331 * Sanity check the table coprocessor attributes of the supplied schema. Will 332 * throw an exception if there is a problem. 333 * @param conf 334 * @param htd 335 * @throws IOException 336 */ 337 public static void testTableCoprocessorAttrs(final Configuration conf, 338 final TableDescriptor htd) throws IOException { 339 String pathPrefix = UUID.randomUUID().toString(); 340 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 341 if (attr.getPriority() < 0) { 342 throw new IOException("Priority for coprocessor " + attr.getClassName() + 343 " cannot be less than 0"); 344 } 345 ClassLoader old = Thread.currentThread().getContextClassLoader(); 346 try { 347 ClassLoader cl; 348 if (attr.getPath() != null) { 349 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 350 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 351 } else { 352 cl = CoprocessorHost.class.getClassLoader(); 353 } 354 Thread.currentThread().setContextClassLoader(cl); 355 if (cl instanceof CoprocessorClassLoader) { 356 String[] includedClassPrefixes = null; 357 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 358 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 359 includedClassPrefixes = prefixes.split(";"); 360 } 361 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); 362 } else { 363 cl.loadClass(attr.getClassName()); 364 } 365 } catch (ClassNotFoundException e) { 366 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 367 } finally { 368 Thread.currentThread().setContextClassLoader(old); 369 } 370 } 371 } 372 373 void loadTableCoprocessors(final Configuration conf) { 374 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 375 DEFAULT_COPROCESSORS_ENABLED); 376 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 377 DEFAULT_USER_COPROCESSORS_ENABLED); 378 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 379 return; 380 } 381 382 // scan the table attributes for coprocessor load specifications 383 // initialize the coprocessors 384 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 385 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 386 region.getTableDescriptor())) { 387 // Load encompasses classloading and coprocessor initialization 388 try { 389 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 390 attr.getPriority(), attr.getConf()); 391 if (env == null) { 392 continue; 393 } 394 configured.add(env); 395 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 396 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 397 } catch (Throwable t) { 398 // Coprocessor failed to load, do we abort on error? 399 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 400 abortServer(attr.getClassName(), t); 401 } else { 402 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 403 } 404 } 405 } 406 // add together to coprocessor set for COW efficiency 407 coprocEnvironments.addAll(configured); 408 } 409 410 @Override 411 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 412 Configuration conf) { 413 // If coprocessor exposes any services, register them. 414 for (Service service : instance.getServices()) { 415 region.registerService(service); 416 } 417 ConcurrentMap<String, Object> classData; 418 // make sure only one thread can add maps 419 synchronized (SHARED_DATA_MAP) { 420 // as long as at least one RegionEnvironment holds on to its classData it will 421 // remain in this map 422 classData = 423 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 424 k -> new ConcurrentHashMap<>()); 425 } 426 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 427 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 428 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 429 rsServices, classData): 430 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 431 } 432 433 @Override 434 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 435 throws InstantiationException, IllegalAccessException { 436 try { 437 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 438 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 439 } else { 440 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 441 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 442 return null; 443 } 444 } catch (NoSuchMethodException | InvocationTargetException e) { 445 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 446 } 447 } 448 449 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 450 RegionCoprocessor::getRegionObserver; 451 452 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 453 RegionCoprocessor::getEndpointObserver; 454 455 abstract class RegionObserverOperationWithoutResult extends 456 ObserverOperationWithoutResult<RegionObserver> { 457 public RegionObserverOperationWithoutResult() { 458 super(regionObserverGetter); 459 } 460 461 public RegionObserverOperationWithoutResult(User user) { 462 super(regionObserverGetter, user); 463 } 464 465 public RegionObserverOperationWithoutResult(boolean bypassable) { 466 super(regionObserverGetter, null, bypassable); 467 } 468 469 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 470 super(regionObserverGetter, user, bypassable); 471 } 472 } 473 474 abstract class BulkLoadObserverOperation extends 475 ObserverOperationWithoutResult<BulkLoadObserver> { 476 public BulkLoadObserverOperation(User user) { 477 super(RegionCoprocessor::getBulkLoadObserver, user); 478 } 479 } 480 481 482 ////////////////////////////////////////////////////////////////////////////////////////////////// 483 // Observer operations 484 ////////////////////////////////////////////////////////////////////////////////////////////////// 485 486 ////////////////////////////////////////////////////////////////////////////////////////////////// 487 // Observer operations 488 ////////////////////////////////////////////////////////////////////////////////////////////////// 489 490 /** 491 * Invoked before a region open. 492 * 493 * @throws IOException Signals that an I/O exception has occurred. 494 */ 495 public void preOpen() throws IOException { 496 if (coprocEnvironments.isEmpty()) { 497 return; 498 } 499 execOperation(new RegionObserverOperationWithoutResult() { 500 @Override 501 public void call(RegionObserver observer) throws IOException { 502 observer.preOpen(this); 503 } 504 }); 505 } 506 507 508 /** 509 * Invoked after a region open 510 */ 511 public void postOpen() { 512 if (coprocEnvironments.isEmpty()) { 513 return; 514 } 515 try { 516 execOperation(new RegionObserverOperationWithoutResult() { 517 @Override 518 public void call(RegionObserver observer) throws IOException { 519 observer.postOpen(this); 520 } 521 }); 522 } catch (IOException e) { 523 LOG.warn(e.toString(), e); 524 } 525 } 526 527 /** 528 * Invoked before a region is closed 529 * @param abortRequested true if the server is aborting 530 */ 531 public void preClose(final boolean abortRequested) throws IOException { 532 execOperation(new RegionObserverOperationWithoutResult() { 533 @Override 534 public void call(RegionObserver observer) throws IOException { 535 observer.preClose(this, abortRequested); 536 } 537 }); 538 } 539 540 /** 541 * Invoked after a region is closed 542 * @param abortRequested true if the server is aborting 543 */ 544 public void postClose(final boolean abortRequested) { 545 try { 546 execOperation(new RegionObserverOperationWithoutResult() { 547 @Override 548 public void call(RegionObserver observer) throws IOException { 549 observer.postClose(this, abortRequested); 550 } 551 552 @Override 553 public void postEnvCall() { 554 shutdown(this.getEnvironment()); 555 } 556 }); 557 } catch (IOException e) { 558 LOG.warn(e.toString(), e); 559 } 560 } 561 562 /** 563 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 564 * available candidates. 565 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 566 * the passed in <code>candidates</code>. 567 * @param store The store where compaction is being requested 568 * @param candidates The currently available store files 569 * @param tracker used to track the life cycle of a compaction 570 * @param user the user 571 * @throws IOException 572 */ 573 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 574 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 575 if (coprocEnvironments.isEmpty()) { 576 return false; 577 } 578 boolean bypassable = true; 579 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 580 @Override 581 public void call(RegionObserver observer) throws IOException { 582 observer.preCompactSelection(this, store, candidates, tracker); 583 } 584 }); 585 } 586 587 /** 588 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 589 * candidates. 590 * @param store The store where compaction is being requested 591 * @param selected The store files selected to compact 592 * @param tracker used to track the life cycle of a compaction 593 * @param request the compaction request 594 * @param user the user 595 */ 596 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 597 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 598 final User user) throws IOException { 599 if (coprocEnvironments.isEmpty()) { 600 return; 601 } 602 execOperation(new RegionObserverOperationWithoutResult(user) { 603 @Override 604 public void call(RegionObserver observer) throws IOException { 605 observer.postCompactSelection(this, store, selected, tracker, request); 606 } 607 }); 608 } 609 610 /** 611 * Called prior to opening store scanner for compaction. 612 */ 613 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 614 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 615 if (coprocEnvironments.isEmpty()) { 616 return store.getScanInfo(); 617 } 618 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 619 execOperation(new RegionObserverOperationWithoutResult(user) { 620 @Override 621 public void call(RegionObserver observer) throws IOException { 622 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 623 } 624 }); 625 return builder.build(); 626 } 627 628 /** 629 * Called prior to rewriting the store files selected for compaction 630 * @param store the store being compacted 631 * @param scanner the scanner used to read store data during compaction 632 * @param scanType type of Scan 633 * @param tracker used to track the life cycle of a compaction 634 * @param request the compaction request 635 * @param user the user 636 * @return Scanner to use (cannot be null!) 637 * @throws IOException 638 */ 639 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 640 final ScanType scanType, final CompactionLifeCycleTracker tracker, 641 final CompactionRequest request, final User user) throws IOException { 642 InternalScanner defaultResult = scanner; 643 if (coprocEnvironments.isEmpty()) { 644 return defaultResult; 645 } 646 return execOperationWithResult( 647 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 648 defaultResult, user) { 649 @Override 650 public InternalScanner call(RegionObserver observer) throws IOException { 651 InternalScanner scanner = 652 observer.preCompact(this, store, getResult(), scanType, tracker, request); 653 if (scanner == null) { 654 throw new CoprocessorException("Null Scanner return disallowed!"); 655 } 656 return scanner; 657 } 658 }); 659 } 660 661 /** 662 * Called after the store compaction has completed. 663 * @param store the store being compacted 664 * @param resultFile the new store file written during compaction 665 * @param tracker used to track the life cycle of a compaction 666 * @param request the compaction request 667 * @param user the user 668 * @throws IOException 669 */ 670 public void postCompact(final HStore store, final HStoreFile resultFile, 671 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 672 throws IOException { 673 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 674 @Override 675 public void call(RegionObserver observer) throws IOException { 676 observer.postCompact(this, store, resultFile, tracker, request); 677 } 678 }); 679 } 680 681 /** 682 * Invoked before create StoreScanner for flush. 683 */ 684 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 685 throws IOException { 686 if (coprocEnvironments.isEmpty()) { 687 return store.getScanInfo(); 688 } 689 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 690 execOperation(new RegionObserverOperationWithoutResult() { 691 @Override 692 public void call(RegionObserver observer) throws IOException { 693 observer.preFlushScannerOpen(this, store, builder, tracker); 694 } 695 }); 696 return builder.build(); 697 } 698 699 /** 700 * Invoked before a memstore flush 701 * @return Scanner to use (cannot be null!) 702 * @throws IOException 703 */ 704 public InternalScanner preFlush(HStore store, InternalScanner scanner, 705 FlushLifeCycleTracker tracker) throws IOException { 706 if (coprocEnvironments.isEmpty()) { 707 return scanner; 708 } 709 return execOperationWithResult( 710 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 711 @Override 712 public InternalScanner call(RegionObserver observer) throws IOException { 713 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 714 if (scanner == null) { 715 throw new CoprocessorException("Null Scanner return disallowed!"); 716 } 717 return scanner; 718 } 719 }); 720 } 721 722 /** 723 * Invoked before a memstore flush 724 * @throws IOException 725 */ 726 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 727 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 728 @Override 729 public void call(RegionObserver observer) throws IOException { 730 observer.preFlush(this, tracker); 731 } 732 }); 733 } 734 735 /** 736 * Invoked after a memstore flush 737 * @throws IOException 738 */ 739 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 740 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 741 @Override 742 public void call(RegionObserver observer) throws IOException { 743 observer.postFlush(this, tracker); 744 } 745 }); 746 } 747 748 /** 749 * Invoked before in memory compaction. 750 */ 751 public void preMemStoreCompaction(HStore store) throws IOException { 752 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 753 @Override 754 public void call(RegionObserver observer) throws IOException { 755 observer.preMemStoreCompaction(this, store); 756 } 757 }); 758 } 759 760 /** 761 * Invoked before create StoreScanner for in memory compaction. 762 */ 763 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 764 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 765 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 766 @Override 767 public void call(RegionObserver observer) throws IOException { 768 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 769 } 770 }); 771 return builder.build(); 772 } 773 774 /** 775 * Invoked before compacting memstore. 776 */ 777 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 778 throws IOException { 779 if (coprocEnvironments.isEmpty()) { 780 return scanner; 781 } 782 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 783 regionObserverGetter, scanner) { 784 @Override 785 public InternalScanner call(RegionObserver observer) throws IOException { 786 return observer.preMemStoreCompactionCompact(this, store, getResult()); 787 } 788 }); 789 } 790 791 /** 792 * Invoked after in memory compaction. 793 */ 794 public void postMemStoreCompaction(HStore store) throws IOException { 795 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 796 @Override 797 public void call(RegionObserver observer) throws IOException { 798 observer.postMemStoreCompaction(this, store); 799 } 800 }); 801 } 802 803 /** 804 * Invoked after a memstore flush 805 * @throws IOException 806 */ 807 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 808 throws IOException { 809 if (coprocEnvironments.isEmpty()) { 810 return; 811 } 812 execOperation(new RegionObserverOperationWithoutResult() { 813 @Override 814 public void call(RegionObserver observer) throws IOException { 815 observer.postFlush(this, store, storeFile, tracker); 816 } 817 }); 818 } 819 820 // RegionObserver support 821 /** 822 * Supports Coprocessor 'bypass'. 823 * @param get the Get request 824 * @param results What to return if return is true/'bypass'. 825 * @return true if default processing should be bypassed. 826 * @exception IOException Exception 827 */ 828 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 829 if (coprocEnvironments.isEmpty()) { 830 return false; 831 } 832 boolean bypassable = true; 833 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 834 @Override 835 public void call(RegionObserver observer) throws IOException { 836 observer.preGetOp(this, get, results); 837 } 838 }); 839 } 840 841 /** 842 * @param get the Get request 843 * @param results the result set 844 * @exception IOException Exception 845 */ 846 public void postGet(final Get get, final List<Cell> results) 847 throws IOException { 848 if (coprocEnvironments.isEmpty()) { 849 return; 850 } 851 execOperation(new RegionObserverOperationWithoutResult() { 852 @Override 853 public void call(RegionObserver observer) throws IOException { 854 observer.postGetOp(this, get, results); 855 } 856 }); 857 } 858 859 /** 860 * Supports Coprocessor 'bypass'. 861 * @param get the Get request 862 * @return true or false to return to client if bypassing normal operation, or null otherwise 863 * @exception IOException Exception 864 */ 865 public Boolean preExists(final Get get) throws IOException { 866 boolean bypassable = true; 867 boolean defaultResult = false; 868 if (coprocEnvironments.isEmpty()) { 869 return null; 870 } 871 return execOperationWithResult( 872 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 873 defaultResult, bypassable) { 874 @Override 875 public Boolean call(RegionObserver observer) throws IOException { 876 return observer.preExists(this, get, getResult()); 877 } 878 }); 879 } 880 881 /** 882 * @param get the Get request 883 * @param result the result returned by the region server 884 * @return the result to return to the client 885 * @exception IOException Exception 886 */ 887 public boolean postExists(final Get get, boolean result) 888 throws IOException { 889 if (this.coprocEnvironments.isEmpty()) { 890 return result; 891 } 892 return execOperationWithResult( 893 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 894 @Override 895 public Boolean call(RegionObserver observer) throws IOException { 896 return observer.postExists(this, get, getResult()); 897 } 898 }); 899 } 900 901 /** 902 * Supports Coprocessor 'bypass'. 903 * @param put The Put object 904 * @param edit The WALEdit object. 905 * @param durability The durability used 906 * @return true if default processing should be bypassed 907 * @exception IOException Exception 908 */ 909 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 910 throws IOException { 911 if (coprocEnvironments.isEmpty()) { 912 return false; 913 } 914 boolean bypassable = true; 915 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 916 @Override 917 public void call(RegionObserver observer) throws IOException { 918 observer.prePut(this, put, edit, durability); 919 } 920 }); 921 } 922 923 /** 924 * Supports Coprocessor 'bypass'. 925 * @param mutation - the current mutation 926 * @param kv - the current cell 927 * @param byteNow - current timestamp in bytes 928 * @param get - the get that could be used 929 * Note that the get only does not specify the family and qualifier that should be used 930 * @return true if default processing should be bypassed 931 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 932 * Coprocessor for its needs only. Will be removed. 933 */ 934 @Deprecated 935 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 936 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 937 if (coprocEnvironments.isEmpty()) { 938 return false; 939 } 940 boolean bypassable = true; 941 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 942 @Override 943 public void call(RegionObserver observer) throws IOException { 944 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 945 } 946 }); 947 } 948 949 /** 950 * @param put The Put object 951 * @param edit The WALEdit object. 952 * @param durability The durability used 953 * @exception IOException Exception 954 */ 955 public void postPut(final Put put, final WALEdit edit, final Durability durability) 956 throws IOException { 957 if (coprocEnvironments.isEmpty()) { 958 return; 959 } 960 execOperation(new RegionObserverOperationWithoutResult() { 961 @Override 962 public void call(RegionObserver observer) throws IOException { 963 observer.postPut(this, put, edit, durability); 964 } 965 }); 966 } 967 968 /** 969 * Supports Coprocessor 'bypass'. 970 * @param delete The Delete object 971 * @param edit The WALEdit object. 972 * @param durability The durability used 973 * @return true if default processing should be bypassed 974 * @exception IOException Exception 975 */ 976 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 977 throws IOException { 978 if (this.coprocEnvironments.isEmpty()) { 979 return false; 980 } 981 boolean bypassable = true; 982 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 983 @Override 984 public void call(RegionObserver observer) throws IOException { 985 observer.preDelete(this, delete, edit, durability); 986 } 987 }); 988 } 989 990 /** 991 * @param delete The Delete object 992 * @param edit The WALEdit object. 993 * @param durability The durability used 994 * @exception IOException Exception 995 */ 996 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 997 throws IOException { 998 execOperation(coprocEnvironments.isEmpty()? null: 999 new RegionObserverOperationWithoutResult() { 1000 @Override 1001 public void call(RegionObserver observer) throws IOException { 1002 observer.postDelete(this, delete, edit, durability); 1003 } 1004 }); 1005 } 1006 1007 public void preBatchMutate( 1008 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1009 if(this.coprocEnvironments.isEmpty()) { 1010 return; 1011 } 1012 execOperation(new RegionObserverOperationWithoutResult() { 1013 @Override 1014 public void call(RegionObserver observer) throws IOException { 1015 observer.preBatchMutate(this, miniBatchOp); 1016 } 1017 }); 1018 } 1019 1020 public void postBatchMutate( 1021 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1022 if (this.coprocEnvironments.isEmpty()) { 1023 return; 1024 } 1025 execOperation(new RegionObserverOperationWithoutResult() { 1026 @Override 1027 public void call(RegionObserver observer) throws IOException { 1028 observer.postBatchMutate(this, miniBatchOp); 1029 } 1030 }); 1031 } 1032 1033 public void postBatchMutateIndispensably( 1034 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1035 throws IOException { 1036 if (this.coprocEnvironments.isEmpty()) { 1037 return; 1038 } 1039 execOperation(new RegionObserverOperationWithoutResult() { 1040 @Override 1041 public void call(RegionObserver observer) throws IOException { 1042 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1043 } 1044 }); 1045 } 1046 1047 /** 1048 * Supports Coprocessor 'bypass'. 1049 * @param checkAndMutate the CheckAndMutate object 1050 * @return true or false to return to client if default processing should be bypassed, or null 1051 * otherwise 1052 * @throws IOException if an error occurred on the coprocessor 1053 */ 1054 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) 1055 throws IOException { 1056 boolean bypassable = true; 1057 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1058 if (coprocEnvironments.isEmpty()) { 1059 return null; 1060 } 1061 return execOperationWithResult( 1062 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1063 regionObserverGetter, defaultResult, bypassable) { 1064 @Override 1065 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1066 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1067 } 1068 }); 1069 } 1070 1071 /** 1072 * Supports Coprocessor 'bypass'. 1073 * @param checkAndMutate the CheckAndMutate object 1074 * @return true or false to return to client if default processing should be bypassed, or null 1075 * otherwise 1076 * @throws IOException if an error occurred on the coprocessor 1077 */ 1078 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1079 throws IOException { 1080 boolean bypassable = true; 1081 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1082 if (coprocEnvironments.isEmpty()) { 1083 return null; 1084 } 1085 return execOperationWithResult( 1086 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1087 regionObserverGetter, defaultResult, bypassable) { 1088 @Override 1089 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1090 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1091 } 1092 }); 1093 } 1094 1095 /** 1096 * @param checkAndMutate the CheckAndMutate object 1097 * @param result the result returned by the checkAndMutate 1098 * @return true or false to return to client if default processing should be bypassed, or null 1099 * otherwise 1100 * @throws IOException if an error occurred on the coprocessor 1101 */ 1102 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1103 CheckAndMutateResult result) throws IOException { 1104 if (this.coprocEnvironments.isEmpty()) { 1105 return result; 1106 } 1107 return execOperationWithResult( 1108 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1109 regionObserverGetter, result) { 1110 @Override 1111 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1112 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1113 } 1114 }); 1115 } 1116 1117 /** 1118 * Supports Coprocessor 'bypass'. 1119 * @param append append object 1120 * @return result to return to client if default operation should be bypassed, null otherwise 1121 * @throws IOException if an error occurred on the coprocessor 1122 */ 1123 public Result preAppend(final Append append) throws IOException { 1124 boolean bypassable = true; 1125 Result defaultResult = null; 1126 if (this.coprocEnvironments.isEmpty()) { 1127 return defaultResult; 1128 } 1129 return execOperationWithResult( 1130 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1131 bypassable) { 1132 @Override 1133 public Result call(RegionObserver observer) throws IOException { 1134 return observer.preAppend(this, append); 1135 } 1136 }); 1137 } 1138 1139 /** 1140 * Supports Coprocessor 'bypass'. 1141 * @param append append object 1142 * @return result to return to client if default operation should be bypassed, null otherwise 1143 * @throws IOException if an error occurred on the coprocessor 1144 */ 1145 public Result preAppendAfterRowLock(final Append append) throws IOException { 1146 boolean bypassable = true; 1147 Result defaultResult = null; 1148 if (this.coprocEnvironments.isEmpty()) { 1149 return defaultResult; 1150 } 1151 return execOperationWithResult( 1152 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1153 defaultResult, bypassable) { 1154 @Override 1155 public Result call(RegionObserver observer) throws IOException { 1156 return observer.preAppendAfterRowLock(this, append); 1157 } 1158 }); 1159 } 1160 1161 /** 1162 * Supports Coprocessor 'bypass'. 1163 * @param increment increment object 1164 * @return result to return to client if default operation should be bypassed, null otherwise 1165 * @throws IOException if an error occurred on the coprocessor 1166 */ 1167 public Result preIncrement(final Increment increment) throws IOException { 1168 boolean bypassable = true; 1169 Result defaultResult = null; 1170 if (coprocEnvironments.isEmpty()) { 1171 return defaultResult; 1172 } 1173 return execOperationWithResult( 1174 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1175 bypassable) { 1176 @Override 1177 public Result call(RegionObserver observer) throws IOException { 1178 return observer.preIncrement(this, increment); 1179 } 1180 }); 1181 } 1182 1183 /** 1184 * Supports Coprocessor 'bypass'. 1185 * @param increment increment object 1186 * @return result to return to client if default operation should be bypassed, null otherwise 1187 * @throws IOException if an error occurred on the coprocessor 1188 */ 1189 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1190 boolean bypassable = true; 1191 Result defaultResult = null; 1192 if (coprocEnvironments.isEmpty()) { 1193 return defaultResult; 1194 } 1195 return execOperationWithResult( 1196 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1197 bypassable) { 1198 @Override 1199 public Result call(RegionObserver observer) throws IOException { 1200 return observer.preIncrementAfterRowLock(this, increment); 1201 } 1202 }); 1203 } 1204 1205 /** 1206 * @param append Append object 1207 * @param result the result returned by the append 1208 * @throws IOException if an error occurred on the coprocessor 1209 */ 1210 public Result postAppend(final Append append, final Result result) throws IOException { 1211 if (this.coprocEnvironments.isEmpty()) { 1212 return result; 1213 } 1214 return execOperationWithResult( 1215 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1216 @Override 1217 public Result call(RegionObserver observer) throws IOException { 1218 return observer.postAppend(this, append, result); 1219 } 1220 }); 1221 } 1222 1223 /** 1224 * @param increment increment object 1225 * @param result the result returned by postIncrement 1226 * @throws IOException if an error occurred on the coprocessor 1227 */ 1228 public Result postIncrement(final Increment increment, Result result) throws IOException { 1229 if (this.coprocEnvironments.isEmpty()) { 1230 return result; 1231 } 1232 return execOperationWithResult( 1233 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1234 @Override 1235 public Result call(RegionObserver observer) throws IOException { 1236 return observer.postIncrement(this, increment, getResult()); 1237 } 1238 }); 1239 } 1240 1241 /** 1242 * @param scan the Scan specification 1243 * @exception IOException Exception 1244 */ 1245 public void preScannerOpen(final Scan scan) throws IOException { 1246 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1247 @Override 1248 public void call(RegionObserver observer) throws IOException { 1249 observer.preScannerOpen(this, scan); 1250 } 1251 }); 1252 } 1253 1254 /** 1255 * @param scan the Scan specification 1256 * @param s the scanner 1257 * @return the scanner instance to use 1258 * @exception IOException Exception 1259 */ 1260 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1261 if (this.coprocEnvironments.isEmpty()) { 1262 return s; 1263 } 1264 return execOperationWithResult( 1265 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1266 @Override 1267 public RegionScanner call(RegionObserver observer) throws IOException { 1268 return observer.postScannerOpen(this, scan, getResult()); 1269 } 1270 }); 1271 } 1272 1273 /** 1274 * @param s the scanner 1275 * @param results the result set returned by the region server 1276 * @param limit the maximum number of results to return 1277 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1278 * @exception IOException Exception 1279 */ 1280 public Boolean preScannerNext(final InternalScanner s, 1281 final List<Result> results, final int limit) throws IOException { 1282 boolean bypassable = true; 1283 boolean defaultResult = false; 1284 if (coprocEnvironments.isEmpty()) { 1285 return null; 1286 } 1287 return execOperationWithResult( 1288 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1289 defaultResult, bypassable) { 1290 @Override 1291 public Boolean call(RegionObserver observer) throws IOException { 1292 return observer.preScannerNext(this, s, results, limit, getResult()); 1293 } 1294 }); 1295 } 1296 1297 /** 1298 * @param s the scanner 1299 * @param results the result set returned by the region server 1300 * @param limit the maximum number of results to return 1301 * @param hasMore 1302 * @return 'has more' indication to give to client 1303 * @exception IOException Exception 1304 */ 1305 public boolean postScannerNext(final InternalScanner s, 1306 final List<Result> results, final int limit, boolean hasMore) 1307 throws IOException { 1308 if (this.coprocEnvironments.isEmpty()) { 1309 return hasMore; 1310 } 1311 return execOperationWithResult( 1312 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1313 @Override 1314 public Boolean call(RegionObserver observer) throws IOException { 1315 return observer.postScannerNext(this, s, results, limit, getResult()); 1316 } 1317 }); 1318 } 1319 1320 /** 1321 * This will be called by the scan flow when the current scanned row is being filtered out by the 1322 * filter. 1323 * @param s the scanner 1324 * @param curRowCell The cell in the current row which got filtered out 1325 * @return whether more rows are available for the scanner or not 1326 * @throws IOException 1327 */ 1328 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1329 throws IOException { 1330 // short circuit for performance 1331 boolean defaultResult = true; 1332 if (!hasCustomPostScannerFilterRow) { 1333 return defaultResult; 1334 } 1335 if (this.coprocEnvironments.isEmpty()) { 1336 return defaultResult; 1337 } 1338 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1339 regionObserverGetter, defaultResult) { 1340 @Override 1341 public Boolean call(RegionObserver observer) throws IOException { 1342 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1343 } 1344 }); 1345 } 1346 1347 /** 1348 * Supports Coprocessor 'bypass'. 1349 * @param s the scanner 1350 * @return true if default behavior should be bypassed, false otherwise 1351 * @exception IOException Exception 1352 */ 1353 // Should this be bypassable? 1354 public boolean preScannerClose(final InternalScanner s) throws IOException { 1355 return execOperation(coprocEnvironments.isEmpty()? null: 1356 new RegionObserverOperationWithoutResult(true) { 1357 @Override 1358 public void call(RegionObserver observer) throws IOException { 1359 observer.preScannerClose(this, s); 1360 } 1361 }); 1362 } 1363 1364 /** 1365 * @exception IOException Exception 1366 */ 1367 public void postScannerClose(final InternalScanner s) throws IOException { 1368 execOperation(coprocEnvironments.isEmpty()? null: 1369 new RegionObserverOperationWithoutResult() { 1370 @Override 1371 public void call(RegionObserver observer) throws IOException { 1372 observer.postScannerClose(this, s); 1373 } 1374 }); 1375 } 1376 1377 /** 1378 * Called before open store scanner for user scan. 1379 */ 1380 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1381 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1382 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1383 execOperation(new RegionObserverOperationWithoutResult() { 1384 @Override 1385 public void call(RegionObserver observer) throws IOException { 1386 observer.preStoreScannerOpen(this, store, builder); 1387 } 1388 }); 1389 return builder.build(); 1390 } 1391 1392 /** 1393 * @param info the RegionInfo for this region 1394 * @param edits the file of recovered edits 1395 */ 1396 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1397 execOperation(coprocEnvironments.isEmpty()? null: 1398 new RegionObserverOperationWithoutResult(true) { 1399 @Override 1400 public void call(RegionObserver observer) throws IOException { 1401 observer.preReplayWALs(this, info, edits); 1402 } 1403 }); 1404 } 1405 1406 /** 1407 * @param info the RegionInfo for this region 1408 * @param edits the file of recovered edits 1409 * @throws IOException Exception 1410 */ 1411 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1412 execOperation(coprocEnvironments.isEmpty()? null: 1413 new RegionObserverOperationWithoutResult() { 1414 @Override 1415 public void call(RegionObserver observer) throws IOException { 1416 observer.postReplayWALs(this, info, edits); 1417 } 1418 }); 1419 } 1420 1421 /** 1422 * Supports Coprocessor 'bypass'. 1423 * @return true if default behavior should be bypassed, false otherwise 1424 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1425 * with something that doesn't expose IntefaceAudience.Private classes. 1426 */ 1427 @Deprecated 1428 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1429 throws IOException { 1430 return execOperation(coprocEnvironments.isEmpty()? null: 1431 new RegionObserverOperationWithoutResult(true) { 1432 @Override 1433 public void call(RegionObserver observer) throws IOException { 1434 observer.preWALRestore(this, info, logKey, logEdit); 1435 } 1436 }); 1437 } 1438 1439 /** 1440 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1441 * with something that doesn't expose IntefaceAudience.Private classes. 1442 */ 1443 @Deprecated 1444 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1445 throws IOException { 1446 execOperation(coprocEnvironments.isEmpty()? null: 1447 new RegionObserverOperationWithoutResult() { 1448 @Override 1449 public void call(RegionObserver observer) throws IOException { 1450 observer.postWALRestore(this, info, logKey, logEdit); 1451 } 1452 }); 1453 } 1454 1455 /** 1456 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1457 */ 1458 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1459 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1460 @Override 1461 public void call(RegionObserver observer) throws IOException { 1462 observer.preBulkLoadHFile(this, familyPaths); 1463 } 1464 }); 1465 } 1466 1467 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1468 throws IOException { 1469 return execOperation(coprocEnvironments.isEmpty()? null: 1470 new RegionObserverOperationWithoutResult() { 1471 @Override 1472 public void call(RegionObserver observer) throws IOException { 1473 observer.preCommitStoreFile(this, family, pairs); 1474 } 1475 }); 1476 } 1477 1478 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1479 execOperation(coprocEnvironments.isEmpty()? null: 1480 new RegionObserverOperationWithoutResult() { 1481 @Override 1482 public void call(RegionObserver observer) throws IOException { 1483 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1484 } 1485 }); 1486 } 1487 1488 /** 1489 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1490 * @param map Map of CF to List of file paths for the final loaded files 1491 * @throws IOException 1492 */ 1493 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1494 Map<byte[], List<Path>> map) throws IOException { 1495 if (this.coprocEnvironments.isEmpty()) { 1496 return; 1497 } 1498 execOperation(coprocEnvironments.isEmpty()? null: 1499 new RegionObserverOperationWithoutResult() { 1500 @Override 1501 public void call(RegionObserver observer) throws IOException { 1502 observer.postBulkLoadHFile(this, familyPaths, map); 1503 } 1504 }); 1505 } 1506 1507 public void postStartRegionOperation(final Operation op) throws IOException { 1508 execOperation(coprocEnvironments.isEmpty()? null: 1509 new RegionObserverOperationWithoutResult() { 1510 @Override 1511 public void call(RegionObserver observer) throws IOException { 1512 observer.postStartRegionOperation(this, op); 1513 } 1514 }); 1515 } 1516 1517 public void postCloseRegionOperation(final Operation op) throws IOException { 1518 execOperation(coprocEnvironments.isEmpty()? null: 1519 new RegionObserverOperationWithoutResult() { 1520 @Override 1521 public void call(RegionObserver observer) throws IOException { 1522 observer.postCloseRegionOperation(this, op); 1523 } 1524 }); 1525 } 1526 1527 /** 1528 * @param fs fileystem to read from 1529 * @param p path to the file 1530 * @param in {@link FSDataInputStreamWrapper} 1531 * @param size Full size of the file 1532 * @param cacheConf 1533 * @param r original reference file. This will be not null only when reading a split file. 1534 * @return a Reader instance to use instead of the base reader if overriding 1535 * default behavior, null otherwise 1536 * @throws IOException 1537 */ 1538 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1539 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1540 final Reference r) throws IOException { 1541 if (coprocEnvironments.isEmpty()) { 1542 return null; 1543 } 1544 return execOperationWithResult( 1545 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1546 @Override 1547 public StoreFileReader call(RegionObserver observer) throws IOException { 1548 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1549 getResult()); 1550 } 1551 }); 1552 } 1553 1554 /** 1555 * @param fs fileystem to read from 1556 * @param p path to the file 1557 * @param in {@link FSDataInputStreamWrapper} 1558 * @param size Full size of the file 1559 * @param cacheConf 1560 * @param r original reference file. This will be not null only when reading a split file. 1561 * @param reader the base reader instance 1562 * @return The reader to use 1563 * @throws IOException 1564 */ 1565 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1566 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1567 final Reference r, final StoreFileReader reader) throws IOException { 1568 if (this.coprocEnvironments.isEmpty()) { 1569 return reader; 1570 } 1571 return execOperationWithResult( 1572 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1573 @Override 1574 public StoreFileReader call(RegionObserver observer) throws IOException { 1575 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1576 getResult()); 1577 } 1578 }); 1579 } 1580 1581 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1582 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1583 if (this.coprocEnvironments.isEmpty()) { 1584 return cellPairs; 1585 } 1586 return execOperationWithResult( 1587 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1588 regionObserverGetter, cellPairs) { 1589 @Override 1590 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1591 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1592 } 1593 }); 1594 } 1595 1596 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1597 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1598 if (this.coprocEnvironments.isEmpty()) { 1599 return cellPairs; 1600 } 1601 return execOperationWithResult( 1602 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1603 regionObserverGetter, cellPairs) { 1604 @Override 1605 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1606 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1607 } 1608 }); 1609 } 1610 1611 public void preWALAppend(WALKey key, WALEdit edit) throws IOException { 1612 if (this.coprocEnvironments.isEmpty()){ 1613 return; 1614 } 1615 execOperation(new RegionObserverOperationWithoutResult() { 1616 @Override 1617 public void call(RegionObserver observer) throws IOException { 1618 observer.preWALAppend(this, key, edit); 1619 } 1620 }); 1621 } 1622 1623 public Message preEndpointInvocation(final Service service, final String methodName, 1624 Message request) throws IOException { 1625 if (coprocEnvironments.isEmpty()) { 1626 return request; 1627 } 1628 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1629 Message>(endpointObserverGetter, request) { 1630 @Override 1631 public Message call(EndpointObserver observer) throws IOException { 1632 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1633 } 1634 }); 1635 } 1636 1637 public void postEndpointInvocation(final Service service, final String methodName, 1638 final Message request, final Message.Builder responseBuilder) throws IOException { 1639 execOperation(coprocEnvironments.isEmpty() ? null : 1640 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1641 @Override 1642 public void call(EndpointObserver observer) throws IOException { 1643 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1644 } 1645 }); 1646 } 1647 1648 /** 1649 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1650 */ 1651 @Deprecated 1652 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1653 if (this.coprocEnvironments.isEmpty()) { 1654 return result; 1655 } 1656 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1657 regionObserverGetter, result) { 1658 @Override 1659 public DeleteTracker call(RegionObserver observer) throws IOException { 1660 return observer.postInstantiateDeleteTracker(this, getResult()); 1661 } 1662 }); 1663 } 1664 1665 ///////////////////////////////////////////////////////////////////////////////////////////////// 1666 // BulkLoadObserver hooks 1667 ///////////////////////////////////////////////////////////////////////////////////////////////// 1668 public void prePrepareBulkLoad(User user) throws IOException { 1669 execOperation(coprocEnvironments.isEmpty() ? null : 1670 new BulkLoadObserverOperation(user) { 1671 @Override protected void call(BulkLoadObserver observer) throws IOException { 1672 observer.prePrepareBulkLoad(this); 1673 } 1674 }); 1675 } 1676 1677 public void preCleanupBulkLoad(User user) throws IOException { 1678 execOperation(coprocEnvironments.isEmpty() ? null : 1679 new BulkLoadObserverOperation(user) { 1680 @Override protected void call(BulkLoadObserver observer) throws IOException { 1681 observer.preCleanupBulkLoad(this); 1682 } 1683 }); 1684 } 1685}