001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.UUID; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.stream.Collectors; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.Cell; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.RawCellBuilder; 036import org.apache.hadoop.hbase.RawCellBuilderFactory; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.Append; 039import org.apache.hadoop.hbase.client.CheckAndMutate; 040import org.apache.hadoop.hbase.client.CheckAndMutateResult; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import 
org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.Scan; 050import org.apache.hadoop.hbase.client.SharedConnection; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 053import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 054import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 055import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 056import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 057import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 058import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 059import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 060import org.apache.hadoop.hbase.coprocessor.ObserverContext; 061import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 062import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 063import org.apache.hadoop.hbase.coprocessor.RegionObserver; 064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 065import org.apache.hadoop.hbase.io.Reference; 066import org.apache.hadoop.hbase.io.hfile.CacheConfig; 067import org.apache.hadoop.hbase.metrics.MetricRegistry; 068import org.apache.hadoop.hbase.regionserver.Region.Operation; 069import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 070import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 071import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 072import org.apache.hadoop.hbase.security.User; 073import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 074import org.apache.hadoop.hbase.util.Pair; 075import org.apache.hadoop.hbase.wal.WALEdit; 076import org.apache.hadoop.hbase.wal.WALKey; 077import 
org.apache.yetus.audience.InterfaceAudience; 078import org.slf4j.Logger; 079import org.slf4j.LoggerFactory; 080 081import org.apache.hbase.thirdparty.com.google.protobuf.Message; 082import org.apache.hbase.thirdparty.com.google.protobuf.Service; 083import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 084import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 085 086/** 087 * Implements the coprocessor environment and runtime support for coprocessors loaded within a 088 * {@link Region}. 089 */ 090@InterfaceAudience.Private 091public class RegionCoprocessorHost 092 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 093 094 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 095 // The shared data map 096 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 097 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 098 AbstractReferenceMap.ReferenceStrength.WEAK); 099 100 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 101 private final boolean hasCustomPostScannerFilterRow; 102 103 /* 104 * Whether any configured CPs override postScannerFilterRow hook 105 */ 106 public boolean hasCustomPostScannerFilterRow() { 107 return hasCustomPostScannerFilterRow; 108 } 109 110 /** 111 * Encapsulation of the environment of each coprocessor 112 */ 113 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 114 implements RegionCoprocessorEnvironment { 115 private Region region; 116 ConcurrentMap<String, Object> sharedData; 117 private final MetricRegistry metricRegistry; 118 private final RegionServerServices services; 119 120 /** 121 * Constructor 122 * @param impl the coprocessor instance 123 * @param priority chaining priority 124 */ 125 public RegionEnvironment(final RegionCoprocessor impl, final int priority, final int seq, 126 final 
Configuration conf, final Region region, final RegionServerServices services, 127 final ConcurrentMap<String, Object> sharedData) { 128 super(impl, priority, seq, conf); 129 this.region = region; 130 this.sharedData = sharedData; 131 this.services = services; 132 this.metricRegistry = 133 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 134 } 135 136 /** Returns the region */ 137 @Override 138 public Region getRegion() { 139 return region; 140 } 141 142 @Override 143 public OnlineRegions getOnlineRegions() { 144 return this.services; 145 } 146 147 @Override 148 public Connection getConnection() { 149 // Mocks may have services as null at test time. 150 return services != null ? new SharedConnection(services.getConnection()) : null; 151 } 152 153 @Override 154 public Connection createConnection(Configuration conf) throws IOException { 155 return services != null ? this.services.createConnection(conf) : null; 156 } 157 158 @Override 159 public ServerName getServerName() { 160 return services != null ? services.getServerName() : null; 161 } 162 163 @Override 164 public void shutdown() { 165 super.shutdown(); 166 MetricsCoprocessor.removeRegistry(this.metricRegistry); 167 } 168 169 @Override 170 public ConcurrentMap<String, Object> getSharedData() { 171 return sharedData; 172 } 173 174 @Override 175 public RegionInfo getRegionInfo() { 176 return region.getRegionInfo(); 177 } 178 179 @Override 180 public MetricRegistry getMetricRegistryForRegionServer() { 181 return metricRegistry; 182 } 183 184 @Override 185 public RawCellBuilder getCellBuilder() { 186 // We always do a DEEP_COPY only 187 return RawCellBuilderFactory.create(); 188 } 189 } 190 191 /** 192 * Special version of RegionEnvironment that exposes RegionServerServices for Core Coprocessors 193 * only. Temporary hack until Core Coprocessors are integrated into Core. 
194 */ 195 private static class RegionEnvironmentForCoreCoprocessors extends RegionEnvironment 196 implements HasRegionServerServices { 197 private final RegionServerServices rsServices; 198 199 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 200 final int seq, final Configuration conf, final Region region, 201 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 202 super(impl, priority, seq, conf, region, services, sharedData); 203 this.rsServices = services; 204 } 205 206 /** 207 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 208 * consumption. 209 */ 210 @Override 211 public RegionServerServices getRegionServerServices() { 212 return this.rsServices; 213 } 214 } 215 216 static class TableCoprocessorAttribute { 217 private Path path; 218 private String className; 219 private int priority; 220 private Configuration conf; 221 222 public TableCoprocessorAttribute(Path path, String className, int priority, 223 Configuration conf) { 224 this.path = path; 225 this.className = className; 226 this.priority = priority; 227 this.conf = conf; 228 } 229 230 public Path getPath() { 231 return path; 232 } 233 234 public String getClassName() { 235 return className; 236 } 237 238 public int getPriority() { 239 return priority; 240 } 241 242 public Configuration getConf() { 243 return conf; 244 } 245 } 246 247 /** The region server services */ 248 RegionServerServices rsServices; 249 /** The region */ 250 HRegion region; 251 252 /** 253 * Constructor 254 * @param region the region 255 * @param rsServices interface to available region server functionality 256 * @param conf the configuration 257 */ 258 @SuppressWarnings("ReturnValueIgnored") // Checking method exists as CPU optimization 259 public RegionCoprocessorHost(final HRegion region, final RegionServerServices rsServices, 260 final Configuration conf) { 261 super(rsServices); 262 this.conf = 
conf; 263 this.rsServices = rsServices; 264 this.region = region; 265 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 266 267 // load system default cp's from configuration. 268 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 269 270 // load system default cp's for user tables from configuration. 271 if (!region.getRegionInfo().getTable().isSystemTable()) { 272 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 273 } 274 275 // load Coprocessor From HDFS 276 loadTableCoprocessors(conf); 277 278 // now check whether any coprocessor implements postScannerFilterRow 279 boolean hasCustomPostScannerFilterRow = false; 280 out: for (RegionCoprocessorEnvironment env : coprocEnvironments) { 281 if (env.getInstance() instanceof RegionObserver) { 282 Class<?> clazz = env.getInstance().getClass(); 283 for (;;) { 284 if (clazz == Object.class) { 285 // we dont need to look postScannerFilterRow into Object class 286 break; // break the inner loop 287 } 288 try { 289 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 290 InternalScanner.class, Cell.class, boolean.class); 291 // this coprocessor has a custom version of postScannerFilterRow 292 hasCustomPostScannerFilterRow = true; 293 break out; 294 } catch (NoSuchMethodException ignore) { 295 } 296 // the deprecated signature still exists 297 try { 298 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 299 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 300 // this coprocessor has a custom version of postScannerFilterRow 301 hasCustomPostScannerFilterRow = true; 302 break out; 303 } catch (NoSuchMethodException ignore) { 304 } 305 clazz = clazz.getSuperclass(); 306 } 307 } 308 } 309 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 310 } 311 312 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 313 TableDescriptor htd) { 314 return 
htd.getCoprocessorDescriptors().stream().map(cp -> { 315 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 316 Configuration ourConf; 317 if (!cp.getProperties().isEmpty()) { 318 // do an explicit deep copy of the passed configuration 319 ourConf = new Configuration(false); 320 HBaseConfiguration.merge(ourConf, conf); 321 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 322 } else { 323 ourConf = conf; 324 } 325 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 326 }).collect(Collectors.toList()); 327 } 328 329 /** 330 * Sanity check the table coprocessor attributes of the supplied schema. Will throw an exception 331 * if there is a problem. 332 */ 333 public static void testTableCoprocessorAttrs(final Configuration conf, final TableDescriptor htd) 334 throws IOException { 335 String pathPrefix = UUID.randomUUID().toString(); 336 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, htd)) { 337 if (attr.getPriority() < 0) { 338 throw new IOException( 339 "Priority for coprocessor " + attr.getClassName() + " cannot be less than 0"); 340 } 341 ClassLoader old = Thread.currentThread().getContextClassLoader(); 342 try { 343 ClassLoader cl; 344 if (attr.getPath() != null) { 345 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 346 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 347 } else { 348 cl = CoprocessorHost.class.getClassLoader(); 349 } 350 Thread.currentThread().setContextClassLoader(cl); 351 if (cl instanceof CoprocessorClassLoader) { 352 String[] includedClassPrefixes = null; 353 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 354 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 355 includedClassPrefixes = prefixes.split(";"); 356 } 357 ((CoprocessorClassLoader) cl).loadClass(attr.getClassName(), includedClassPrefixes); 358 } else { 359 cl.loadClass(attr.getClassName()); 360 } 361 } catch (ClassNotFoundException e) { 
362 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 363 } finally { 364 Thread.currentThread().setContextClassLoader(old); 365 } 366 } 367 } 368 369 void loadTableCoprocessors(final Configuration conf) { 370 boolean coprocessorsEnabled = 371 conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_COPROCESSORS_ENABLED); 372 boolean tableCoprocessorsEnabled = 373 conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, DEFAULT_USER_COPROCESSORS_ENABLED); 374 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 375 return; 376 } 377 378 // scan the table attributes for coprocessor load specifications 379 // initialize the coprocessors 380 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 381 for (TableCoprocessorAttribute attr : getTableCoprocessorAttrsFromSchema(conf, 382 region.getTableDescriptor())) { 383 // Load encompasses classloading and coprocessor initialization 384 try { 385 RegionCoprocessorEnvironment env = 386 load(attr.getPath(), attr.getClassName(), attr.getPriority(), attr.getConf()); 387 if (env == null) { 388 continue; 389 } 390 configured.add(env); 391 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " 392 + region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 393 } catch (Throwable t) { 394 // Coprocessor failed to load, do we abort on error? 395 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 396 abortServer(attr.getClassName(), t); 397 } else { 398 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 399 } 400 } 401 } 402 // add together to coprocessor set for COW efficiency 403 coprocEnvironments.addAll(configured); 404 } 405 406 @Override 407 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 408 Configuration conf) { 409 // If coprocessor exposes any services, register them. 
410 for (Service service : instance.getServices()) { 411 region.registerService(service); 412 } 413 ConcurrentMap<String, Object> classData; 414 // make sure only one thread can add maps 415 synchronized (SHARED_DATA_MAP) { 416 // as long as at least one RegionEnvironment holds on to its classData it will 417 // remain in this map 418 classData = SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 419 k -> new ConcurrentHashMap<>()); 420 } 421 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 422 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class) 423 ? new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, rsServices, 424 classData) 425 : new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 426 } 427 428 @Override 429 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 430 throws InstantiationException, IllegalAccessException { 431 try { 432 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 433 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 434 } else { 435 LOG.error("{} is not of type RegionCoprocessor. 
Check the configuration of {}", 436 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 437 return null; 438 } 439 } catch (NoSuchMethodException | InvocationTargetException e) { 440 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 441 } 442 } 443 444 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 445 RegionCoprocessor::getRegionObserver; 446 447 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 448 RegionCoprocessor::getEndpointObserver; 449 450 abstract class RegionObserverOperationWithoutResult 451 extends ObserverOperationWithoutResult<RegionObserver> { 452 public RegionObserverOperationWithoutResult() { 453 super(regionObserverGetter); 454 } 455 456 public RegionObserverOperationWithoutResult(User user) { 457 super(regionObserverGetter, user); 458 } 459 460 public RegionObserverOperationWithoutResult(boolean bypassable) { 461 super(regionObserverGetter, null, bypassable); 462 } 463 464 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 465 super(regionObserverGetter, user, bypassable); 466 } 467 } 468 469 abstract class BulkLoadObserverOperation 470 extends ObserverOperationWithoutResult<BulkLoadObserver> { 471 public BulkLoadObserverOperation(User user) { 472 super(RegionCoprocessor::getBulkLoadObserver, user); 473 } 474 } 475 476 ////////////////////////////////////////////////////////////////////////////////////////////////// 477 // Observer operations 478 ////////////////////////////////////////////////////////////////////////////////////////////////// 479 480 ////////////////////////////////////////////////////////////////////////////////////////////////// 481 // Observer operations 482 ////////////////////////////////////////////////////////////////////////////////////////////////// 483 484 /** 485 * Invoked before a region open. 486 * @throws IOException Signals that an I/O exception has occurred. 
487 */ 488 public void preOpen() throws IOException { 489 if (coprocEnvironments.isEmpty()) { 490 return; 491 } 492 execOperation(new RegionObserverOperationWithoutResult() { 493 @Override 494 public void call(RegionObserver observer) throws IOException { 495 observer.preOpen(this); 496 } 497 }); 498 } 499 500 /** 501 * Invoked after a region open 502 */ 503 public void postOpen() { 504 if (coprocEnvironments.isEmpty()) { 505 return; 506 } 507 try { 508 execOperation(new RegionObserverOperationWithoutResult() { 509 @Override 510 public void call(RegionObserver observer) throws IOException { 511 observer.postOpen(this); 512 } 513 }); 514 } catch (IOException e) { 515 LOG.warn(e.toString(), e); 516 } 517 } 518 519 /** 520 * Invoked before a region is closed 521 * @param abortRequested true if the server is aborting 522 */ 523 public void preClose(final boolean abortRequested) throws IOException { 524 execOperation(new RegionObserverOperationWithoutResult() { 525 @Override 526 public void call(RegionObserver observer) throws IOException { 527 observer.preClose(this, abortRequested); 528 } 529 }); 530 } 531 532 /** 533 * Invoked after a region is closed 534 * @param abortRequested true if the server is aborting 535 */ 536 public void postClose(final boolean abortRequested) { 537 try { 538 execOperation(new RegionObserverOperationWithoutResult() { 539 @Override 540 public void call(RegionObserver observer) throws IOException { 541 observer.postClose(this, abortRequested); 542 } 543 544 @Override 545 public void postEnvCall() { 546 shutdown(this.getEnvironment()); 547 } 548 }); 549 } catch (IOException e) { 550 LOG.warn(e.toString(), e); 551 } 552 } 553 554 /** 555 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 556 * available candidates. 557 * <p> 558 * Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed the 559 * passed in <code>candidates</code>. 
560 * @param store The store where compaction is being requested 561 * @param candidates The currently available store files 562 * @param tracker used to track the life cycle of a compaction 563 * @param user the user 564 */ 565 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 566 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 567 if (coprocEnvironments.isEmpty()) { 568 return false; 569 } 570 boolean bypassable = true; 571 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 572 @Override 573 public void call(RegionObserver observer) throws IOException { 574 observer.preCompactSelection(this, store, candidates, tracker); 575 } 576 }); 577 } 578 579 /** 580 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 581 * candidates. 582 * @param store The store where compaction is being requested 583 * @param selected The store files selected to compact 584 * @param tracker used to track the life cycle of a compaction 585 * @param request the compaction request 586 * @param user the user 587 */ 588 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 589 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 590 throws IOException { 591 if (coprocEnvironments.isEmpty()) { 592 return; 593 } 594 execOperation(new RegionObserverOperationWithoutResult(user) { 595 @Override 596 public void call(RegionObserver observer) throws IOException { 597 observer.postCompactSelection(this, store, selected, tracker, request); 598 } 599 }); 600 } 601 602 /** 603 * Called prior to opening store scanner for compaction. 
604 */ 605 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 606 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 607 if (coprocEnvironments.isEmpty()) { 608 return store.getScanInfo(); 609 } 610 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 611 execOperation(new RegionObserverOperationWithoutResult(user) { 612 @Override 613 public void call(RegionObserver observer) throws IOException { 614 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 615 } 616 }); 617 return builder.build(); 618 } 619 620 /** 621 * Called prior to rewriting the store files selected for compaction 622 * @param store the store being compacted 623 * @param scanner the scanner used to read store data during compaction 624 * @param scanType type of Scan 625 * @param tracker used to track the life cycle of a compaction 626 * @param request the compaction request 627 * @param user the user 628 * @return Scanner to use (cannot be null!) 629 */ 630 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 631 final ScanType scanType, final CompactionLifeCycleTracker tracker, 632 final CompactionRequest request, final User user) throws IOException { 633 InternalScanner defaultResult = scanner; 634 if (coprocEnvironments.isEmpty()) { 635 return defaultResult; 636 } 637 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 638 regionObserverGetter, defaultResult, user) { 639 @Override 640 public InternalScanner call(RegionObserver observer) throws IOException { 641 InternalScanner scanner = 642 observer.preCompact(this, store, getResult(), scanType, tracker, request); 643 if (scanner == null) { 644 throw new CoprocessorException("Null Scanner return disallowed!"); 645 } 646 return scanner; 647 } 648 }); 649 } 650 651 /** 652 * Called after the store compaction has completed. 
653 * @param store the store being compacted 654 * @param resultFile the new store file written during compaction 655 * @param tracker used to track the life cycle of a compaction 656 * @param request the compaction request 657 * @param user the user 658 */ 659 public void postCompact(final HStore store, final HStoreFile resultFile, 660 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 661 throws IOException { 662 execOperation( 663 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(user) { 664 @Override 665 public void call(RegionObserver observer) throws IOException { 666 observer.postCompact(this, store, resultFile, tracker, request); 667 } 668 }); 669 } 670 671 /** 672 * Invoked before create StoreScanner for flush. 673 */ 674 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 675 throws IOException { 676 if (coprocEnvironments.isEmpty()) { 677 return store.getScanInfo(); 678 } 679 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 680 execOperation(new RegionObserverOperationWithoutResult() { 681 @Override 682 public void call(RegionObserver observer) throws IOException { 683 observer.preFlushScannerOpen(this, store, builder, tracker); 684 } 685 }); 686 return builder.build(); 687 } 688 689 /** 690 * Invoked before a memstore flush 691 * @return Scanner to use (cannot be null!) 
692 */ 693 public InternalScanner preFlush(HStore store, InternalScanner scanner, 694 FlushLifeCycleTracker tracker) throws IOException { 695 if (coprocEnvironments.isEmpty()) { 696 return scanner; 697 } 698 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 699 regionObserverGetter, scanner) { 700 @Override 701 public InternalScanner call(RegionObserver observer) throws IOException { 702 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 703 if (scanner == null) { 704 throw new CoprocessorException("Null Scanner return disallowed!"); 705 } 706 return scanner; 707 } 708 }); 709 } 710 711 /** 712 * Invoked before a memstore flush 713 */ 714 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 715 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 716 @Override 717 public void call(RegionObserver observer) throws IOException { 718 observer.preFlush(this, tracker); 719 } 720 }); 721 } 722 723 /** 724 * Invoked after a memstore flush 725 */ 726 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 727 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 728 @Override 729 public void call(RegionObserver observer) throws IOException { 730 observer.postFlush(this, tracker); 731 } 732 }); 733 } 734 735 /** 736 * Invoked before in memory compaction. 737 */ 738 public void preMemStoreCompaction(HStore store) throws IOException { 739 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 740 @Override 741 public void call(RegionObserver observer) throws IOException { 742 observer.preMemStoreCompaction(this, store); 743 } 744 }); 745 } 746 747 /** 748 * Invoked before create StoreScanner for in memory compaction. 
749 */ 750 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 751 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 752 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 753 @Override 754 public void call(RegionObserver observer) throws IOException { 755 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 756 } 757 }); 758 return builder.build(); 759 } 760 761 /** 762 * Invoked before compacting memstore. 763 */ 764 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 765 throws IOException { 766 if (coprocEnvironments.isEmpty()) { 767 return scanner; 768 } 769 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 770 regionObserverGetter, scanner) { 771 @Override 772 public InternalScanner call(RegionObserver observer) throws IOException { 773 return observer.preMemStoreCompactionCompact(this, store, getResult()); 774 } 775 }); 776 } 777 778 /** 779 * Invoked after in memory compaction. 780 */ 781 public void postMemStoreCompaction(HStore store) throws IOException { 782 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 783 @Override 784 public void call(RegionObserver observer) throws IOException { 785 observer.postMemStoreCompaction(this, store); 786 } 787 }); 788 } 789 790 /** 791 * Invoked after a memstore flush 792 */ 793 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 794 throws IOException { 795 if (coprocEnvironments.isEmpty()) { 796 return; 797 } 798 execOperation(new RegionObserverOperationWithoutResult() { 799 @Override 800 public void call(RegionObserver observer) throws IOException { 801 observer.postFlush(this, store, storeFile, tracker); 802 } 803 }); 804 } 805 806 // RegionObserver support 807 /** 808 * Supports Coprocessor 'bypass'. 
809 * @param get the Get request 810 * @param results What to return if return is true/'bypass'. 811 * @return true if default processing should be bypassed. 812 * @exception IOException Exception 813 */ 814 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 815 if (coprocEnvironments.isEmpty()) { 816 return false; 817 } 818 boolean bypassable = true; 819 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 820 @Override 821 public void call(RegionObserver observer) throws IOException { 822 observer.preGetOp(this, get, results); 823 } 824 }); 825 } 826 827 /** 828 * @param get the Get request 829 * @param results the result set 830 * @exception IOException Exception 831 */ 832 public void postGet(final Get get, final List<Cell> results) throws IOException { 833 if (coprocEnvironments.isEmpty()) { 834 return; 835 } 836 execOperation(new RegionObserverOperationWithoutResult() { 837 @Override 838 public void call(RegionObserver observer) throws IOException { 839 observer.postGetOp(this, get, results); 840 } 841 }); 842 } 843 844 /** 845 * Supports Coprocessor 'bypass'. 
846 * @param get the Get request 847 * @return true or false to return to client if bypassing normal operation, or null otherwise 848 * @exception IOException Exception 849 */ 850 public Boolean preExists(final Get get) throws IOException { 851 boolean bypassable = true; 852 boolean defaultResult = false; 853 if (coprocEnvironments.isEmpty()) { 854 return null; 855 } 856 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 857 regionObserverGetter, defaultResult, bypassable) { 858 @Override 859 public Boolean call(RegionObserver observer) throws IOException { 860 return observer.preExists(this, get, getResult()); 861 } 862 }); 863 } 864 865 /** 866 * @param get the Get request 867 * @param result the result returned by the region server 868 * @return the result to return to the client 869 * @exception IOException Exception 870 */ 871 public boolean postExists(final Get get, boolean result) throws IOException { 872 if (this.coprocEnvironments.isEmpty()) { 873 return result; 874 } 875 return execOperationWithResult( 876 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 877 @Override 878 public Boolean call(RegionObserver observer) throws IOException { 879 return observer.postExists(this, get, getResult()); 880 } 881 }); 882 } 883 884 /** 885 * Supports Coprocessor 'bypass'. 886 * @param put The Put object 887 * @param edit The WALEdit object. 888 * @return true if default processing should be bypassed 889 * @exception IOException Exception 890 */ 891 public boolean prePut(final Put put, final WALEdit edit) throws IOException { 892 if (coprocEnvironments.isEmpty()) { 893 return false; 894 } 895 boolean bypassable = true; 896 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 897 @Override 898 public void call(RegionObserver observer) throws IOException { 899 observer.prePut(this, put, edit); 900 } 901 }); 902 } 903 904 /** 905 * Supports Coprocessor 'bypass'. 
906 * @param mutation - the current mutation 907 * @param kv - the current cell 908 * @param byteNow - current timestamp in bytes 909 * @param get - the get that could be used Note that the get only does not specify the family 910 * and qualifier that should be used 911 * @return true if default processing should be bypassed 912 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 913 * Coprocessor for its needs only. Will be removed. 914 */ 915 @Deprecated 916 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, final Cell kv, 917 final byte[] byteNow, final Get get) throws IOException { 918 if (coprocEnvironments.isEmpty()) { 919 return false; 920 } 921 boolean bypassable = true; 922 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 923 @Override 924 public void call(RegionObserver observer) throws IOException { 925 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 926 } 927 }); 928 } 929 930 /** 931 * @param put The Put object 932 * @param edit The WALEdit object. 933 * @exception IOException Exception 934 */ 935 public void postPut(final Put put, final WALEdit edit) throws IOException { 936 if (coprocEnvironments.isEmpty()) { 937 return; 938 } 939 execOperation(new RegionObserverOperationWithoutResult() { 940 @Override 941 public void call(RegionObserver observer) throws IOException { 942 observer.postPut(this, put, edit); 943 } 944 }); 945 } 946 947 /** 948 * Supports Coprocessor 'bypass'. 949 * @param delete The Delete object 950 * @param edit The WALEdit object. 
951 * @return true if default processing should be bypassed 952 * @exception IOException Exception 953 */ 954 public boolean preDelete(final Delete delete, final WALEdit edit) throws IOException { 955 if (this.coprocEnvironments.isEmpty()) { 956 return false; 957 } 958 boolean bypassable = true; 959 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 960 @Override 961 public void call(RegionObserver observer) throws IOException { 962 observer.preDelete(this, delete, edit); 963 } 964 }); 965 } 966 967 /** 968 * @param delete The Delete object 969 * @param edit The WALEdit object. 970 * @exception IOException Exception 971 */ 972 public void postDelete(final Delete delete, final WALEdit edit) throws IOException { 973 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 974 @Override 975 public void call(RegionObserver observer) throws IOException { 976 observer.postDelete(this, delete, edit); 977 } 978 }); 979 } 980 981 public void preBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 982 throws IOException { 983 if (this.coprocEnvironments.isEmpty()) { 984 return; 985 } 986 execOperation(new RegionObserverOperationWithoutResult() { 987 @Override 988 public void call(RegionObserver observer) throws IOException { 989 observer.preBatchMutate(this, miniBatchOp); 990 } 991 }); 992 } 993 994 public void postBatchMutate(final MiniBatchOperationInProgress<Mutation> miniBatchOp) 995 throws IOException { 996 if (this.coprocEnvironments.isEmpty()) { 997 return; 998 } 999 execOperation(new RegionObserverOperationWithoutResult() { 1000 @Override 1001 public void call(RegionObserver observer) throws IOException { 1002 observer.postBatchMutate(this, miniBatchOp); 1003 } 1004 }); 1005 } 1006 1007 public void postBatchMutateIndispensably(final MiniBatchOperationInProgress<Mutation> miniBatchOp, 1008 final boolean success) throws IOException { 1009 if (this.coprocEnvironments.isEmpty()) { 1010 
return; 1011 } 1012 execOperation(new RegionObserverOperationWithoutResult() { 1013 @Override 1014 public void call(RegionObserver observer) throws IOException { 1015 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1016 } 1017 }); 1018 } 1019 1020 /** 1021 * Supports Coprocessor 'bypass'. 1022 * @param checkAndMutate the CheckAndMutate object 1023 * @return true or false to return to client if default processing should be bypassed, or null 1024 * otherwise 1025 * @throws IOException if an error occurred on the coprocessor 1026 */ 1027 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) throws IOException { 1028 boolean bypassable = true; 1029 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1030 if (coprocEnvironments.isEmpty()) { 1031 return null; 1032 } 1033 return execOperationWithResult( 1034 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1035 defaultResult, bypassable) { 1036 @Override 1037 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1038 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1039 } 1040 }); 1041 } 1042 1043 /** 1044 * Supports Coprocessor 'bypass'. 
1045 * @param checkAndMutate the CheckAndMutate object 1046 * @return true or false to return to client if default processing should be bypassed, or null 1047 * otherwise 1048 * @throws IOException if an error occurred on the coprocessor 1049 */ 1050 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1051 throws IOException { 1052 boolean bypassable = true; 1053 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1054 if (coprocEnvironments.isEmpty()) { 1055 return null; 1056 } 1057 return execOperationWithResult( 1058 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1059 defaultResult, bypassable) { 1060 @Override 1061 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1062 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1063 } 1064 }); 1065 } 1066 1067 /** 1068 * @param checkAndMutate the CheckAndMutate object 1069 * @param result the result returned by the checkAndMutate 1070 * @return true or false to return to client if default processing should be bypassed, or null 1071 * otherwise 1072 * @throws IOException if an error occurred on the coprocessor 1073 */ 1074 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1075 CheckAndMutateResult result) throws IOException { 1076 if (this.coprocEnvironments.isEmpty()) { 1077 return result; 1078 } 1079 return execOperationWithResult( 1080 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>(regionObserverGetter, 1081 result) { 1082 @Override 1083 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1084 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1085 } 1086 }); 1087 } 1088 1089 /** 1090 * Supports Coprocessor 'bypass'. 1091 * @param append append object 1092 * @param edit The WALEdit object. 
1093 * @return result to return to client if default operation should be bypassed, null otherwise 1094 * @throws IOException if an error occurred on the coprocessor 1095 */ 1096 public Result preAppend(final Append append, final WALEdit edit) throws IOException { 1097 boolean bypassable = true; 1098 Result defaultResult = null; 1099 if (this.coprocEnvironments.isEmpty()) { 1100 return defaultResult; 1101 } 1102 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1103 regionObserverGetter, defaultResult, bypassable) { 1104 @Override 1105 public Result call(RegionObserver observer) throws IOException { 1106 return observer.preAppend(this, append, edit); 1107 } 1108 }); 1109 } 1110 1111 /** 1112 * Supports Coprocessor 'bypass'. 1113 * @param append append object 1114 * @return result to return to client if default operation should be bypassed, null otherwise 1115 * @throws IOException if an error occurred on the coprocessor 1116 */ 1117 public Result preAppendAfterRowLock(final Append append) throws IOException { 1118 boolean bypassable = true; 1119 Result defaultResult = null; 1120 if (this.coprocEnvironments.isEmpty()) { 1121 return defaultResult; 1122 } 1123 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1124 regionObserverGetter, defaultResult, bypassable) { 1125 @Override 1126 public Result call(RegionObserver observer) throws IOException { 1127 return observer.preAppendAfterRowLock(this, append); 1128 } 1129 }); 1130 } 1131 1132 /** 1133 * Supports Coprocessor 'bypass'. 1134 * @param increment increment object 1135 * @param edit The WALEdit object. 
1136 * @return result to return to client if default operation should be bypassed, null otherwise 1137 * @throws IOException if an error occurred on the coprocessor 1138 */ 1139 public Result preIncrement(final Increment increment, final WALEdit edit) throws IOException { 1140 boolean bypassable = true; 1141 Result defaultResult = null; 1142 if (coprocEnvironments.isEmpty()) { 1143 return defaultResult; 1144 } 1145 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1146 regionObserverGetter, defaultResult, bypassable) { 1147 @Override 1148 public Result call(RegionObserver observer) throws IOException { 1149 return observer.preIncrement(this, increment, edit); 1150 } 1151 }); 1152 } 1153 1154 /** 1155 * Supports Coprocessor 'bypass'. 1156 * @param increment increment object 1157 * @return result to return to client if default operation should be bypassed, null otherwise 1158 * @throws IOException if an error occurred on the coprocessor 1159 */ 1160 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1161 boolean bypassable = true; 1162 Result defaultResult = null; 1163 if (coprocEnvironments.isEmpty()) { 1164 return defaultResult; 1165 } 1166 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Result>( 1167 regionObserverGetter, defaultResult, bypassable) { 1168 @Override 1169 public Result call(RegionObserver observer) throws IOException { 1170 return observer.preIncrementAfterRowLock(this, increment); 1171 } 1172 }); 1173 } 1174 1175 /** 1176 * @param append Append object 1177 * @param result the result returned by the append 1178 * @param edit The WALEdit object. 
1179 * @throws IOException if an error occurred on the coprocessor 1180 */ 1181 public Result postAppend(final Append append, final Result result, final WALEdit edit) 1182 throws IOException { 1183 if (this.coprocEnvironments.isEmpty()) { 1184 return result; 1185 } 1186 return execOperationWithResult( 1187 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1188 @Override 1189 public Result call(RegionObserver observer) throws IOException { 1190 return observer.postAppend(this, append, result, edit); 1191 } 1192 }); 1193 } 1194 1195 /** 1196 * @param increment increment object 1197 * @param result the result returned by postIncrement 1198 * @param edit The WALEdit object. 1199 * @throws IOException if an error occurred on the coprocessor 1200 */ 1201 public Result postIncrement(final Increment increment, Result result, final WALEdit edit) 1202 throws IOException { 1203 if (this.coprocEnvironments.isEmpty()) { 1204 return result; 1205 } 1206 return execOperationWithResult( 1207 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1208 @Override 1209 public Result call(RegionObserver observer) throws IOException { 1210 return observer.postIncrement(this, increment, getResult(), edit); 1211 } 1212 }); 1213 } 1214 1215 /** 1216 * @param scan the Scan specification 1217 * @exception IOException Exception 1218 */ 1219 public void preScannerOpen(final Scan scan) throws IOException { 1220 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1221 @Override 1222 public void call(RegionObserver observer) throws IOException { 1223 observer.preScannerOpen(this, scan); 1224 } 1225 }); 1226 } 1227 1228 /** 1229 * @param scan the Scan specification 1230 * @param s the scanner 1231 * @return the scanner instance to use 1232 * @exception IOException Exception 1233 */ 1234 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1235 if (this.coprocEnvironments.isEmpty()) { 1236 return s; 1237 } 1238 return execOperationWithResult( 1239 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1240 @Override 1241 public RegionScanner call(RegionObserver observer) throws IOException { 1242 return observer.postScannerOpen(this, scan, getResult()); 1243 } 1244 }); 1245 } 1246 1247 /** 1248 * @param s the scanner 1249 * @param results the result set returned by the region server 1250 * @param limit the maximum number of results to return 1251 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1252 * @exception IOException Exception 1253 */ 1254 public Boolean preScannerNext(final InternalScanner s, final List<Result> results, 1255 final int limit) throws IOException { 1256 boolean bypassable = true; 1257 boolean defaultResult = false; 1258 if (coprocEnvironments.isEmpty()) { 1259 return null; 1260 } 1261 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1262 regionObserverGetter, defaultResult, bypassable) { 1263 @Override 1264 public Boolean call(RegionObserver observer) throws IOException { 1265 return observer.preScannerNext(this, s, results, limit, getResult()); 1266 } 1267 }); 1268 } 1269 1270 /** 1271 * @param s the scanner 1272 * @param results the result set returned by the region server 1273 * @param limit the maximum number of results to return 1274 * @return 'has more' indication to give to client 1275 * 
@exception IOException Exception 1276 */ 1277 public boolean postScannerNext(final InternalScanner s, final List<Result> results, 1278 final int limit, boolean hasMore) throws IOException { 1279 if (this.coprocEnvironments.isEmpty()) { 1280 return hasMore; 1281 } 1282 return execOperationWithResult( 1283 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1284 @Override 1285 public Boolean call(RegionObserver observer) throws IOException { 1286 return observer.postScannerNext(this, s, results, limit, getResult()); 1287 } 1288 }); 1289 } 1290 1291 /** 1292 * This will be called by the scan flow when the current scanned row is being filtered out by the 1293 * filter. 1294 * @param s the scanner 1295 * @param curRowCell The cell in the current row which got filtered out 1296 * @return whether more rows are available for the scanner or not 1297 */ 1298 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1299 throws IOException { 1300 // short circuit for performance 1301 boolean defaultResult = true; 1302 if (!hasCustomPostScannerFilterRow) { 1303 return defaultResult; 1304 } 1305 if (this.coprocEnvironments.isEmpty()) { 1306 return defaultResult; 1307 } 1308 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1309 regionObserverGetter, defaultResult) { 1310 @Override 1311 public Boolean call(RegionObserver observer) throws IOException { 1312 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1313 } 1314 }); 1315 } 1316 1317 /** 1318 * Supports Coprocessor 'bypass'. 1319 * @param s the scanner 1320 * @return true if default behavior should be bypassed, false otherwise 1321 * @exception IOException Exception 1322 */ 1323 // Should this be bypassable? 1324 public boolean preScannerClose(final InternalScanner s) throws IOException { 1325 return execOperation( 1326 coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult(true) { 1327 @Override 1328 public void call(RegionObserver observer) throws IOException { 1329 observer.preScannerClose(this, s); 1330 } 1331 }); 1332 } 1333 1334 /** 1335 * @exception IOException Exception 1336 */ 1337 public void postScannerClose(final InternalScanner s) throws IOException { 1338 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1339 @Override 1340 public void call(RegionObserver observer) throws IOException { 1341 observer.postScannerClose(this, s); 1342 } 1343 }); 1344 } 1345 1346 /** 1347 * Called before open store scanner for user scan. 1348 */ 1349 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1350 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1351 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1352 execOperation(new RegionObserverOperationWithoutResult() { 1353 @Override 1354 public void call(RegionObserver observer) throws IOException { 1355 observer.preStoreScannerOpen(this, store, builder); 1356 } 1357 }); 1358 return builder.build(); 1359 } 1360 1361 /** 1362 * @param info the RegionInfo for this region 1363 * @param edits the file of recovered edits 1364 */ 1365 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1366 execOperation( 1367 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1368 @Override 1369 public void call(RegionObserver observer) throws IOException { 1370 observer.preReplayWALs(this, info, edits); 1371 } 1372 }); 1373 } 1374 1375 /** 1376 * @param info the RegionInfo for this region 1377 * @param edits the file of recovered edits 1378 * @throws IOException Exception 1379 */ 1380 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1381 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1382 @Override 1383 public void call(RegionObserver observer) throws IOException { 1384 observer.postReplayWALs(this, info, edits); 1385 } 1386 }); 1387 } 1388 1389 /** 1390 * Supports Coprocessor 'bypass'. 1391 * @return true if default behavior should be bypassed, false otherwise 1392 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with 1393 * something that doesn't expose IntefaceAudience.Private classes. 1394 */ 1395 @Deprecated 1396 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1397 throws IOException { 1398 return execOperation( 1399 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { 1400 @Override 1401 public void call(RegionObserver observer) throws IOException { 1402 observer.preWALRestore(this, info, logKey, logEdit); 1403 } 1404 }); 1405 } 1406 1407 /** 1408 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with 1409 * something that doesn't expose IntefaceAudience.Private classes. 1410 */ 1411 @Deprecated 1412 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1413 throws IOException { 1414 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1415 @Override 1416 public void call(RegionObserver observer) throws IOException { 1417 observer.postWALRestore(this, info, logKey, logEdit); 1418 } 1419 }); 1420 } 1421 1422 /** 1423 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1424 */ 1425 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1426 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1427 @Override 1428 public void call(RegionObserver observer) throws IOException { 1429 observer.preBulkLoadHFile(this, familyPaths); 1430 } 1431 }); 1432 } 1433 1434 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1435 throws IOException { 1436 return execOperation( 1437 coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1438 @Override 1439 public void call(RegionObserver observer) throws IOException { 1440 observer.preCommitStoreFile(this, family, pairs); 1441 } 1442 }); 1443 } 1444 1445 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) 1446 throws IOException { 1447 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1448 @Override 1449 public void call(RegionObserver observer) throws IOException { 1450 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1451 } 1452 }); 1453 } 1454 1455 /** 1456 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1457 * @param map Map of CF to List of file paths for the final loaded files 1458 */ 1459 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1460 Map<byte[], List<Path>> map) throws IOException { 1461 if (this.coprocEnvironments.isEmpty()) { 1462 return; 1463 } 1464 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1465 @Override 1466 public void call(RegionObserver observer) throws IOException { 1467 observer.postBulkLoadHFile(this, familyPaths, map); 1468 } 1469 }); 1470 } 1471 1472 public void postStartRegionOperation(final Operation op) throws IOException { 1473 execOperation(coprocEnvironments.isEmpty() ? 
null : new RegionObserverOperationWithoutResult() { 1474 @Override 1475 public void call(RegionObserver observer) throws IOException { 1476 observer.postStartRegionOperation(this, op); 1477 } 1478 }); 1479 } 1480 1481 public void postCloseRegionOperation(final Operation op) throws IOException { 1482 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 1483 @Override 1484 public void call(RegionObserver observer) throws IOException { 1485 observer.postCloseRegionOperation(this, op); 1486 } 1487 }); 1488 } 1489 1490 /** 1491 * @param fs fileystem to read from 1492 * @param p path to the file 1493 * @param in {@link FSDataInputStreamWrapper} 1494 * @param size Full size of the file 1495 * @param r original reference file. This will be not null only when reading a split file. 1496 * @return a Reader instance to use instead of the base reader if overriding default behavior, 1497 * null otherwise 1498 */ 1499 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1500 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1501 final Reference r) throws IOException { 1502 if (coprocEnvironments.isEmpty()) { 1503 return null; 1504 } 1505 return execOperationWithResult( 1506 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1507 @Override 1508 public StoreFileReader call(RegionObserver observer) throws IOException { 1509 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult()); 1510 } 1511 }); 1512 } 1513 1514 /** 1515 * @param fs fileystem to read from 1516 * @param p path to the file 1517 * @param in {@link FSDataInputStreamWrapper} 1518 * @param size Full size of the file 1519 * @param r original reference file. This will be not null only when reading a split file. 
* @param reader the base reader instance
   * @return The reader to use
   */
  public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
    final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
    final Reference r, final StoreFileReader reader) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return reader;
    }
    final ObserverOperationWithResult<RegionObserver, StoreFileReader> readerOperation =
      new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter,
        reader) {
        @Override
        public StoreFileReader call(RegionObserver observer) throws IOException {
          return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, getResult());
        }
      };
    return execOperationWithResult(readerOperation);
  }

  /**
   * Passes the {@code postIncrementBeforeWAL} observer hook to {@code execOperationWithResult},
   * seeded with {@code cellPairs}.
   * @param mutation  the mutation forwarded to each observer
   * @param cellPairs the cell pairs used as the initial result
   * @return the list produced by {@code execOperationWithResult}, or {@code cellPairs} itself when
   *         no coprocessor environments are present
   */
  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return cellPairs;
    }
    final ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>> pairsOperation =
      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
        cellPairs) {
        @Override
        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
          return observer.postIncrementBeforeWAL(this, mutation, getResult());
        }
      };
    return execOperationWithResult(pairsOperation);
  }

  /**
   * Passes the {@code postAppendBeforeWAL} observer hook to {@code execOperationWithResult},
   * seeded with {@code cellPairs}.
   * @param mutation  the mutation forwarded to each observer
   * @param cellPairs the cell pairs used as the initial result
   * @return the list produced by {@code execOperationWithResult}, or {@code cellPairs} itself when
   *         no coprocessor environments are present
   */
  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
    final List<Pair<Cell, Cell>> cellPairs) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return cellPairs;
    }
    final ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>> pairsOperation =
      new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(regionObserverGetter,
        cellPairs) {
        @Override
        public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
          return observer.postAppendBeforeWAL(this, mutation, getResult());
        }
      };
    return execOperationWithResult(pairsOperation);
  }

  /**
   * Passes the {@code preWALAppend} observer hook to {@code execOperation}; returns immediately
   * when no coprocessor environments are present.
   * @param key  the WAL key forwarded to each observer
   * @param edit the WAL edit forwarded to each observer
   */
  public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return;
    }
    final RegionObserverOperationWithoutResult walAppendOperation =
      new RegionObserverOperationWithoutResult() {
        @Override
        public void call(RegionObserver observer) throws IOException {
          observer.preWALAppend(this, key, edit);
        }
      };
    execOperation(walAppendOperation);
  }

  /**
   * Passes the {@code preEndpointInvocation} hook of the endpoint observers to
   * {@code execOperationWithResult}, seeded with {@code request}.
   * @param service    the service forwarded to each observer
   * @param methodName the method name forwarded to each observer
   * @param request    the request used as the initial result
   * @return the message produced by {@code execOperationWithResult}, or {@code request} itself
   *         when no coprocessor environments are present
   */
  public Message preEndpointInvocation(final Service service, final String methodName,
    Message request) throws IOException {
    if (coprocEnvironments.isEmpty()) {
      return request;
    }
    final ObserverOperationWithResult<EndpointObserver, Message> endpointOperation =
      new ObserverOperationWithResult<EndpointObserver, Message>(endpointObserverGetter, request) {
        @Override
        public Message call(EndpointObserver observer) throws IOException {
          return observer.preEndpointInvocation(this, service, methodName, getResult());
        }
      };
    return execOperationWithResult(endpointOperation);
  }

  /**
   * Passes the {@code postEndpointInvocation} hook of the endpoint observers to
   * {@code execOperation}.
   * @param service         the service forwarded to each observer
   * @param methodName      the method name forwarded to each observer
   * @param request         the request forwarded to each observer
   * @param responseBuilder the response builder forwarded to each observer
   */
  public void postEndpointInvocation(final Service service, final String methodName,
    final Message request, final Message.Builder responseBuilder) throws IOException {
    // execOperation is always invoked; it receives null when no coprocessor environments exist.
    ObserverOperationWithoutResult<EndpointObserver> endpointOperation = null;
    if (!coprocEnvironments.isEmpty()) {
      endpointOperation = new ObserverOperationWithoutResult<EndpointObserver>(
        endpointObserverGetter) {
        @Override
        public void call(EndpointObserver observer) throws IOException {
          observer.postEndpointInvocation(this, service, methodName, request, responseBuilder);
        }
      };
    }
    execOperation(endpointOperation);
  }

  /**
   * @deprecated Since 2.0 with out any replacement and will be removed in 3.0
   */
  @Deprecated
  public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException {
    if (this.coprocEnvironments.isEmpty()) {
      return result;
    }
    final ObserverOperationWithResult<RegionObserver, DeleteTracker> trackerOperation =
      new ObserverOperationWithResult<RegionObserver, DeleteTracker>(regionObserverGetter,
        result) {
        @Override
        public DeleteTracker call(RegionObserver observer) throws IOException {
          return observer.postInstantiateDeleteTracker(this, getResult());
        }
      };
    return execOperationWithResult(trackerOperation);
  }

  /////////////////////////////////////////////////////////////////////////////////////////////////
  // BulkLoadObserver hooks
  /////////////////////////////////////////////////////////////////////////////////////////////////

  /**
   * Passes the {@code prePrepareBulkLoad} hook of the bulk load observers to
   * {@code execOperation}.
   * @param user the user the operation is constructed with
   */
  public void prePrepareBulkLoad(User user) throws IOException {
    // execOperation is always invoked; it receives null when no coprocessor environments exist.
    BulkLoadObserverOperation prepareOperation = null;
    if (!coprocEnvironments.isEmpty()) {
      prepareOperation = new BulkLoadObserverOperation(user) {
        @Override
        protected void call(BulkLoadObserver observer) throws IOException {
          observer.prePrepareBulkLoad(this);
        }
      };
    }
    execOperation(prepareOperation);
  }

  /**
   * Passes the {@code preCleanupBulkLoad} hook of the bulk load observers to
   * {@code execOperation}.
   * @param user the user the operation is constructed with
   */
  public void preCleanupBulkLoad(User user) throws IOException {
    // execOperation is always invoked; it receives null when no coprocessor environments exist.
    BulkLoadObserverOperation cleanupOperation = null;
    if (!coprocEnvironments.isEmpty()) {
      cleanupOperation = new BulkLoadObserverOperation(user) {
        @Override
        protected void call(BulkLoadObserver observer) throws IOException {
          observer.preCleanupBulkLoad(this);
        }
      };
    }
    execOperation(cleanupOperation);
  }
}