001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import com.google.protobuf.Message; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.RawCellBuilder; 040import org.apache.hadoop.hbase.RawCellBuilderFactory; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.SharedConnection; 043import org.apache.hadoop.hbase.client.Append; 044import org.apache.hadoop.hbase.client.CheckAndMutate; 045import org.apache.hadoop.hbase.client.CheckAndMutateResult; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.Delete; 048import org.apache.hadoop.hbase.client.Durability; 049import org.apache.hadoop.hbase.client.Get; 050import org.apache.hadoop.hbase.client.Increment; 051import org.apache.hadoop.hbase.client.Mutation; 052import org.apache.hadoop.hbase.client.Put; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.Result; 055import org.apache.hadoop.hbase.client.Scan; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 058import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 061import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 062import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 063import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 064import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 065import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 066import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 067import org.apache.hadoop.hbase.coprocessor.ObserverContext; 068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 069import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 070import org.apache.hadoop.hbase.coprocessor.RegionObserver; 071import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 072import org.apache.hadoop.hbase.io.Reference; 073import org.apache.hadoop.hbase.io.hfile.CacheConfig; 074import org.apache.hadoop.hbase.metrics.MetricRegistry; 075import org.apache.hadoop.hbase.regionserver.Region.Operation; 076import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 078import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 079import org.apache.hadoop.hbase.security.User; 080import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 081import org.apache.hadoop.hbase.util.Pair; 082import org.apache.hadoop.hbase.wal.WALEdit; 083import org.apache.hadoop.hbase.wal.WALKey; 084import org.apache.yetus.audience.InterfaceAudience; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 090 091/** 092 * Implements the coprocessor environment and runtime support for coprocessors 093 * loaded within a {@link Region}. 094 */ 095@InterfaceAudience.Private 096public class RegionCoprocessorHost 097 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 098 099 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 100 // The shared data map 101 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 102 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 103 AbstractReferenceMap.ReferenceStrength.WEAK); 104 105 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 106 private final boolean hasCustomPostScannerFilterRow; 107 108 /** 109 * 110 * Encapsulation of the environment of each coprocessor 111 */ 112 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 113 implements RegionCoprocessorEnvironment { 114 private Region region; 115 ConcurrentMap<String, Object> sharedData; 116 private final MetricRegistry metricRegistry; 117 private final RegionServerServices services; 118 119 /** 120 * Constructor 121 * @param impl the coprocessor instance 122 * @param priority chaining priority 123 */ 124 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 125 final int seq, final Configuration conf, final Region region, 126 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 127 super(impl, priority, seq, conf); 128 this.region = region; 129 this.sharedData = sharedData; 130 this.services = services; 131 this.metricRegistry = 132 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 133 } 134 135 /** @return the region */ 136 @Override 137 public Region getRegion() { 138 return region; 139 } 140 141 @Override 142 public OnlineRegions getOnlineRegions() { 143 return this.services; 144 } 145 146 @Override 147 public Connection getConnection() { 148 // Mocks may have services as null at test time. 149 return services != null ? new SharedConnection(services.getConnection()) : null; 150 } 151 152 @Override 153 public Connection createConnection(Configuration conf) throws IOException { 154 return services != null ? this.services.createConnection(conf) : null; 155 } 156 157 @Override 158 public ServerName getServerName() { 159 return services != null? services.getServerName(): null; 160 } 161 162 @Override 163 public void shutdown() { 164 super.shutdown(); 165 MetricsCoprocessor.removeRegistry(this.metricRegistry); 166 } 167 168 @Override 169 public ConcurrentMap<String, Object> getSharedData() { 170 return sharedData; 171 } 172 173 @Override 174 public RegionInfo getRegionInfo() { 175 return region.getRegionInfo(); 176 } 177 178 @Override 179 public MetricRegistry getMetricRegistryForRegionServer() { 180 return metricRegistry; 181 } 182 183 @Override 184 public RawCellBuilder getCellBuilder() { 185 // We always do a DEEP_COPY only 186 return RawCellBuilderFactory.create(); 187 } 188 } 189 190 /** 191 * Special version of RegionEnvironment that exposes RegionServerServices for Core 192 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 193 */ 194 private static class RegionEnvironmentForCoreCoprocessors extends 195 RegionEnvironment implements HasRegionServerServices { 196 private final RegionServerServices rsServices; 197 198 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 199 final int seq, final Configuration conf, final Region region, 200 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 201 super(impl, priority, seq, conf, region, services, sharedData); 202 this.rsServices = services; 203 } 204 205 /** 206 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 207 * consumption. 208 */ 209 @Override 210 public RegionServerServices getRegionServerServices() { 211 return this.rsServices; 212 } 213 } 214 215 static class TableCoprocessorAttribute { 216 private Path path; 217 private String className; 218 private int priority; 219 private Configuration conf; 220 221 public TableCoprocessorAttribute(Path path, String className, int priority, 222 Configuration conf) { 223 this.path = path; 224 this.className = className; 225 this.priority = priority; 226 this.conf = conf; 227 } 228 229 public Path getPath() { 230 return path; 231 } 232 233 public String getClassName() { 234 return className; 235 } 236 237 public int getPriority() { 238 return priority; 239 } 240 241 public Configuration getConf() { 242 return conf; 243 } 244 } 245 246 /** The region server services */ 247 RegionServerServices rsServices; 248 /** The region */ 249 HRegion region; 250 251 /** 252 * Constructor 253 * @param region the region 254 * @param rsServices interface to available region server functionality 255 * @param conf the configuration 256 */ 257 public RegionCoprocessorHost(final HRegion region, 258 final RegionServerServices rsServices, final Configuration conf) { 259 super(rsServices); 260 this.conf = conf; 261 this.rsServices = rsServices; 262 this.region = region; 263 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 264 265 // load system default cp's from configuration. 266 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 267 268 // load system default cp's for user tables from configuration. 269 if (!region.getRegionInfo().getTable().isSystemTable()) { 270 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 271 } 272 273 // load Coprocessor From HDFS 274 loadTableCoprocessors(conf); 275 276 // now check whether any coprocessor implements postScannerFilterRow 277 boolean hasCustomPostScannerFilterRow = false; 278 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 279 if (env.getInstance() instanceof RegionObserver) { 280 Class<?> clazz = env.getInstance().getClass(); 281 for(;;) { 282 if (clazz == null) { 283 // we must have directly implemented RegionObserver 284 hasCustomPostScannerFilterRow = true; 285 break out; 286 } 287 try { 288 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 289 InternalScanner.class, Cell.class, boolean.class); 290 // this coprocessor has a custom version of postScannerFilterRow 291 hasCustomPostScannerFilterRow = true; 292 break out; 293 } catch (NoSuchMethodException ignore) { 294 } 295 // the deprecated signature still exists 296 try { 297 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 298 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 299 // this coprocessor has a custom version of postScannerFilterRow 300 hasCustomPostScannerFilterRow = true; 301 break out; 302 } catch (NoSuchMethodException ignore) { 303 } 304 clazz = clazz.getSuperclass(); 305 } 306 } 307 } 308 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 309 } 310 311 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 312 TableDescriptor htd) { 313 return htd.getCoprocessorDescriptors().stream().map(cp -> { 314 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 315 Configuration ourConf; 316 if (!cp.getProperties().isEmpty()) { 317 // do an explicit deep copy of the passed configuration 318 ourConf = new Configuration(false); 319 HBaseConfiguration.merge(ourConf, conf); 320 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 321 } else { 322 ourConf = conf; 323 } 324 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 325 }).collect(Collectors.toList()); 326 } 327 328 /** 329 * Sanity check the table coprocessor attributes of the supplied schema. Will 330 * throw an exception if there is a problem. 331 * @param conf 332 * @param htd 333 * @throws IOException 334 */ 335 public static void testTableCoprocessorAttrs(final Configuration conf, 336 final TableDescriptor htd) throws IOException { 337 String pathPrefix = UUID.randomUUID().toString(); 338 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 339 if (attr.getPriority() < 0) { 340 throw new IOException("Priority for coprocessor " + attr.getClassName() + 341 " cannot be less than 0"); 342 } 343 ClassLoader old = Thread.currentThread().getContextClassLoader(); 344 try { 345 ClassLoader cl; 346 if (attr.getPath() != null) { 347 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 348 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 349 } else { 350 cl = CoprocessorHost.class.getClassLoader(); 351 } 352 Thread.currentThread().setContextClassLoader(cl); 353 if (cl instanceof CoprocessorClassLoader) { 354 String[] includedClassPrefixes = null; 355 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 356 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 357 includedClassPrefixes = prefixes.split(";"); 358 } 359 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); 360 } else { 361 cl.loadClass(attr.getClassName()); 362 } 363 } catch (ClassNotFoundException e) { 364 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 365 } finally { 366 Thread.currentThread().setContextClassLoader(old); 367 } 368 } 369 } 370 371 void loadTableCoprocessors(final Configuration conf) { 372 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 373 DEFAULT_COPROCESSORS_ENABLED); 374 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 375 DEFAULT_USER_COPROCESSORS_ENABLED); 376 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 377 return; 378 } 379 380 // scan the table attributes for coprocessor load specifications 381 // initialize the coprocessors 382 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 383 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 384 region.getTableDescriptor())) { 385 // Load encompasses classloading and coprocessor initialization 386 try { 387 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 388 attr.getPriority(), attr.getConf()); 389 if (env == null) { 390 continue; 391 } 392 configured.add(env); 393 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 394 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 395 } catch (Throwable t) { 396 // Coprocessor failed to load, do we abort on error? 397 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 398 abortServer(attr.getClassName(), t); 399 } else { 400 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 401 } 402 } 403 } 404 // add together to coprocessor set for COW efficiency 405 coprocEnvironments.addAll(configured); 406 } 407 408 @Override 409 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 410 Configuration conf) { 411 // If coprocessor exposes any services, register them. 412 for (Service service : instance.getServices()) { 413 region.registerService(service); 414 } 415 ConcurrentMap<String, Object> classData; 416 // make sure only one thread can add maps 417 synchronized (SHARED_DATA_MAP) { 418 // as long as at least one RegionEnvironment holds on to its classData it will 419 // remain in this map 420 classData = 421 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 422 k -> new ConcurrentHashMap<>()); 423 } 424 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 425 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 426 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 427 rsServices, classData): 428 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 429 } 430 431 @Override 432 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 433 throws InstantiationException, IllegalAccessException { 434 try { 435 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 436 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 437 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 438 // For backward compatibility with old CoprocessorService impl which don't extend 439 // RegionCoprocessor. 440 CoprocessorService cs; 441 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 442 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 443 } else { 444 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 445 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 446 return null; 447 } 448 } catch (NoSuchMethodException | InvocationTargetException e) { 449 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 450 } 451 } 452 453 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 454 RegionCoprocessor::getRegionObserver; 455 456 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 457 RegionCoprocessor::getEndpointObserver; 458 459 abstract class RegionObserverOperationWithoutResult extends 460 ObserverOperationWithoutResult<RegionObserver> { 461 public RegionObserverOperationWithoutResult() { 462 super(regionObserverGetter); 463 } 464 465 public RegionObserverOperationWithoutResult(User user) { 466 super(regionObserverGetter, user); 467 } 468 469 public RegionObserverOperationWithoutResult(boolean bypassable) { 470 super(regionObserverGetter, null, bypassable); 471 } 472 473 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 474 super(regionObserverGetter, user, bypassable); 475 } 476 } 477 478 abstract class BulkLoadObserverOperation extends 479 ObserverOperationWithoutResult<BulkLoadObserver> { 480 public BulkLoadObserverOperation(User user) { 481 super(RegionCoprocessor::getBulkLoadObserver, user); 482 } 483 } 484 485 486 ////////////////////////////////////////////////////////////////////////////////////////////////// 487 // Observer operations 488 ////////////////////////////////////////////////////////////////////////////////////////////////// 489 490 ////////////////////////////////////////////////////////////////////////////////////////////////// 491 // Observer operations 492 ////////////////////////////////////////////////////////////////////////////////////////////////// 493 494 /** 495 * Invoked before a region open. 496 * 497 * @throws IOException Signals that an I/O exception has occurred. 498 */ 499 public void preOpen() throws IOException { 500 if (coprocEnvironments.isEmpty()) { 501 return; 502 } 503 execOperation(new RegionObserverOperationWithoutResult() { 504 @Override 505 public void call(RegionObserver observer) throws IOException { 506 observer.preOpen(this); 507 } 508 }); 509 } 510 511 512 /** 513 * Invoked after a region open 514 */ 515 public void postOpen() { 516 if (coprocEnvironments.isEmpty()) { 517 return; 518 } 519 try { 520 execOperation(new RegionObserverOperationWithoutResult() { 521 @Override 522 public void call(RegionObserver observer) throws IOException { 523 observer.postOpen(this); 524 } 525 }); 526 } catch (IOException e) { 527 LOG.warn(e.toString(), e); 528 } 529 } 530 531 /** 532 * Invoked before a region is closed 533 * @param abortRequested true if the server is aborting 534 */ 535 public void preClose(final boolean abortRequested) throws IOException { 536 execOperation(new RegionObserverOperationWithoutResult() { 537 @Override 538 public void call(RegionObserver observer) throws IOException { 539 observer.preClose(this, abortRequested); 540 } 541 }); 542 } 543 544 /** 545 * Invoked after a region is closed 546 * @param abortRequested true if the server is aborting 547 */ 548 public void postClose(final boolean abortRequested) { 549 try { 550 execOperation(new RegionObserverOperationWithoutResult() { 551 @Override 552 public void call(RegionObserver observer) throws IOException { 553 observer.postClose(this, abortRequested); 554 } 555 556 @Override 557 public void postEnvCall() { 558 shutdown(this.getEnvironment()); 559 } 560 }); 561 } catch (IOException e) { 562 LOG.warn(e.toString(), e); 563 } 564 } 565 566 /** 567 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 568 * available candidates. 569 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 570 * the passed in <code>candidates</code>. 571 * @param store The store where compaction is being requested 572 * @param candidates The currently available store files 573 * @param tracker used to track the life cycle of a compaction 574 * @param user the user 575 * @throws IOException 576 */ 577 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 578 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 579 if (coprocEnvironments.isEmpty()) { 580 return false; 581 } 582 boolean bypassable = true; 583 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 584 @Override 585 public void call(RegionObserver observer) throws IOException { 586 observer.preCompactSelection(this, store, candidates, tracker); 587 } 588 }); 589 } 590 591 /** 592 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 593 * candidates. 594 * @param store The store where compaction is being requested 595 * @param selected The store files selected to compact 596 * @param tracker used to track the life cycle of a compaction 597 * @param request the compaction request 598 * @param user the user 599 */ 600 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 601 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 602 final User user) throws IOException { 603 if (coprocEnvironments.isEmpty()) { 604 return; 605 } 606 execOperation(new RegionObserverOperationWithoutResult(user) { 607 @Override 608 public void call(RegionObserver observer) throws IOException { 609 observer.postCompactSelection(this, store, selected, tracker, request); 610 } 611 }); 612 } 613 614 /** 615 * Called prior to opening store scanner for compaction. 616 */ 617 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 618 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 619 if (coprocEnvironments.isEmpty()) { 620 return store.getScanInfo(); 621 } 622 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 623 execOperation(new RegionObserverOperationWithoutResult(user) { 624 @Override 625 public void call(RegionObserver observer) throws IOException { 626 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 627 } 628 }); 629 return builder.build(); 630 } 631 632 /** 633 * Called prior to rewriting the store files selected for compaction 634 * @param store the store being compacted 635 * @param scanner the scanner used to read store data during compaction 636 * @param scanType type of Scan 637 * @param tracker used to track the life cycle of a compaction 638 * @param request the compaction request 639 * @param user the user 640 * @return Scanner to use (cannot be null!) 641 * @throws IOException 642 */ 643 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 644 final ScanType scanType, final CompactionLifeCycleTracker tracker, 645 final CompactionRequest request, final User user) throws IOException { 646 InternalScanner defaultResult = scanner; 647 if (coprocEnvironments.isEmpty()) { 648 return defaultResult; 649 } 650 return execOperationWithResult( 651 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 652 defaultResult, user) { 653 @Override 654 public InternalScanner call(RegionObserver observer) throws IOException { 655 InternalScanner scanner = 656 observer.preCompact(this, store, getResult(), scanType, tracker, request); 657 if (scanner == null) { 658 throw new CoprocessorException("Null Scanner return disallowed!"); 659 } 660 return scanner; 661 } 662 }); 663 } 664 665 /** 666 * Called after the store compaction has completed. 667 * @param store the store being compacted 668 * @param resultFile the new store file written during compaction 669 * @param tracker used to track the life cycle of a compaction 670 * @param request the compaction request 671 * @param user the user 672 * @throws IOException 673 */ 674 public void postCompact(final HStore store, final HStoreFile resultFile, 675 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 676 throws IOException { 677 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 678 @Override 679 public void call(RegionObserver observer) throws IOException { 680 observer.postCompact(this, store, resultFile, tracker, request); 681 } 682 }); 683 } 684 685 /** 686 * Invoked before create StoreScanner for flush. 687 */ 688 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 689 throws IOException { 690 if (coprocEnvironments.isEmpty()) { 691 return store.getScanInfo(); 692 } 693 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 694 execOperation(new RegionObserverOperationWithoutResult() { 695 @Override 696 public void call(RegionObserver observer) throws IOException { 697 observer.preFlushScannerOpen(this, store, builder, tracker); 698 } 699 }); 700 return builder.build(); 701 } 702 703 /** 704 * Invoked before a memstore flush 705 * @return Scanner to use (cannot be null!) 706 * @throws IOException 707 */ 708 public InternalScanner preFlush(HStore store, InternalScanner scanner, 709 FlushLifeCycleTracker tracker) throws IOException { 710 if (coprocEnvironments.isEmpty()) { 711 return scanner; 712 } 713 return execOperationWithResult( 714 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 715 @Override 716 public InternalScanner call(RegionObserver observer) throws IOException { 717 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 718 if (scanner == null) { 719 throw new CoprocessorException("Null Scanner return disallowed!"); 720 } 721 return scanner; 722 } 723 }); 724 } 725 726 /** 727 * Invoked before a memstore flush 728 * @throws IOException 729 */ 730 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 731 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 732 @Override 733 public void call(RegionObserver observer) throws IOException { 734 observer.preFlush(this, tracker); 735 } 736 }); 737 } 738 739 /** 740 * Invoked after a memstore flush 741 * @throws IOException 742 */ 743 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 744 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 745 @Override 746 public void call(RegionObserver observer) throws IOException { 747 observer.postFlush(this, tracker); 748 } 749 }); 750 } 751 752 /** 753 * Invoked before in memory compaction. 754 */ 755 public void preMemStoreCompaction(HStore store) throws IOException { 756 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 757 @Override 758 public void call(RegionObserver observer) throws IOException { 759 observer.preMemStoreCompaction(this, store); 760 } 761 }); 762 } 763 764 /** 765 * Invoked before create StoreScanner for in memory compaction. 766 */ 767 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 768 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 769 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 770 @Override 771 public void call(RegionObserver observer) throws IOException { 772 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 773 } 774 }); 775 return builder.build(); 776 } 777 778 /** 779 * Invoked before compacting memstore. 780 */ 781 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 782 throws IOException { 783 if (coprocEnvironments.isEmpty()) { 784 return scanner; 785 } 786 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 787 regionObserverGetter, scanner) { 788 @Override 789 public InternalScanner call(RegionObserver observer) throws IOException { 790 return observer.preMemStoreCompactionCompact(this, store, getResult()); 791 } 792 }); 793 } 794 795 /** 796 * Invoked after in memory compaction. 797 */ 798 public void postMemStoreCompaction(HStore store) throws IOException { 799 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 800 @Override 801 public void call(RegionObserver observer) throws IOException { 802 observer.postMemStoreCompaction(this, store); 803 } 804 }); 805 } 806 807 /** 808 * Invoked after a memstore flush 809 * @throws IOException 810 */ 811 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 812 throws IOException { 813 if (coprocEnvironments.isEmpty()) { 814 return; 815 } 816 execOperation(new RegionObserverOperationWithoutResult() { 817 @Override 818 public void call(RegionObserver observer) throws IOException { 819 observer.postFlush(this, store, storeFile, tracker); 820 } 821 }); 822 } 823 824 // RegionObserver support 825 /** 826 * Supports Coprocessor 'bypass'. 827 * @param get the Get request 828 * @param results What to return if return is true/'bypass'. 829 * @return true if default processing should be bypassed. 830 * @exception IOException Exception 831 */ 832 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 833 if (coprocEnvironments.isEmpty()) { 834 return false; 835 } 836 boolean bypassable = true; 837 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 838 @Override 839 public void call(RegionObserver observer) throws IOException { 840 observer.preGetOp(this, get, results); 841 } 842 }); 843 } 844 845 /** 846 * @param get the Get request 847 * @param results the result set 848 * @exception IOException Exception 849 */ 850 public void postGet(final Get get, final List<Cell> results) 851 throws IOException { 852 if (coprocEnvironments.isEmpty()) { 853 return; 854 } 855 execOperation(new RegionObserverOperationWithoutResult() { 856 @Override 857 public void call(RegionObserver observer) throws IOException { 858 observer.postGetOp(this, get, results); 859 } 860 }); 861 } 862 863 /** 864 * Supports Coprocessor 'bypass'. 865 * @param get the Get request 866 * @return true or false to return to client if bypassing normal operation, or null otherwise 867 * @exception IOException Exception 868 */ 869 public Boolean preExists(final Get get) throws IOException { 870 boolean bypassable = true; 871 boolean defaultResult = false; 872 if (coprocEnvironments.isEmpty()) { 873 return null; 874 } 875 return execOperationWithResult( 876 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 877 defaultResult, bypassable) { 878 @Override 879 public Boolean call(RegionObserver observer) throws IOException { 880 return observer.preExists(this, get, getResult()); 881 } 882 }); 883 } 884 885 /** 886 * @param get the Get request 887 * @param result the result returned by the region server 888 * @return the result to return to the client 889 * @exception IOException Exception 890 */ 891 public boolean postExists(final Get get, boolean result) 892 throws IOException { 893 if (this.coprocEnvironments.isEmpty()) { 894 return result; 895 } 896 return execOperationWithResult( 897 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 898 @Override 899 public Boolean call(RegionObserver observer) throws IOException { 900 return observer.postExists(this, get, getResult()); 901 } 902 }); 903 } 904 905 /** 906 * Supports Coprocessor 'bypass'. 907 * @param put The Put object 908 * @param edit The WALEdit object. 909 * @param durability The durability used 910 * @return true if default processing should be bypassed 911 * @exception IOException Exception 912 */ 913 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 914 throws IOException { 915 if (coprocEnvironments.isEmpty()) { 916 return false; 917 } 918 boolean bypassable = true; 919 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 920 @Override 921 public void call(RegionObserver observer) throws IOException { 922 observer.prePut(this, put, edit, durability); 923 } 924 }); 925 } 926 927 /** 928 * Supports Coprocessor 'bypass'. 929 * @param mutation - the current mutation 930 * @param kv - the current cell 931 * @param byteNow - current timestamp in bytes 932 * @param get - the get that could be used 933 * Note that the get only does not specify the family and qualifier that should be used 934 * @return true if default processing should be bypassed 935 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 936 * Coprocessor for its needs only. Will be removed. 937 */ 938 @Deprecated 939 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 940 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 941 if (coprocEnvironments.isEmpty()) { 942 return false; 943 } 944 boolean bypassable = true; 945 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 946 @Override 947 public void call(RegionObserver observer) throws IOException { 948 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 949 } 950 }); 951 } 952 953 /** 954 * @param put The Put object 955 * @param edit The WALEdit object. 956 * @param durability The durability used 957 * @exception IOException Exception 958 */ 959 public void postPut(final Put put, final WALEdit edit, final Durability durability) 960 throws IOException { 961 if (coprocEnvironments.isEmpty()) { 962 return; 963 } 964 execOperation(new RegionObserverOperationWithoutResult() { 965 @Override 966 public void call(RegionObserver observer) throws IOException { 967 observer.postPut(this, put, edit, durability); 968 } 969 }); 970 } 971 972 /** 973 * Supports Coprocessor 'bypass'. 974 * @param delete The Delete object 975 * @param edit The WALEdit object. 976 * @param durability The durability used 977 * @return true if default processing should be bypassed 978 * @exception IOException Exception 979 */ 980 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 981 throws IOException { 982 if (this.coprocEnvironments.isEmpty()) { 983 return false; 984 } 985 boolean bypassable = true; 986 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 987 @Override 988 public void call(RegionObserver observer) throws IOException { 989 observer.preDelete(this, delete, edit, durability); 990 } 991 }); 992 } 993 994 /** 995 * @param delete The Delete object 996 * @param edit The WALEdit object. 997 * @param durability The durability used 998 * @exception IOException Exception 999 */ 1000 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 1001 throws IOException { 1002 execOperation(coprocEnvironments.isEmpty()? null: 1003 new RegionObserverOperationWithoutResult() { 1004 @Override 1005 public void call(RegionObserver observer) throws IOException { 1006 observer.postDelete(this, delete, edit, durability); 1007 } 1008 }); 1009 } 1010 1011 public void preBatchMutate( 1012 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1013 if(this.coprocEnvironments.isEmpty()) { 1014 return; 1015 } 1016 execOperation(new RegionObserverOperationWithoutResult() { 1017 @Override 1018 public void call(RegionObserver observer) throws IOException { 1019 observer.preBatchMutate(this, miniBatchOp); 1020 } 1021 }); 1022 } 1023 1024 public void postBatchMutate( 1025 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1026 if (this.coprocEnvironments.isEmpty()) { 1027 return; 1028 } 1029 execOperation(new RegionObserverOperationWithoutResult() { 1030 @Override 1031 public void call(RegionObserver observer) throws IOException { 1032 observer.postBatchMutate(this, miniBatchOp); 1033 } 1034 }); 1035 } 1036 1037 public void postBatchMutateIndispensably( 1038 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1039 throws IOException { 1040 if (this.coprocEnvironments.isEmpty()) { 1041 return; 1042 } 1043 execOperation(new RegionObserverOperationWithoutResult() { 1044 @Override 1045 public void call(RegionObserver observer) throws IOException { 1046 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1047 } 1048 }); 1049 } 1050 1051 /** 1052 * Supports Coprocessor 'bypass'. 1053 * @param checkAndMutate the CheckAndMutate object 1054 * @return true or false to return to client if default processing should be bypassed, or null 1055 * otherwise 1056 * @throws IOException if an error occurred on the coprocessor 1057 */ 1058 public CheckAndMutateResult preCheckAndMutate(CheckAndMutate checkAndMutate) 1059 throws IOException { 1060 boolean bypassable = true; 1061 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1062 if (coprocEnvironments.isEmpty()) { 1063 return null; 1064 } 1065 return execOperationWithResult( 1066 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1067 regionObserverGetter, defaultResult, bypassable) { 1068 @Override 1069 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1070 return observer.preCheckAndMutate(this, checkAndMutate, getResult()); 1071 } 1072 }); 1073 } 1074 1075 /** 1076 * Supports Coprocessor 'bypass'. 1077 * @param checkAndMutate the CheckAndMutate object 1078 * @return true or false to return to client if default processing should be bypassed, or null 1079 * otherwise 1080 * @throws IOException if an error occurred on the coprocessor 1081 */ 1082 public CheckAndMutateResult preCheckAndMutateAfterRowLock(CheckAndMutate checkAndMutate) 1083 throws IOException { 1084 boolean bypassable = true; 1085 CheckAndMutateResult defaultResult = new CheckAndMutateResult(false, null); 1086 if (coprocEnvironments.isEmpty()) { 1087 return null; 1088 } 1089 return execOperationWithResult( 1090 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1091 regionObserverGetter, defaultResult, bypassable) { 1092 @Override 1093 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1094 return observer.preCheckAndMutateAfterRowLock(this, checkAndMutate, getResult()); 1095 } 1096 }); 1097 } 1098 1099 /** 1100 * @param checkAndMutate the CheckAndMutate object 1101 * @param result the result returned by the checkAndMutate 1102 * @return true or false to return to client if default processing should be bypassed, or null 1103 * otherwise 1104 * @throws IOException if an error occurred on the coprocessor 1105 */ 1106 public CheckAndMutateResult postCheckAndMutate(CheckAndMutate checkAndMutate, 1107 CheckAndMutateResult result) throws IOException { 1108 if (this.coprocEnvironments.isEmpty()) { 1109 return result; 1110 } 1111 return execOperationWithResult( 1112 new ObserverOperationWithResult<RegionObserver, CheckAndMutateResult>( 1113 regionObserverGetter, result) { 1114 @Override 1115 public CheckAndMutateResult call(RegionObserver observer) throws IOException { 1116 return observer.postCheckAndMutate(this, checkAndMutate, getResult()); 1117 } 1118 }); 1119 } 1120 1121 /** 1122 * Supports Coprocessor 'bypass'. 1123 * @param append append object 1124 * @return result to return to client if default operation should be bypassed, null otherwise 1125 * @throws IOException if an error occurred on the coprocessor 1126 */ 1127 public Result preAppend(final Append append) throws IOException { 1128 boolean bypassable = true; 1129 Result defaultResult = null; 1130 if (this.coprocEnvironments.isEmpty()) { 1131 return defaultResult; 1132 } 1133 return execOperationWithResult( 1134 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1135 bypassable) { 1136 @Override 1137 public Result call(RegionObserver observer) throws IOException { 1138 return observer.preAppend(this, append); 1139 } 1140 }); 1141 } 1142 1143 /** 1144 * Supports Coprocessor 'bypass'. 1145 * @param append append object 1146 * @return result to return to client if default operation should be bypassed, null otherwise 1147 * @throws IOException if an error occurred on the coprocessor 1148 */ 1149 public Result preAppendAfterRowLock(final Append append) throws IOException { 1150 boolean bypassable = true; 1151 Result defaultResult = null; 1152 if (this.coprocEnvironments.isEmpty()) { 1153 return defaultResult; 1154 } 1155 return execOperationWithResult( 1156 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1157 defaultResult, bypassable) { 1158 @Override 1159 public Result call(RegionObserver observer) throws IOException { 1160 return observer.preAppendAfterRowLock(this, append); 1161 } 1162 }); 1163 } 1164 1165 /** 1166 * Supports Coprocessor 'bypass'. 1167 * @param increment increment object 1168 * @return result to return to client if default operation should be bypassed, null otherwise 1169 * @throws IOException if an error occurred on the coprocessor 1170 */ 1171 public Result preIncrement(final Increment increment) throws IOException { 1172 boolean bypassable = true; 1173 Result defaultResult = null; 1174 if (coprocEnvironments.isEmpty()) { 1175 return defaultResult; 1176 } 1177 return execOperationWithResult( 1178 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1179 bypassable) { 1180 @Override 1181 public Result call(RegionObserver observer) throws IOException { 1182 return observer.preIncrement(this, increment); 1183 } 1184 }); 1185 } 1186 1187 /** 1188 * Supports Coprocessor 'bypass'. 1189 * @param increment increment object 1190 * @return result to return to client if default operation should be bypassed, null otherwise 1191 * @throws IOException if an error occurred on the coprocessor 1192 */ 1193 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1194 boolean bypassable = true; 1195 Result defaultResult = null; 1196 if (coprocEnvironments.isEmpty()) { 1197 return defaultResult; 1198 } 1199 return execOperationWithResult( 1200 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1201 bypassable) { 1202 @Override 1203 public Result call(RegionObserver observer) throws IOException { 1204 return observer.preIncrementAfterRowLock(this, increment); 1205 } 1206 }); 1207 } 1208 1209 /** 1210 * @param append Append object 1211 * @param result the result returned by the append 1212 * @throws IOException if an error occurred on the coprocessor 1213 */ 1214 public Result postAppend(final Append append, final Result result) throws IOException { 1215 if (this.coprocEnvironments.isEmpty()) { 1216 return result; 1217 } 1218 return execOperationWithResult( 1219 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1220 @Override 1221 public Result call(RegionObserver observer) throws IOException { 1222 return observer.postAppend(this, append, result); 1223 } 1224 }); 1225 } 1226 1227 /** 1228 * @param increment increment object 1229 * @param result the result returned by postIncrement 1230 * @throws IOException if an error occurred on the coprocessor 1231 */ 1232 public Result postIncrement(final Increment increment, Result result) throws IOException { 1233 if (this.coprocEnvironments.isEmpty()) { 1234 return result; 1235 } 1236 return execOperationWithResult( 1237 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1238 @Override 1239 public Result call(RegionObserver observer) throws IOException { 1240 return observer.postIncrement(this, increment, getResult()); 1241 } 1242 }); 1243 } 1244 1245 /** 1246 * @param scan the Scan specification 1247 * @exception IOException Exception 1248 */ 1249 public void preScannerOpen(final Scan scan) throws IOException { 1250 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1251 @Override 1252 public void call(RegionObserver observer) throws IOException { 1253 observer.preScannerOpen(this, scan); 1254 } 1255 }); 1256 } 1257 1258 /** 1259 * @param scan the Scan specification 1260 * @param s the scanner 1261 * @return the scanner instance to use 1262 * @exception IOException Exception 1263 */ 1264 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1265 if (this.coprocEnvironments.isEmpty()) { 1266 return s; 1267 } 1268 return execOperationWithResult( 1269 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1270 @Override 1271 public RegionScanner call(RegionObserver observer) throws IOException { 1272 return observer.postScannerOpen(this, scan, getResult()); 1273 } 1274 }); 1275 } 1276 1277 /** 1278 * @param s the scanner 1279 * @param results the result set returned by the region server 1280 * @param limit the maximum number of results to return 1281 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1282 * @exception IOException Exception 1283 */ 1284 public Boolean preScannerNext(final InternalScanner s, 1285 final List<Result> results, final int limit) throws IOException { 1286 boolean bypassable = true; 1287 boolean defaultResult = false; 1288 if (coprocEnvironments.isEmpty()) { 1289 return null; 1290 } 1291 return execOperationWithResult( 1292 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1293 defaultResult, bypassable) { 1294 @Override 1295 public Boolean call(RegionObserver observer) throws IOException { 1296 return observer.preScannerNext(this, s, results, limit, getResult()); 1297 } 1298 }); 1299 } 1300 1301 /** 1302 * @param s the scanner 1303 * @param results the result set returned by the region server 1304 * @param limit the maximum number of results to return 1305 * @param hasMore 1306 * @return 'has more' indication to give to client 1307 * @exception IOException Exception 1308 */ 1309 public boolean postScannerNext(final InternalScanner s, 1310 final List<Result> results, final int limit, boolean hasMore) 1311 throws IOException { 1312 if (this.coprocEnvironments.isEmpty()) { 1313 return hasMore; 1314 } 1315 return execOperationWithResult( 1316 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1317 @Override 1318 public Boolean call(RegionObserver observer) throws IOException { 1319 return observer.postScannerNext(this, s, results, limit, getResult()); 1320 } 1321 }); 1322 } 1323 1324 /** 1325 * This will be called by the scan flow when the current scanned row is being filtered out by the 1326 * filter. 1327 * @param s the scanner 1328 * @param curRowCell The cell in the current row which got filtered out 1329 * @return whether more rows are available for the scanner or not 1330 * @throws IOException 1331 */ 1332 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1333 throws IOException { 1334 // short circuit for performance 1335 boolean defaultResult = true; 1336 if (!hasCustomPostScannerFilterRow) { 1337 return defaultResult; 1338 } 1339 if (this.coprocEnvironments.isEmpty()) { 1340 return defaultResult; 1341 } 1342 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1343 regionObserverGetter, defaultResult) { 1344 @Override 1345 public Boolean call(RegionObserver observer) throws IOException { 1346 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1347 } 1348 }); 1349 } 1350 1351 /** 1352 * Supports Coprocessor 'bypass'. 1353 * @param s the scanner 1354 * @return true if default behavior should be bypassed, false otherwise 1355 * @exception IOException Exception 1356 */ 1357 // Should this be bypassable? 1358 public boolean preScannerClose(final InternalScanner s) throws IOException { 1359 return execOperation(coprocEnvironments.isEmpty()? null: 1360 new RegionObserverOperationWithoutResult(true) { 1361 @Override 1362 public void call(RegionObserver observer) throws IOException { 1363 observer.preScannerClose(this, s); 1364 } 1365 }); 1366 } 1367 1368 /** 1369 * @exception IOException Exception 1370 */ 1371 public void postScannerClose(final InternalScanner s) throws IOException { 1372 execOperation(coprocEnvironments.isEmpty()? null: 1373 new RegionObserverOperationWithoutResult() { 1374 @Override 1375 public void call(RegionObserver observer) throws IOException { 1376 observer.postScannerClose(this, s); 1377 } 1378 }); 1379 } 1380 1381 /** 1382 * Called before open store scanner for user scan. 1383 */ 1384 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1385 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1386 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1387 execOperation(new RegionObserverOperationWithoutResult() { 1388 @Override 1389 public void call(RegionObserver observer) throws IOException { 1390 observer.preStoreScannerOpen(this, store, builder); 1391 } 1392 }); 1393 return builder.build(); 1394 } 1395 1396 /** 1397 * @param info the RegionInfo for this region 1398 * @param edits the file of recovered edits 1399 */ 1400 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1401 execOperation(coprocEnvironments.isEmpty()? null: 1402 new RegionObserverOperationWithoutResult(true) { 1403 @Override 1404 public void call(RegionObserver observer) throws IOException { 1405 observer.preReplayWALs(this, info, edits); 1406 } 1407 }); 1408 } 1409 1410 /** 1411 * @param info the RegionInfo for this region 1412 * @param edits the file of recovered edits 1413 * @throws IOException Exception 1414 */ 1415 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1416 execOperation(coprocEnvironments.isEmpty()? null: 1417 new RegionObserverOperationWithoutResult() { 1418 @Override 1419 public void call(RegionObserver observer) throws IOException { 1420 observer.postReplayWALs(this, info, edits); 1421 } 1422 }); 1423 } 1424 1425 /** 1426 * Supports Coprocessor 'bypass'. 1427 * @return true if default behavior should be bypassed, false otherwise 1428 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1429 * with something that doesn't expose IntefaceAudience.Private classes. 1430 */ 1431 @Deprecated 1432 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1433 throws IOException { 1434 return execOperation(coprocEnvironments.isEmpty()? null: 1435 new RegionObserverOperationWithoutResult(true) { 1436 @Override 1437 public void call(RegionObserver observer) throws IOException { 1438 observer.preWALRestore(this, info, logKey, logEdit); 1439 } 1440 }); 1441 } 1442 1443 /** 1444 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1445 * with something that doesn't expose IntefaceAudience.Private classes. 1446 */ 1447 @Deprecated 1448 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1449 throws IOException { 1450 execOperation(coprocEnvironments.isEmpty()? null: 1451 new RegionObserverOperationWithoutResult() { 1452 @Override 1453 public void call(RegionObserver observer) throws IOException { 1454 observer.postWALRestore(this, info, logKey, logEdit); 1455 } 1456 }); 1457 } 1458 1459 /** 1460 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1461 */ 1462 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1463 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1464 @Override 1465 public void call(RegionObserver observer) throws IOException { 1466 observer.preBulkLoadHFile(this, familyPaths); 1467 } 1468 }); 1469 } 1470 1471 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1472 throws IOException { 1473 return execOperation(coprocEnvironments.isEmpty()? null: 1474 new RegionObserverOperationWithoutResult() { 1475 @Override 1476 public void call(RegionObserver observer) throws IOException { 1477 observer.preCommitStoreFile(this, family, pairs); 1478 } 1479 }); 1480 } 1481 1482 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1483 execOperation(coprocEnvironments.isEmpty()? null: 1484 new RegionObserverOperationWithoutResult() { 1485 @Override 1486 public void call(RegionObserver observer) throws IOException { 1487 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1488 } 1489 }); 1490 } 1491 1492 /** 1493 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1494 * @param map Map of CF to List of file paths for the final loaded files 1495 * @throws IOException 1496 */ 1497 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1498 Map<byte[], List<Path>> map) throws IOException { 1499 if (this.coprocEnvironments.isEmpty()) { 1500 return; 1501 } 1502 execOperation(coprocEnvironments.isEmpty()? null: 1503 new RegionObserverOperationWithoutResult() { 1504 @Override 1505 public void call(RegionObserver observer) throws IOException { 1506 observer.postBulkLoadHFile(this, familyPaths, map); 1507 } 1508 }); 1509 } 1510 1511 public void postStartRegionOperation(final Operation op) throws IOException { 1512 execOperation(coprocEnvironments.isEmpty()? null: 1513 new RegionObserverOperationWithoutResult() { 1514 @Override 1515 public void call(RegionObserver observer) throws IOException { 1516 observer.postStartRegionOperation(this, op); 1517 } 1518 }); 1519 } 1520 1521 public void postCloseRegionOperation(final Operation op) throws IOException { 1522 execOperation(coprocEnvironments.isEmpty()? null: 1523 new RegionObserverOperationWithoutResult() { 1524 @Override 1525 public void call(RegionObserver observer) throws IOException { 1526 observer.postCloseRegionOperation(this, op); 1527 } 1528 }); 1529 } 1530 1531 /** 1532 * @param fs fileystem to read from 1533 * @param p path to the file 1534 * @param in {@link FSDataInputStreamWrapper} 1535 * @param size Full size of the file 1536 * @param cacheConf 1537 * @param r original reference file. This will be not null only when reading a split file. 1538 * @return a Reader instance to use instead of the base reader if overriding 1539 * default behavior, null otherwise 1540 * @throws IOException 1541 */ 1542 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1543 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1544 final Reference r) throws IOException { 1545 if (coprocEnvironments.isEmpty()) { 1546 return null; 1547 } 1548 return execOperationWithResult( 1549 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1550 @Override 1551 public StoreFileReader call(RegionObserver observer) throws IOException { 1552 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1553 getResult()); 1554 } 1555 }); 1556 } 1557 1558 /** 1559 * @param fs fileystem to read from 1560 * @param p path to the file 1561 * @param in {@link FSDataInputStreamWrapper} 1562 * @param size Full size of the file 1563 * @param cacheConf 1564 * @param r original reference file. This will be not null only when reading a split file. 1565 * @param reader the base reader instance 1566 * @return The reader to use 1567 * @throws IOException 1568 */ 1569 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1570 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1571 final Reference r, final StoreFileReader reader) throws IOException { 1572 if (this.coprocEnvironments.isEmpty()) { 1573 return reader; 1574 } 1575 return execOperationWithResult( 1576 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1577 @Override 1578 public StoreFileReader call(RegionObserver observer) throws IOException { 1579 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1580 getResult()); 1581 } 1582 }); 1583 } 1584 1585 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1586 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1587 if (this.coprocEnvironments.isEmpty()) { 1588 return cellPairs; 1589 } 1590 return execOperationWithResult( 1591 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1592 regionObserverGetter, cellPairs) { 1593 @Override 1594 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1595 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1596 } 1597 }); 1598 } 1599 1600 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1601 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1602 if (this.coprocEnvironments.isEmpty()) { 1603 return cellPairs; 1604 } 1605 return execOperationWithResult( 1606 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1607 regionObserverGetter, cellPairs) { 1608 @Override 1609 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1610 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1611 } 1612 }); 1613 } 1614 1615 public void preWALAppend(WALKey key, WALEdit edit) throws IOException { 1616 if (this.coprocEnvironments.isEmpty()){ 1617 return; 1618 } 1619 execOperation(new RegionObserverOperationWithoutResult() { 1620 @Override 1621 public void call(RegionObserver observer) throws IOException { 1622 observer.preWALAppend(this, key, edit); 1623 } 1624 }); 1625 } 1626 1627 public Message preEndpointInvocation(final Service service, final String methodName, 1628 Message request) throws IOException { 1629 if (coprocEnvironments.isEmpty()) { 1630 return request; 1631 } 1632 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1633 Message>(endpointObserverGetter, request) { 1634 @Override 1635 public Message call(EndpointObserver observer) throws IOException { 1636 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1637 } 1638 }); 1639 } 1640 1641 public void postEndpointInvocation(final Service service, final String methodName, 1642 final Message request, final Message.Builder responseBuilder) throws IOException { 1643 execOperation(coprocEnvironments.isEmpty() ? null : 1644 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1645 @Override 1646 public void call(EndpointObserver observer) throws IOException { 1647 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1648 } 1649 }); 1650 } 1651 1652 /** 1653 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1654 */ 1655 @Deprecated 1656 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1657 if (this.coprocEnvironments.isEmpty()) { 1658 return result; 1659 } 1660 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1661 regionObserverGetter, result) { 1662 @Override 1663 public DeleteTracker call(RegionObserver observer) throws IOException { 1664 return observer.postInstantiateDeleteTracker(this, getResult()); 1665 } 1666 }); 1667 } 1668 1669 ///////////////////////////////////////////////////////////////////////////////////////////////// 1670 // BulkLoadObserver hooks 1671 ///////////////////////////////////////////////////////////////////////////////////////////////// 1672 public void prePrepareBulkLoad(User user) throws IOException { 1673 execOperation(coprocEnvironments.isEmpty() ? null : 1674 new BulkLoadObserverOperation(user) { 1675 @Override protected void call(BulkLoadObserver observer) throws IOException { 1676 observer.prePrepareBulkLoad(this); 1677 } 1678 }); 1679 } 1680 1681 public void preCleanupBulkLoad(User user) throws IOException { 1682 execOperation(coprocEnvironments.isEmpty() ? null : 1683 new BulkLoadObserverOperation(user) { 1684 @Override protected void call(BulkLoadObserver observer) throws IOException { 1685 observer.preCleanupBulkLoad(this); 1686 } 1687 }); 1688 } 1689}