001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import com.google.protobuf.Message; 022import com.google.protobuf.Service; 023 024import java.io.IOException; 025import java.lang.reflect.InvocationTargetException; 026import java.util.ArrayList; 027import java.util.List; 028import java.util.Map; 029import java.util.UUID; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.stream.Collectors; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CompareOperator; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.RawCellBuilder; 041import org.apache.hadoop.hbase.RawCellBuilderFactory; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.SharedConnection; 044import org.apache.hadoop.hbase.client.Append; 045import org.apache.hadoop.hbase.client.Connection; 046import org.apache.hadoop.hbase.client.Delete; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Get; 049import org.apache.hadoop.hbase.client.Increment; 050import org.apache.hadoop.hbase.client.Mutation; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; 057import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; 058import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 059import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 060import org.apache.hadoop.hbase.coprocessor.CoprocessorService; 061import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; 062import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 063import org.apache.hadoop.hbase.coprocessor.EndpointObserver; 064import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices; 065import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor; 066import org.apache.hadoop.hbase.coprocessor.ObserverContext; 067import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 068import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 069import org.apache.hadoop.hbase.coprocessor.RegionObserver; 070import org.apache.hadoop.hbase.filter.ByteArrayComparable; 071import org.apache.hadoop.hbase.filter.Filter; 072import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 073import org.apache.hadoop.hbase.io.Reference; 074import org.apache.hadoop.hbase.io.hfile.CacheConfig; 075import org.apache.hadoop.hbase.metrics.MetricRegistry; 076import org.apache.hadoop.hbase.regionserver.Region.Operation; 077import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 078import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 079import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; 080import org.apache.hadoop.hbase.security.User; 081import org.apache.hadoop.hbase.util.CoprocessorClassLoader; 082import org.apache.hadoop.hbase.util.Pair; 083import org.apache.hadoop.hbase.wal.WALEdit; 084import org.apache.hadoop.hbase.wal.WALKey; 085import org.apache.yetus.audience.InterfaceAudience; 086import org.slf4j.Logger; 087import org.slf4j.LoggerFactory; 088 089import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; 090import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; 091 092/** 093 * Implements the coprocessor environment and runtime support for coprocessors 094 * loaded within a {@link Region}. 095 */ 096@InterfaceAudience.Private 097public class RegionCoprocessorHost 098 extends CoprocessorHost<RegionCoprocessor, RegionCoprocessorEnvironment> { 099 100 private static final Logger LOG = LoggerFactory.getLogger(RegionCoprocessorHost.class); 101 // The shared data map 102 private static final ReferenceMap<String, ConcurrentMap<String, Object>> SHARED_DATA_MAP = 103 new ReferenceMap<>(AbstractReferenceMap.ReferenceStrength.HARD, 104 AbstractReferenceMap.ReferenceStrength.WEAK); 105 106 // optimization: no need to call postScannerFilterRow, if no coprocessor implements it 107 private final boolean hasCustomPostScannerFilterRow; 108 109 /** 110 * 111 * Encapsulation of the environment of each coprocessor 112 */ 113 private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor> 114 implements RegionCoprocessorEnvironment { 115 private Region region; 116 ConcurrentMap<String, Object> sharedData; 117 private final MetricRegistry metricRegistry; 118 private final RegionServerServices services; 119 120 /** 121 * Constructor 122 * @param impl the coprocessor instance 123 * @param priority chaining priority 124 */ 125 public RegionEnvironment(final RegionCoprocessor impl, final int priority, 126 final int seq, final Configuration conf, final Region region, 127 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 128 super(impl, priority, seq, conf); 129 this.region = region; 130 this.sharedData = sharedData; 131 this.services = services; 132 this.metricRegistry = 133 MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); 134 } 135 136 /** @return the region */ 137 @Override 138 public Region getRegion() { 139 return region; 140 } 141 142 @Override 143 public OnlineRegions getOnlineRegions() { 144 return this.services; 145 } 146 147 @Override 148 public Connection getConnection() { 149 // Mocks may have services as null at test time. 150 return services != null ? new SharedConnection(services.getConnection()) : null; 151 } 152 153 @Override 154 public Connection createConnection(Configuration conf) throws IOException { 155 return services != null ? this.services.createConnection(conf) : null; 156 } 157 158 @Override 159 public ServerName getServerName() { 160 return services != null? services.getServerName(): null; 161 } 162 163 @Override 164 public void shutdown() { 165 super.shutdown(); 166 MetricsCoprocessor.removeRegistry(this.metricRegistry); 167 } 168 169 @Override 170 public ConcurrentMap<String, Object> getSharedData() { 171 return sharedData; 172 } 173 174 @Override 175 public RegionInfo getRegionInfo() { 176 return region.getRegionInfo(); 177 } 178 179 @Override 180 public MetricRegistry getMetricRegistryForRegionServer() { 181 return metricRegistry; 182 } 183 184 @Override 185 public RawCellBuilder getCellBuilder() { 186 // We always do a DEEP_COPY only 187 return RawCellBuilderFactory.create(); 188 } 189 } 190 191 /** 192 * Special version of RegionEnvironment that exposes RegionServerServices for Core 193 * Coprocessors only. Temporary hack until Core Coprocessors are integrated into Core. 194 */ 195 private static class RegionEnvironmentForCoreCoprocessors extends 196 RegionEnvironment implements HasRegionServerServices { 197 private final RegionServerServices rsServices; 198 199 public RegionEnvironmentForCoreCoprocessors(final RegionCoprocessor impl, final int priority, 200 final int seq, final Configuration conf, final Region region, 201 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { 202 super(impl, priority, seq, conf, region, services, sharedData); 203 this.rsServices = services; 204 } 205 206 /** 207 * @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor 208 * consumption. 209 */ 210 @Override 211 public RegionServerServices getRegionServerServices() { 212 return this.rsServices; 213 } 214 } 215 216 static class TableCoprocessorAttribute { 217 private Path path; 218 private String className; 219 private int priority; 220 private Configuration conf; 221 222 public TableCoprocessorAttribute(Path path, String className, int priority, 223 Configuration conf) { 224 this.path = path; 225 this.className = className; 226 this.priority = priority; 227 this.conf = conf; 228 } 229 230 public Path getPath() { 231 return path; 232 } 233 234 public String getClassName() { 235 return className; 236 } 237 238 public int getPriority() { 239 return priority; 240 } 241 242 public Configuration getConf() { 243 return conf; 244 } 245 } 246 247 /** The region server services */ 248 RegionServerServices rsServices; 249 /** The region */ 250 HRegion region; 251 252 /** 253 * Constructor 254 * @param region the region 255 * @param rsServices interface to available region server functionality 256 * @param conf the configuration 257 */ 258 public RegionCoprocessorHost(final HRegion region, 259 final RegionServerServices rsServices, final Configuration conf) { 260 super(rsServices); 261 this.conf = conf; 262 this.rsServices = rsServices; 263 this.region = region; 264 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode()); 265 266 // load system default cp's from configuration. 267 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY); 268 269 // load system default cp's for user tables from configuration. 270 if (!region.getRegionInfo().getTable().isSystemTable()) { 271 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY); 272 } 273 274 // load Coprocessor From HDFS 275 loadTableCoprocessors(conf); 276 277 // now check whether any coprocessor implements postScannerFilterRow 278 boolean hasCustomPostScannerFilterRow = false; 279 out: for (RegionCoprocessorEnvironment env: coprocEnvironments) { 280 if (env.getInstance() instanceof RegionObserver) { 281 Class<?> clazz = env.getInstance().getClass(); 282 for(;;) { 283 if (clazz == null) { 284 // we must have directly implemented RegionObserver 285 hasCustomPostScannerFilterRow = true; 286 break out; 287 } 288 try { 289 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 290 InternalScanner.class, Cell.class, boolean.class); 291 // this coprocessor has a custom version of postScannerFilterRow 292 hasCustomPostScannerFilterRow = true; 293 break out; 294 } catch (NoSuchMethodException ignore) { 295 } 296 // the deprecated signature still exists 297 try { 298 clazz.getDeclaredMethod("postScannerFilterRow", ObserverContext.class, 299 InternalScanner.class, byte[].class, int.class, short.class, boolean.class); 300 // this coprocessor has a custom version of postScannerFilterRow 301 hasCustomPostScannerFilterRow = true; 302 break out; 303 } catch (NoSuchMethodException ignore) { 304 } 305 clazz = clazz.getSuperclass(); 306 } 307 } 308 } 309 this.hasCustomPostScannerFilterRow = hasCustomPostScannerFilterRow; 310 } 311 312 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf, 313 TableDescriptor htd) { 314 return htd.getCoprocessorDescriptors().stream().map(cp -> { 315 Path path = cp.getJarPath().map(p -> new Path(p)).orElse(null); 316 Configuration ourConf; 317 if (!cp.getProperties().isEmpty()) { 318 // do an explicit deep copy of the passed configuration 319 ourConf = new Configuration(false); 320 HBaseConfiguration.merge(ourConf, conf); 321 cp.getProperties().forEach((k, v) -> ourConf.set(k, v)); 322 } else { 323 ourConf = conf; 324 } 325 return new TableCoprocessorAttribute(path, cp.getClassName(), cp.getPriority(), ourConf); 326 }).collect(Collectors.toList()); 327 } 328 329 /** 330 * Sanity check the table coprocessor attributes of the supplied schema. Will 331 * throw an exception if there is a problem. 332 * @param conf 333 * @param htd 334 * @throws IOException 335 */ 336 public static void testTableCoprocessorAttrs(final Configuration conf, 337 final TableDescriptor htd) throws IOException { 338 String pathPrefix = UUID.randomUUID().toString(); 339 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) { 340 if (attr.getPriority() < 0) { 341 throw new IOException("Priority for coprocessor " + attr.getClassName() + 342 " cannot be less than 0"); 343 } 344 ClassLoader old = Thread.currentThread().getContextClassLoader(); 345 try { 346 ClassLoader cl; 347 if (attr.getPath() != null) { 348 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(), 349 CoprocessorHost.class.getClassLoader(), pathPrefix, conf); 350 } else { 351 cl = CoprocessorHost.class.getClassLoader(); 352 } 353 Thread.currentThread().setContextClassLoader(cl); 354 if (cl instanceof CoprocessorClassLoader) { 355 String[] includedClassPrefixes = null; 356 if (conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY) != null) { 357 String prefixes = attr.conf.get(HConstants.CP_HTD_ATTR_INCLUSION_KEY); 358 includedClassPrefixes = prefixes.split(";"); 359 } 360 ((CoprocessorClassLoader)cl).loadClass(attr.getClassName(), includedClassPrefixes); 361 } else { 362 cl.loadClass(attr.getClassName()); 363 } 364 } catch (ClassNotFoundException e) { 365 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e); 366 } finally { 367 Thread.currentThread().setContextClassLoader(old); 368 } 369 } 370 } 371 372 void loadTableCoprocessors(final Configuration conf) { 373 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY, 374 DEFAULT_COPROCESSORS_ENABLED); 375 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, 376 DEFAULT_USER_COPROCESSORS_ENABLED); 377 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) { 378 return; 379 } 380 381 // scan the table attributes for coprocessor load specifications 382 // initialize the coprocessors 383 List<RegionCoprocessorEnvironment> configured = new ArrayList<>(); 384 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 385 region.getTableDescriptor())) { 386 // Load encompasses classloading and coprocessor initialization 387 try { 388 RegionCoprocessorEnvironment env = load(attr.getPath(), attr.getClassName(), 389 attr.getPriority(), attr.getConf()); 390 if (env == null) { 391 continue; 392 } 393 configured.add(env); 394 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " + 395 region.getTableDescriptor().getTableName().getNameAsString() + " successfully."); 396 } catch (Throwable t) { 397 // Coprocessor failed to load, do we abort on error? 398 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) { 399 abortServer(attr.getClassName(), t); 400 } else { 401 LOG.error("Failed to load coprocessor " + attr.getClassName(), t); 402 } 403 } 404 } 405 // add together to coprocessor set for COW efficiency 406 coprocEnvironments.addAll(configured); 407 } 408 409 @Override 410 public RegionEnvironment createEnvironment(RegionCoprocessor instance, int priority, int seq, 411 Configuration conf) { 412 // If coprocessor exposes any services, register them. 413 for (Service service : instance.getServices()) { 414 region.registerService(service); 415 } 416 ConcurrentMap<String, Object> classData; 417 // make sure only one thread can add maps 418 synchronized (SHARED_DATA_MAP) { 419 // as long as at least one RegionEnvironment holds on to its classData it will 420 // remain in this map 421 classData = 422 SHARED_DATA_MAP.computeIfAbsent(instance.getClass().getName(), 423 k -> new ConcurrentHashMap<>()); 424 } 425 // If a CoreCoprocessor, return a 'richer' environment, one laden with RegionServerServices. 426 return instance.getClass().isAnnotationPresent(CoreCoprocessor.class)? 427 new RegionEnvironmentForCoreCoprocessors(instance, priority, seq, conf, region, 428 rsServices, classData): 429 new RegionEnvironment(instance, priority, seq, conf, region, rsServices, classData); 430 } 431 432 @Override 433 public RegionCoprocessor checkAndGetInstance(Class<?> implClass) 434 throws InstantiationException, IllegalAccessException { 435 try { 436 if (RegionCoprocessor.class.isAssignableFrom(implClass)) { 437 return implClass.asSubclass(RegionCoprocessor.class).getDeclaredConstructor().newInstance(); 438 } else if (CoprocessorService.class.isAssignableFrom(implClass)) { 439 // For backward compatibility with old CoprocessorService impl which don't extend 440 // RegionCoprocessor. 441 CoprocessorService cs; 442 cs = implClass.asSubclass(CoprocessorService.class).getDeclaredConstructor().newInstance(); 443 return new CoprocessorServiceBackwardCompatiblity.RegionCoprocessorService(cs); 444 } else { 445 LOG.error("{} is not of type RegionCoprocessor. Check the configuration of {}", 446 implClass.getName(), CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); 447 return null; 448 } 449 } catch (NoSuchMethodException | InvocationTargetException e) { 450 throw (InstantiationException) new InstantiationException(implClass.getName()).initCause(e); 451 } 452 } 453 454 private ObserverGetter<RegionCoprocessor, RegionObserver> regionObserverGetter = 455 RegionCoprocessor::getRegionObserver; 456 457 private ObserverGetter<RegionCoprocessor, EndpointObserver> endpointObserverGetter = 458 RegionCoprocessor::getEndpointObserver; 459 460 abstract class RegionObserverOperationWithoutResult extends 461 ObserverOperationWithoutResult<RegionObserver> { 462 public RegionObserverOperationWithoutResult() { 463 super(regionObserverGetter); 464 } 465 466 public RegionObserverOperationWithoutResult(User user) { 467 super(regionObserverGetter, user); 468 } 469 470 public RegionObserverOperationWithoutResult(boolean bypassable) { 471 super(regionObserverGetter, null, bypassable); 472 } 473 474 public RegionObserverOperationWithoutResult(User user, boolean bypassable) { 475 super(regionObserverGetter, user, bypassable); 476 } 477 } 478 479 abstract class BulkLoadObserverOperation extends 480 ObserverOperationWithoutResult<BulkLoadObserver> { 481 public BulkLoadObserverOperation(User user) { 482 super(RegionCoprocessor::getBulkLoadObserver, user); 483 } 484 } 485 486 487 ////////////////////////////////////////////////////////////////////////////////////////////////// 488 // Observer operations 489 ////////////////////////////////////////////////////////////////////////////////////////////////// 490 491 ////////////////////////////////////////////////////////////////////////////////////////////////// 492 // Observer operations 493 ////////////////////////////////////////////////////////////////////////////////////////////////// 494 495 /** 496 * Invoked before a region open. 497 * 498 * @throws IOException Signals that an I/O exception has occurred. 499 */ 500 public void preOpen() throws IOException { 501 if (coprocEnvironments.isEmpty()) { 502 return; 503 } 504 execOperation(new RegionObserverOperationWithoutResult() { 505 @Override 506 public void call(RegionObserver observer) throws IOException { 507 observer.preOpen(this); 508 } 509 }); 510 } 511 512 513 /** 514 * Invoked after a region open 515 */ 516 public void postOpen() { 517 if (coprocEnvironments.isEmpty()) { 518 return; 519 } 520 try { 521 execOperation(new RegionObserverOperationWithoutResult() { 522 @Override 523 public void call(RegionObserver observer) throws IOException { 524 observer.postOpen(this); 525 } 526 }); 527 } catch (IOException e) { 528 LOG.warn(e.toString(), e); 529 } 530 } 531 532 /** 533 * Invoked before a region is closed 534 * @param abortRequested true if the server is aborting 535 */ 536 public void preClose(final boolean abortRequested) throws IOException { 537 execOperation(new RegionObserverOperationWithoutResult() { 538 @Override 539 public void call(RegionObserver observer) throws IOException { 540 observer.preClose(this, abortRequested); 541 } 542 }); 543 } 544 545 /** 546 * Invoked after a region is closed 547 * @param abortRequested true if the server is aborting 548 */ 549 public void postClose(final boolean abortRequested) { 550 try { 551 execOperation(new RegionObserverOperationWithoutResult() { 552 @Override 553 public void call(RegionObserver observer) throws IOException { 554 observer.postClose(this, abortRequested); 555 } 556 557 @Override 558 public void postEnvCall() { 559 shutdown(this.getEnvironment()); 560 } 561 }); 562 } catch (IOException e) { 563 LOG.warn(e.toString(), e); 564 } 565 } 566 567 /** 568 * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently 569 * available candidates. 570 * <p>Supports Coprocessor 'bypass' -- 'bypass' is how this method indicates that it changed 571 * the passed in <code>candidates</code>. 572 * @param store The store where compaction is being requested 573 * @param candidates The currently available store files 574 * @param tracker used to track the life cycle of a compaction 575 * @param user the user 576 * @throws IOException 577 */ 578 public boolean preCompactSelection(final HStore store, final List<HStoreFile> candidates, 579 final CompactionLifeCycleTracker tracker, final User user) throws IOException { 580 if (coprocEnvironments.isEmpty()) { 581 return false; 582 } 583 boolean bypassable = true; 584 return execOperation(new RegionObserverOperationWithoutResult(user, bypassable) { 585 @Override 586 public void call(RegionObserver observer) throws IOException { 587 observer.preCompactSelection(this, store, candidates, tracker); 588 } 589 }); 590 } 591 592 /** 593 * Called after the {@link HStoreFile}s to be compacted have been selected from the available 594 * candidates. 595 * @param store The store where compaction is being requested 596 * @param selected The store files selected to compact 597 * @param tracker used to track the life cycle of a compaction 598 * @param request the compaction request 599 * @param user the user 600 */ 601 public void postCompactSelection(final HStore store, final List<HStoreFile> selected, 602 final CompactionLifeCycleTracker tracker, final CompactionRequest request, 603 final User user) throws IOException { 604 if (coprocEnvironments.isEmpty()) { 605 return; 606 } 607 execOperation(new RegionObserverOperationWithoutResult(user) { 608 @Override 609 public void call(RegionObserver observer) throws IOException { 610 observer.postCompactSelection(this, store, selected, tracker, request); 611 } 612 }); 613 } 614 615 /** 616 * Called prior to opening store scanner for compaction. 617 */ 618 public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, 619 CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { 620 if (coprocEnvironments.isEmpty()) { 621 return store.getScanInfo(); 622 } 623 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 624 execOperation(new RegionObserverOperationWithoutResult(user) { 625 @Override 626 public void call(RegionObserver observer) throws IOException { 627 observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); 628 } 629 }); 630 return builder.build(); 631 } 632 633 /** 634 * Called prior to rewriting the store files selected for compaction 635 * @param store the store being compacted 636 * @param scanner the scanner used to read store data during compaction 637 * @param scanType type of Scan 638 * @param tracker used to track the life cycle of a compaction 639 * @param request the compaction request 640 * @param user the user 641 * @return Scanner to use (cannot be null!) 642 * @throws IOException 643 */ 644 public InternalScanner preCompact(final HStore store, final InternalScanner scanner, 645 final ScanType scanType, final CompactionLifeCycleTracker tracker, 646 final CompactionRequest request, final User user) throws IOException { 647 InternalScanner defaultResult = scanner; 648 if (coprocEnvironments.isEmpty()) { 649 return defaultResult; 650 } 651 return execOperationWithResult( 652 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, 653 defaultResult, user) { 654 @Override 655 public InternalScanner call(RegionObserver observer) throws IOException { 656 InternalScanner scanner = 657 observer.preCompact(this, store, getResult(), scanType, tracker, request); 658 if (scanner == null) { 659 throw new CoprocessorException("Null Scanner return disallowed!"); 660 } 661 return scanner; 662 } 663 }); 664 } 665 666 /** 667 * Called after the store compaction has completed. 668 * @param store the store being compacted 669 * @param resultFile the new store file written during compaction 670 * @param tracker used to track the life cycle of a compaction 671 * @param request the compaction request 672 * @param user the user 673 * @throws IOException 674 */ 675 public void postCompact(final HStore store, final HStoreFile resultFile, 676 final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) 677 throws IOException { 678 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult(user) { 679 @Override 680 public void call(RegionObserver observer) throws IOException { 681 observer.postCompact(this, store, resultFile, tracker, request); 682 } 683 }); 684 } 685 686 /** 687 * Invoked before create StoreScanner for flush. 688 */ 689 public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) 690 throws IOException { 691 if (coprocEnvironments.isEmpty()) { 692 return store.getScanInfo(); 693 } 694 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 695 execOperation(new RegionObserverOperationWithoutResult() { 696 @Override 697 public void call(RegionObserver observer) throws IOException { 698 observer.preFlushScannerOpen(this, store, builder, tracker); 699 } 700 }); 701 return builder.build(); 702 } 703 704 /** 705 * Invoked before a memstore flush 706 * @return Scanner to use (cannot be null!) 707 * @throws IOException 708 */ 709 public InternalScanner preFlush(HStore store, InternalScanner scanner, 710 FlushLifeCycleTracker tracker) throws IOException { 711 if (coprocEnvironments.isEmpty()) { 712 return scanner; 713 } 714 return execOperationWithResult( 715 new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) { 716 @Override 717 public InternalScanner call(RegionObserver observer) throws IOException { 718 InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); 719 if (scanner == null) { 720 throw new CoprocessorException("Null Scanner return disallowed!"); 721 } 722 return scanner; 723 } 724 }); 725 } 726 727 /** 728 * Invoked before a memstore flush 729 * @throws IOException 730 */ 731 public void preFlush(FlushLifeCycleTracker tracker) throws IOException { 732 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 733 @Override 734 public void call(RegionObserver observer) throws IOException { 735 observer.preFlush(this, tracker); 736 } 737 }); 738 } 739 740 /** 741 * Invoked after a memstore flush 742 * @throws IOException 743 */ 744 public void postFlush(FlushLifeCycleTracker tracker) throws IOException { 745 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 746 @Override 747 public void call(RegionObserver observer) throws IOException { 748 observer.postFlush(this, tracker); 749 } 750 }); 751 } 752 753 /** 754 * Invoked before in memory compaction. 755 */ 756 public void preMemStoreCompaction(HStore store) throws IOException { 757 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 758 @Override 759 public void call(RegionObserver observer) throws IOException { 760 observer.preMemStoreCompaction(this, store); 761 } 762 }); 763 } 764 765 /** 766 * Invoked before create StoreScanner for in memory compaction. 767 */ 768 public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { 769 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); 770 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 771 @Override 772 public void call(RegionObserver observer) throws IOException { 773 observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); 774 } 775 }); 776 return builder.build(); 777 } 778 779 /** 780 * Invoked before compacting memstore. 781 */ 782 public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) 783 throws IOException { 784 if (coprocEnvironments.isEmpty()) { 785 return scanner; 786 } 787 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>( 788 regionObserverGetter, scanner) { 789 @Override 790 public InternalScanner call(RegionObserver observer) throws IOException { 791 return observer.preMemStoreCompactionCompact(this, store, getResult()); 792 } 793 }); 794 } 795 796 /** 797 * Invoked after in memory compaction. 798 */ 799 public void postMemStoreCompaction(HStore store) throws IOException { 800 execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { 801 @Override 802 public void call(RegionObserver observer) throws IOException { 803 observer.postMemStoreCompaction(this, store); 804 } 805 }); 806 } 807 808 /** 809 * Invoked after a memstore flush 810 * @throws IOException 811 */ 812 public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) 813 throws IOException { 814 if (coprocEnvironments.isEmpty()) { 815 return; 816 } 817 execOperation(new RegionObserverOperationWithoutResult() { 818 @Override 819 public void call(RegionObserver observer) throws IOException { 820 observer.postFlush(this, store, storeFile, tracker); 821 } 822 }); 823 } 824 825 // RegionObserver support 826 /** 827 * Supports Coprocessor 'bypass'. 828 * @param get the Get request 829 * @param results What to return if return is true/'bypass'. 830 * @return true if default processing should be bypassed. 831 * @exception IOException Exception 832 */ 833 public boolean preGet(final Get get, final List<Cell> results) throws IOException { 834 if (coprocEnvironments.isEmpty()) { 835 return false; 836 } 837 boolean bypassable = true; 838 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 839 @Override 840 public void call(RegionObserver observer) throws IOException { 841 observer.preGetOp(this, get, results); 842 } 843 }); 844 } 845 846 /** 847 * @param get the Get request 848 * @param results the result set 849 * @exception IOException Exception 850 */ 851 public void postGet(final Get get, final List<Cell> results) 852 throws IOException { 853 if (coprocEnvironments.isEmpty()) { 854 return; 855 } 856 execOperation(new RegionObserverOperationWithoutResult() { 857 @Override 858 public void call(RegionObserver observer) throws IOException { 859 observer.postGetOp(this, get, results); 860 } 861 }); 862 } 863 864 /** 865 * Supports Coprocessor 'bypass'. 866 * @param get the Get request 867 * @return true or false to return to client if bypassing normal operation, or null otherwise 868 * @exception IOException Exception 869 */ 870 public Boolean preExists(final Get get) throws IOException { 871 boolean bypassable = true; 872 boolean defaultResult = false; 873 if (coprocEnvironments.isEmpty()) { 874 return null; 875 } 876 return execOperationWithResult( 877 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 878 defaultResult, bypassable) { 879 @Override 880 public Boolean call(RegionObserver observer) throws IOException { 881 return observer.preExists(this, get, getResult()); 882 } 883 }); 884 } 885 886 /** 887 * @param get the Get request 888 * @param result the result returned by the region server 889 * @return the result to return to the client 890 * @exception IOException Exception 891 */ 892 public boolean postExists(final Get get, boolean result) 893 throws IOException { 894 if (this.coprocEnvironments.isEmpty()) { 895 return result; 896 } 897 return execOperationWithResult( 898 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 899 @Override 900 public Boolean call(RegionObserver observer) throws IOException { 901 return observer.postExists(this, get, getResult()); 902 } 903 }); 904 } 905 906 /** 907 * Supports Coprocessor 'bypass'. 908 * @param put The Put object 909 * @param edit The WALEdit object. 910 * @param durability The durability used 911 * @return true if default processing should be bypassed 912 * @exception IOException Exception 913 */ 914 public boolean prePut(final Put put, final WALEdit edit, final Durability durability) 915 throws IOException { 916 if (coprocEnvironments.isEmpty()) { 917 return false; 918 } 919 boolean bypassable = true; 920 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 921 @Override 922 public void call(RegionObserver observer) throws IOException { 923 observer.prePut(this, put, edit, durability); 924 } 925 }); 926 } 927 928 /** 929 * Supports Coprocessor 'bypass'. 930 * @param mutation - the current mutation 931 * @param kv - the current cell 932 * @param byteNow - current timestamp in bytes 933 * @param get - the get that could be used 934 * Note that the get only does not specify the family and qualifier that should be used 935 * @return true if default processing should be bypassed 936 * @deprecated In hbase-2.0.0. Will be removed in hbase-3.0.0. Added explicitly for a single 937 * Coprocessor for its needs only. Will be removed. 938 */ 939 @Deprecated 940 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, 941 final Cell kv, final byte[] byteNow, final Get get) throws IOException { 942 if (coprocEnvironments.isEmpty()) { 943 return false; 944 } 945 boolean bypassable = true; 946 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 947 @Override 948 public void call(RegionObserver observer) throws IOException { 949 observer.prePrepareTimeStampForDeleteVersion(this, mutation, kv, byteNow, get); 950 } 951 }); 952 } 953 954 /** 955 * @param put The Put object 956 * @param edit The WALEdit object. 957 * @param durability The durability used 958 * @exception IOException Exception 959 */ 960 public void postPut(final Put put, final WALEdit edit, final Durability durability) 961 throws IOException { 962 if (coprocEnvironments.isEmpty()) { 963 return; 964 } 965 execOperation(new RegionObserverOperationWithoutResult() { 966 @Override 967 public void call(RegionObserver observer) throws IOException { 968 observer.postPut(this, put, edit, durability); 969 } 970 }); 971 } 972 973 /** 974 * Supports Coprocessor 'bypass'. 975 * @param delete The Delete object 976 * @param edit The WALEdit object. 977 * @param durability The durability used 978 * @return true if default processing should be bypassed 979 * @exception IOException Exception 980 */ 981 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) 982 throws IOException { 983 if (this.coprocEnvironments.isEmpty()) { 984 return false; 985 } 986 boolean bypassable = true; 987 return execOperation(new RegionObserverOperationWithoutResult(bypassable) { 988 @Override 989 public void call(RegionObserver observer) throws IOException { 990 observer.preDelete(this, delete, edit, durability); 991 } 992 }); 993 } 994 995 /** 996 * @param delete The Delete object 997 * @param edit The WALEdit object. 998 * @param durability The durability used 999 * @exception IOException Exception 1000 */ 1001 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) 1002 throws IOException { 1003 execOperation(coprocEnvironments.isEmpty()? null: 1004 new RegionObserverOperationWithoutResult() { 1005 @Override 1006 public void call(RegionObserver observer) throws IOException { 1007 observer.postDelete(this, delete, edit, durability); 1008 } 1009 }); 1010 } 1011 1012 public void preBatchMutate( 1013 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1014 if(this.coprocEnvironments.isEmpty()) { 1015 return; 1016 } 1017 execOperation(new RegionObserverOperationWithoutResult() { 1018 @Override 1019 public void call(RegionObserver observer) throws IOException { 1020 observer.preBatchMutate(this, miniBatchOp); 1021 } 1022 }); 1023 } 1024 1025 public void postBatchMutate( 1026 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 1027 if (this.coprocEnvironments.isEmpty()) { 1028 return; 1029 } 1030 execOperation(new RegionObserverOperationWithoutResult() { 1031 @Override 1032 public void call(RegionObserver observer) throws IOException { 1033 observer.postBatchMutate(this, miniBatchOp); 1034 } 1035 }); 1036 } 1037 1038 public void postBatchMutateIndispensably( 1039 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) 1040 throws IOException { 1041 if (this.coprocEnvironments.isEmpty()) { 1042 return; 1043 } 1044 execOperation(new RegionObserverOperationWithoutResult() { 1045 @Override 1046 public void call(RegionObserver observer) throws IOException { 1047 observer.postBatchMutateIndispensably(this, miniBatchOp, success); 1048 } 1049 }); 1050 } 1051 1052 /** 1053 * Supports Coprocessor 'bypass'. 1054 * @param row row to check 1055 * @param family column family 1056 * @param qualifier column qualifier 1057 * @param op the comparison operation 1058 * @param comparator the comparator 1059 * @param put data to put if check succeeds 1060 * @return true or false to return to client if default processing should be bypassed, or null 1061 * otherwise 1062 */ 1063 public Boolean preCheckAndPut(final byte [] row, final byte [] family, 1064 final byte [] qualifier, final CompareOperator op, 1065 final ByteArrayComparable comparator, final Put put) 1066 throws IOException { 1067 boolean bypassable = true; 1068 boolean defaultResult = false; 1069 if (coprocEnvironments.isEmpty()) { 1070 return null; 1071 } 1072 return execOperationWithResult( 1073 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1074 defaultResult, bypassable) { 1075 @Override 1076 public Boolean call(RegionObserver observer) throws IOException { 1077 return observer.preCheckAndPut(this, row, family, qualifier, 1078 op, comparator, put, getResult()); 1079 } 1080 }); 1081 } 1082 1083 /** 1084 * Supports Coprocessor 'bypass'. 1085 * @param row row to check 1086 * @param filter filter 1087 * @param put data to put if check succeeds 1088 * @return true or false to return to client if default processing should be bypassed, or null 1089 * otherwise 1090 */ 1091 public Boolean preCheckAndPut(final byte [] row, final Filter filter, final Put put) 1092 throws IOException { 1093 boolean bypassable = true; 1094 boolean defaultResult = false; 1095 if (coprocEnvironments.isEmpty()) { 1096 return null; 1097 } 1098 return execOperationWithResult( 1099 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1100 defaultResult, bypassable) { 1101 @Override 1102 public Boolean call(RegionObserver observer) throws IOException { 1103 return observer.preCheckAndPut(this, row, filter, put, getResult()); 1104 } 1105 }); 1106 } 1107 1108 /** 1109 * Supports Coprocessor 'bypass'. 1110 * @param row row to check 1111 * @param family column family 1112 * @param qualifier column qualifier 1113 * @param op the comparison operation 1114 * @param comparator the comparator 1115 * @param put data to put if check succeeds 1116 * @return true or false to return to client if default processing should be bypassed, or null 1117 * otherwise 1118 */ 1119 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1120 justification="Null is legit") 1121 public Boolean preCheckAndPutAfterRowLock( 1122 final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, 1123 final ByteArrayComparable comparator, final Put put) throws IOException { 1124 boolean bypassable = true; 1125 boolean defaultResult = false; 1126 if (coprocEnvironments.isEmpty()) { 1127 return null; 1128 } 1129 return execOperationWithResult( 1130 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1131 defaultResult, bypassable) { 1132 @Override 1133 public Boolean call(RegionObserver observer) throws IOException { 1134 return observer.preCheckAndPutAfterRowLock(this, row, family, qualifier, 1135 op, comparator, put, getResult()); 1136 } 1137 }); 1138 } 1139 1140 /** 1141 * Supports Coprocessor 'bypass'. 1142 * @param row row to check 1143 * @param filter filter 1144 * @param put data to put if check succeeds 1145 * @return true or false to return to client if default processing should be bypassed, or null 1146 * otherwise 1147 */ 1148 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1149 justification="Null is legit") 1150 public Boolean preCheckAndPutAfterRowLock( 1151 final byte[] row, final Filter filter, final Put put) throws IOException { 1152 boolean bypassable = true; 1153 boolean defaultResult = false; 1154 if (coprocEnvironments.isEmpty()) { 1155 return null; 1156 } 1157 return execOperationWithResult( 1158 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1159 defaultResult, bypassable) { 1160 @Override 1161 public Boolean call(RegionObserver observer) throws IOException { 1162 return observer.preCheckAndPutAfterRowLock(this, row, filter, put, getResult()); 1163 } 1164 }); 1165 } 1166 1167 /** 1168 * @param row row to check 1169 * @param family column family 1170 * @param qualifier column qualifier 1171 * @param op the comparison operation 1172 * @param comparator the comparator 1173 * @param put data to put if check succeeds 1174 * @throws IOException e 1175 */ 1176 public boolean postCheckAndPut(final byte [] row, final byte [] family, 1177 final byte [] qualifier, final CompareOperator op, 1178 final ByteArrayComparable comparator, final Put put, 1179 boolean result) throws IOException { 1180 if (this.coprocEnvironments.isEmpty()) { 1181 return result; 1182 } 1183 return execOperationWithResult( 1184 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1185 @Override 1186 public Boolean call(RegionObserver observer) throws IOException { 1187 return observer.postCheckAndPut(this, row, family, qualifier, 1188 op, comparator, put, getResult()); 1189 } 1190 }); 1191 } 1192 1193 /** 1194 * @param row row to check 1195 * @param filter filter 1196 * @param put data to put if check succeeds 1197 * @throws IOException e 1198 */ 1199 public boolean postCheckAndPut(final byte [] row, final Filter filter, final Put put, 1200 boolean result) throws IOException { 1201 if (this.coprocEnvironments.isEmpty()) { 1202 return result; 1203 } 1204 return execOperationWithResult( 1205 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1206 @Override 1207 public Boolean call(RegionObserver observer) throws IOException { 1208 return observer.postCheckAndPut(this, row, filter, put, getResult()); 1209 } 1210 }); 1211 } 1212 1213 /** 1214 * Supports Coprocessor 'bypass'. 1215 * @param row row to check 1216 * @param family column family 1217 * @param qualifier column qualifier 1218 * @param op the comparison operation 1219 * @param comparator the comparator 1220 * @param delete delete to commit if check succeeds 1221 * @return true or false to return to client if default processing should be bypassed, or null 1222 * otherwise 1223 */ 1224 public Boolean preCheckAndDelete(final byte [] row, final byte [] family, 1225 final byte [] qualifier, final CompareOperator op, 1226 final ByteArrayComparable comparator, final Delete delete) 1227 throws IOException { 1228 boolean bypassable = true; 1229 boolean defaultResult = false; 1230 if (coprocEnvironments.isEmpty()) { 1231 return null; 1232 } 1233 return execOperationWithResult( 1234 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1235 defaultResult, bypassable) { 1236 @Override 1237 public Boolean call(RegionObserver observer) throws IOException { 1238 return observer.preCheckAndDelete(this, row, family, 1239 qualifier, op, comparator, delete, getResult()); 1240 } 1241 }); 1242 } 1243 1244 /** 1245 * Supports Coprocessor 'bypass'. 1246 * @param row row to check 1247 * @param filter filter 1248 * @param delete delete to commit if check succeeds 1249 * @return true or false to return to client if default processing should be bypassed, or null 1250 * otherwise 1251 */ 1252 public Boolean preCheckAndDelete(final byte [] row, final Filter filter, final Delete delete) 1253 throws IOException { 1254 boolean bypassable = true; 1255 boolean defaultResult = false; 1256 if (coprocEnvironments.isEmpty()) { 1257 return null; 1258 } 1259 return execOperationWithResult( 1260 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1261 defaultResult, bypassable) { 1262 @Override 1263 public Boolean call(RegionObserver observer) throws IOException { 1264 return observer.preCheckAndDelete(this, row, filter, delete, getResult()); 1265 } 1266 }); 1267 } 1268 1269 /** 1270 * Supports Coprocessor 'bypass'. 1271 * @param row row to check 1272 * @param family column family 1273 * @param qualifier column qualifier 1274 * @param op the comparison operation 1275 * @param comparator the comparator 1276 * @param delete delete to commit if check succeeds 1277 * @return true or false to return to client if default processing should be bypassed, 1278 * or null otherwise 1279 */ 1280 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1281 justification="Null is legit") 1282 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, 1283 final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator, 1284 final Delete delete) throws IOException { 1285 boolean bypassable = true; 1286 boolean defaultResult = false; 1287 if (coprocEnvironments.isEmpty()) { 1288 return null; 1289 } 1290 return execOperationWithResult( 1291 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1292 defaultResult, bypassable) { 1293 @Override 1294 public Boolean call(RegionObserver observer) throws IOException { 1295 return observer.preCheckAndDeleteAfterRowLock(this, row, 1296 family, qualifier, op, comparator, delete, getResult()); 1297 } 1298 }); 1299 } 1300 1301 /** 1302 * Supports Coprocessor 'bypass'. 1303 * @param row row to check 1304 * @param filter filter 1305 * @param delete delete to commit if check succeeds 1306 * @return true or false to return to client if default processing should be bypassed, 1307 * or null otherwise 1308 */ 1309 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL", 1310 justification="Null is legit") 1311 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final Filter filter, 1312 final Delete delete) throws IOException { 1313 boolean bypassable = true; 1314 boolean defaultResult = false; 1315 if (coprocEnvironments.isEmpty()) { 1316 return null; 1317 } 1318 return execOperationWithResult( 1319 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1320 defaultResult, bypassable) { 1321 @Override 1322 public Boolean call(RegionObserver observer) throws IOException { 1323 return observer.preCheckAndDeleteAfterRowLock(this, row, filter, delete, getResult()); 1324 } 1325 }); 1326 } 1327 1328 /** 1329 * @param row row to check 1330 * @param family column family 1331 * @param qualifier column qualifier 1332 * @param op the comparison operation 1333 * @param comparator the comparator 1334 * @param delete delete to commit if check succeeds 1335 * @throws IOException e 1336 */ 1337 public boolean postCheckAndDelete(final byte [] row, final byte [] family, 1338 final byte [] qualifier, final CompareOperator op, 1339 final ByteArrayComparable comparator, final Delete delete, 1340 boolean result) throws IOException { 1341 if (this.coprocEnvironments.isEmpty()) { 1342 return result; 1343 } 1344 return execOperationWithResult( 1345 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1346 @Override 1347 public Boolean call(RegionObserver observer) throws IOException { 1348 return observer.postCheckAndDelete(this, row, family, 1349 qualifier, op, comparator, delete, getResult()); 1350 } 1351 }); 1352 } 1353 1354 /** 1355 * @param row row to check 1356 * @param filter filter 1357 * @param delete delete to commit if check succeeds 1358 * @throws IOException e 1359 */ 1360 public boolean postCheckAndDelete(final byte [] row, final Filter filter, final Delete delete, 1361 boolean result) throws IOException { 1362 if (this.coprocEnvironments.isEmpty()) { 1363 return result; 1364 } 1365 return execOperationWithResult( 1366 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, result) { 1367 @Override 1368 public Boolean call(RegionObserver observer) throws IOException { 1369 return observer.postCheckAndDelete(this, row, filter, delete, getResult()); 1370 } 1371 }); 1372 } 1373 1374 /** 1375 * Supports Coprocessor 'bypass'. 1376 * @param append append object 1377 * @return result to return to client if default operation should be bypassed, null otherwise 1378 * @throws IOException if an error occurred on the coprocessor 1379 */ 1380 public Result preAppend(final Append append) throws IOException { 1381 boolean bypassable = true; 1382 Result defaultResult = null; 1383 if (this.coprocEnvironments.isEmpty()) { 1384 return defaultResult; 1385 } 1386 return execOperationWithResult( 1387 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1388 bypassable) { 1389 @Override 1390 public Result call(RegionObserver observer) throws IOException { 1391 return observer.preAppend(this, append); 1392 } 1393 }); 1394 } 1395 1396 /** 1397 * Supports Coprocessor 'bypass'. 1398 * @param append append object 1399 * @return result to return to client if default operation should be bypassed, null otherwise 1400 * @throws IOException if an error occurred on the coprocessor 1401 */ 1402 public Result preAppendAfterRowLock(final Append append) throws IOException { 1403 boolean bypassable = true; 1404 Result defaultResult = null; 1405 if (this.coprocEnvironments.isEmpty()) { 1406 return defaultResult; 1407 } 1408 return execOperationWithResult( 1409 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, 1410 defaultResult, bypassable) { 1411 @Override 1412 public Result call(RegionObserver observer) throws IOException { 1413 return observer.preAppendAfterRowLock(this, append); 1414 } 1415 }); 1416 } 1417 1418 /** 1419 * Supports Coprocessor 'bypass'. 1420 * @param increment increment object 1421 * @return result to return to client if default operation should be bypassed, null otherwise 1422 * @throws IOException if an error occurred on the coprocessor 1423 */ 1424 public Result preIncrement(final Increment increment) throws IOException { 1425 boolean bypassable = true; 1426 Result defaultResult = null; 1427 if (coprocEnvironments.isEmpty()) { 1428 return defaultResult; 1429 } 1430 return execOperationWithResult( 1431 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1432 bypassable) { 1433 @Override 1434 public Result call(RegionObserver observer) throws IOException { 1435 return observer.preIncrement(this, increment); 1436 } 1437 }); 1438 } 1439 1440 /** 1441 * Supports Coprocessor 'bypass'. 1442 * @param increment increment object 1443 * @return result to return to client if default operation should be bypassed, null otherwise 1444 * @throws IOException if an error occurred on the coprocessor 1445 */ 1446 public Result preIncrementAfterRowLock(final Increment increment) throws IOException { 1447 boolean bypassable = true; 1448 Result defaultResult = null; 1449 if (coprocEnvironments.isEmpty()) { 1450 return defaultResult; 1451 } 1452 return execOperationWithResult( 1453 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, defaultResult, 1454 bypassable) { 1455 @Override 1456 public Result call(RegionObserver observer) throws IOException { 1457 return observer.preIncrementAfterRowLock(this, increment); 1458 } 1459 }); 1460 } 1461 1462 /** 1463 * @param append Append object 1464 * @param result the result returned by the append 1465 * @throws IOException if an error occurred on the coprocessor 1466 */ 1467 public Result postAppend(final Append append, final Result result) throws IOException { 1468 if (this.coprocEnvironments.isEmpty()) { 1469 return result; 1470 } 1471 return execOperationWithResult( 1472 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1473 @Override 1474 public Result call(RegionObserver observer) throws IOException { 1475 return observer.postAppend(this, append, result); 1476 } 1477 }); 1478 } 1479 1480 /** 1481 * @param increment increment object 1482 * @param result the result returned by postIncrement 1483 * @throws IOException if an error occurred on the coprocessor 1484 */ 1485 public Result postIncrement(final Increment increment, Result result) throws IOException { 1486 if (this.coprocEnvironments.isEmpty()) { 1487 return result; 1488 } 1489 return execOperationWithResult( 1490 new ObserverOperationWithResult<RegionObserver, Result>(regionObserverGetter, result) { 1491 @Override 1492 public Result call(RegionObserver observer) throws IOException { 1493 return observer.postIncrement(this, increment, getResult()); 1494 } 1495 }); 1496 } 1497 1498 /** 1499 * @param scan the Scan specification 1500 * @exception IOException Exception 1501 */ 1502 public void preScannerOpen(final Scan scan) throws IOException { 1503 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1504 @Override 1505 public void call(RegionObserver observer) throws IOException { 1506 observer.preScannerOpen(this, scan); 1507 } 1508 }); 1509 } 1510 1511 /** 1512 * @param scan the Scan specification 1513 * @param s the scanner 1514 * @return the scanner instance to use 1515 * @exception IOException Exception 1516 */ 1517 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { 1518 if (this.coprocEnvironments.isEmpty()) { 1519 return s; 1520 } 1521 return execOperationWithResult( 1522 new ObserverOperationWithResult<RegionObserver, RegionScanner>(regionObserverGetter, s) { 1523 @Override 1524 public RegionScanner call(RegionObserver observer) throws IOException { 1525 return observer.postScannerOpen(this, scan, getResult()); 1526 } 1527 }); 1528 } 1529 1530 /** 1531 * @param s the scanner 1532 * @param results the result set returned by the region server 1533 * @param limit the maximum number of results to return 1534 * @return 'has next' indication to client if bypassing default behavior, or null otherwise 1535 * @exception IOException Exception 1536 */ 1537 public Boolean preScannerNext(final InternalScanner s, 1538 final List<Result> results, final int limit) throws IOException { 1539 boolean bypassable = true; 1540 boolean defaultResult = false; 1541 if (coprocEnvironments.isEmpty()) { 1542 return null; 1543 } 1544 return execOperationWithResult( 1545 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, 1546 defaultResult, bypassable) { 1547 @Override 1548 public Boolean call(RegionObserver observer) throws IOException { 1549 return observer.preScannerNext(this, s, results, limit, getResult()); 1550 } 1551 }); 1552 } 1553 1554 /** 1555 * @param s the scanner 1556 * @param results the result set returned by the region server 1557 * @param limit the maximum number of results to return 1558 * @param hasMore 1559 * @return 'has more' indication to give to client 1560 * @exception IOException Exception 1561 */ 1562 public boolean postScannerNext(final InternalScanner s, 1563 final List<Result> results, final int limit, boolean hasMore) 1564 throws IOException { 1565 if (this.coprocEnvironments.isEmpty()) { 1566 return hasMore; 1567 } 1568 return execOperationWithResult( 1569 new ObserverOperationWithResult<RegionObserver, Boolean>(regionObserverGetter, hasMore) { 1570 @Override 1571 public Boolean call(RegionObserver observer) throws IOException { 1572 return observer.postScannerNext(this, s, results, limit, getResult()); 1573 } 1574 }); 1575 } 1576 1577 /** 1578 * This will be called by the scan flow when the current scanned row is being filtered out by the 1579 * filter. 1580 * @param s the scanner 1581 * @param curRowCell The cell in the current row which got filtered out 1582 * @return whether more rows are available for the scanner or not 1583 * @throws IOException 1584 */ 1585 public boolean postScannerFilterRow(final InternalScanner s, final Cell curRowCell) 1586 throws IOException { 1587 // short circuit for performance 1588 boolean defaultResult = true; 1589 if (!hasCustomPostScannerFilterRow) { 1590 return defaultResult; 1591 } 1592 if (this.coprocEnvironments.isEmpty()) { 1593 return defaultResult; 1594 } 1595 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, Boolean>( 1596 regionObserverGetter, defaultResult) { 1597 @Override 1598 public Boolean call(RegionObserver observer) throws IOException { 1599 return observer.postScannerFilterRow(this, s, curRowCell, getResult()); 1600 } 1601 }); 1602 } 1603 1604 /** 1605 * Supports Coprocessor 'bypass'. 1606 * @param s the scanner 1607 * @return true if default behavior should be bypassed, false otherwise 1608 * @exception IOException Exception 1609 */ 1610 // Should this be bypassable? 1611 public boolean preScannerClose(final InternalScanner s) throws IOException { 1612 return execOperation(coprocEnvironments.isEmpty()? null: 1613 new RegionObserverOperationWithoutResult(true) { 1614 @Override 1615 public void call(RegionObserver observer) throws IOException { 1616 observer.preScannerClose(this, s); 1617 } 1618 }); 1619 } 1620 1621 /** 1622 * @exception IOException Exception 1623 */ 1624 public void postScannerClose(final InternalScanner s) throws IOException { 1625 execOperation(coprocEnvironments.isEmpty()? null: 1626 new RegionObserverOperationWithoutResult() { 1627 @Override 1628 public void call(RegionObserver observer) throws IOException { 1629 observer.postScannerClose(this, s); 1630 } 1631 }); 1632 } 1633 1634 /** 1635 * Called before open store scanner for user scan. 1636 */ 1637 public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException { 1638 if (coprocEnvironments.isEmpty()) return store.getScanInfo(); 1639 CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan); 1640 execOperation(new RegionObserverOperationWithoutResult() { 1641 @Override 1642 public void call(RegionObserver observer) throws IOException { 1643 observer.preStoreScannerOpen(this, store, builder); 1644 } 1645 }); 1646 return builder.build(); 1647 } 1648 1649 /** 1650 * @param info the RegionInfo for this region 1651 * @param edits the file of recovered edits 1652 */ 1653 public void preReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1654 execOperation(coprocEnvironments.isEmpty()? null: 1655 new RegionObserverOperationWithoutResult(true) { 1656 @Override 1657 public void call(RegionObserver observer) throws IOException { 1658 observer.preReplayWALs(this, info, edits); 1659 } 1660 }); 1661 } 1662 1663 /** 1664 * @param info the RegionInfo for this region 1665 * @param edits the file of recovered edits 1666 * @throws IOException Exception 1667 */ 1668 public void postReplayWALs(final RegionInfo info, final Path edits) throws IOException { 1669 execOperation(coprocEnvironments.isEmpty()? null: 1670 new RegionObserverOperationWithoutResult() { 1671 @Override 1672 public void call(RegionObserver observer) throws IOException { 1673 observer.postReplayWALs(this, info, edits); 1674 } 1675 }); 1676 } 1677 1678 /** 1679 * Supports Coprocessor 'bypass'. 1680 * @return true if default behavior should be bypassed, false otherwise 1681 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1682 * with something that doesn't expose IntefaceAudience.Private classes. 1683 */ 1684 @Deprecated 1685 public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1686 throws IOException { 1687 return execOperation(coprocEnvironments.isEmpty()? null: 1688 new RegionObserverOperationWithoutResult(true) { 1689 @Override 1690 public void call(RegionObserver observer) throws IOException { 1691 observer.preWALRestore(this, info, logKey, logEdit); 1692 } 1693 }); 1694 } 1695 1696 /** 1697 * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced 1698 * with something that doesn't expose IntefaceAudience.Private classes. 1699 */ 1700 @Deprecated 1701 public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) 1702 throws IOException { 1703 execOperation(coprocEnvironments.isEmpty()? null: 1704 new RegionObserverOperationWithoutResult() { 1705 @Override 1706 public void call(RegionObserver observer) throws IOException { 1707 observer.postWALRestore(this, info, logKey, logEdit); 1708 } 1709 }); 1710 } 1711 1712 /** 1713 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1714 */ 1715 public void preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { 1716 execOperation(coprocEnvironments.isEmpty()? null: new RegionObserverOperationWithoutResult() { 1717 @Override 1718 public void call(RegionObserver observer) throws IOException { 1719 observer.preBulkLoadHFile(this, familyPaths); 1720 } 1721 }); 1722 } 1723 1724 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs) 1725 throws IOException { 1726 return execOperation(coprocEnvironments.isEmpty()? null: 1727 new RegionObserverOperationWithoutResult() { 1728 @Override 1729 public void call(RegionObserver observer) throws IOException { 1730 observer.preCommitStoreFile(this, family, pairs); 1731 } 1732 }); 1733 } 1734 1735 public void postCommitStoreFile(final byte[] family, Path srcPath, Path dstPath) throws IOException { 1736 execOperation(coprocEnvironments.isEmpty()? null: 1737 new RegionObserverOperationWithoutResult() { 1738 @Override 1739 public void call(RegionObserver observer) throws IOException { 1740 observer.postCommitStoreFile(this, family, srcPath, dstPath); 1741 } 1742 }); 1743 } 1744 1745 /** 1746 * @param familyPaths pairs of { CF, file path } submitted for bulk load 1747 * @param map Map of CF to List of file paths for the final loaded files 1748 * @throws IOException 1749 */ 1750 public void postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, 1751 Map<byte[], List<Path>> map) throws IOException { 1752 if (this.coprocEnvironments.isEmpty()) { 1753 return; 1754 } 1755 execOperation(coprocEnvironments.isEmpty()? null: 1756 new RegionObserverOperationWithoutResult() { 1757 @Override 1758 public void call(RegionObserver observer) throws IOException { 1759 observer.postBulkLoadHFile(this, familyPaths, map); 1760 } 1761 }); 1762 } 1763 1764 public void postStartRegionOperation(final Operation op) throws IOException { 1765 execOperation(coprocEnvironments.isEmpty()? null: 1766 new RegionObserverOperationWithoutResult() { 1767 @Override 1768 public void call(RegionObserver observer) throws IOException { 1769 observer.postStartRegionOperation(this, op); 1770 } 1771 }); 1772 } 1773 1774 public void postCloseRegionOperation(final Operation op) throws IOException { 1775 execOperation(coprocEnvironments.isEmpty()? null: 1776 new RegionObserverOperationWithoutResult() { 1777 @Override 1778 public void call(RegionObserver observer) throws IOException { 1779 observer.postCloseRegionOperation(this, op); 1780 } 1781 }); 1782 } 1783 1784 /** 1785 * @param fs fileystem to read from 1786 * @param p path to the file 1787 * @param in {@link FSDataInputStreamWrapper} 1788 * @param size Full size of the file 1789 * @param cacheConf 1790 * @param r original reference file. This will be not null only when reading a split file. 1791 * @return a Reader instance to use instead of the base reader if overriding 1792 * default behavior, null otherwise 1793 * @throws IOException 1794 */ 1795 public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p, 1796 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1797 final Reference r) throws IOException { 1798 if (coprocEnvironments.isEmpty()) { 1799 return null; 1800 } 1801 return execOperationWithResult( 1802 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) { 1803 @Override 1804 public StoreFileReader call(RegionObserver observer) throws IOException { 1805 return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1806 getResult()); 1807 } 1808 }); 1809 } 1810 1811 /** 1812 * @param fs fileystem to read from 1813 * @param p path to the file 1814 * @param in {@link FSDataInputStreamWrapper} 1815 * @param size Full size of the file 1816 * @param cacheConf 1817 * @param r original reference file. This will be not null only when reading a split file. 1818 * @param reader the base reader instance 1819 * @return The reader to use 1820 * @throws IOException 1821 */ 1822 public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p, 1823 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf, 1824 final Reference r, final StoreFileReader reader) throws IOException { 1825 if (this.coprocEnvironments.isEmpty()) { 1826 return reader; 1827 } 1828 return execOperationWithResult( 1829 new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) { 1830 @Override 1831 public StoreFileReader call(RegionObserver observer) throws IOException { 1832 return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r, 1833 getResult()); 1834 } 1835 }); 1836 } 1837 1838 public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, 1839 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1840 if (this.coprocEnvironments.isEmpty()) { 1841 return cellPairs; 1842 } 1843 return execOperationWithResult( 1844 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1845 regionObserverGetter, cellPairs) { 1846 @Override 1847 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1848 return observer.postIncrementBeforeWAL(this, mutation, getResult()); 1849 } 1850 }); 1851 } 1852 1853 public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, 1854 final List<Pair<Cell, Cell>> cellPairs) throws IOException { 1855 if (this.coprocEnvironments.isEmpty()) { 1856 return cellPairs; 1857 } 1858 return execOperationWithResult( 1859 new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( 1860 regionObserverGetter, cellPairs) { 1861 @Override 1862 public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { 1863 return observer.postAppendBeforeWAL(this, mutation, getResult()); 1864 } 1865 }); 1866 } 1867 1868 public void preWALAppend(WALKey key, WALEdit edit) throws IOException { 1869 if (this.coprocEnvironments.isEmpty()){ 1870 return; 1871 } 1872 execOperation(new RegionObserverOperationWithoutResult() { 1873 @Override 1874 public void call(RegionObserver observer) throws IOException { 1875 observer.preWALAppend(this, key, edit); 1876 } 1877 }); 1878 } 1879 1880 public Message preEndpointInvocation(final Service service, final String methodName, 1881 Message request) throws IOException { 1882 if (coprocEnvironments.isEmpty()) { 1883 return request; 1884 } 1885 return execOperationWithResult(new ObserverOperationWithResult<EndpointObserver, 1886 Message>(endpointObserverGetter, request) { 1887 @Override 1888 public Message call(EndpointObserver observer) throws IOException { 1889 return observer.preEndpointInvocation(this, service, methodName, getResult()); 1890 } 1891 }); 1892 } 1893 1894 public void postEndpointInvocation(final Service service, final String methodName, 1895 final Message request, final Message.Builder responseBuilder) throws IOException { 1896 execOperation(coprocEnvironments.isEmpty() ? null : 1897 new ObserverOperationWithoutResult<EndpointObserver>(endpointObserverGetter) { 1898 @Override 1899 public void call(EndpointObserver observer) throws IOException { 1900 observer.postEndpointInvocation(this, service, methodName, request, responseBuilder); 1901 } 1902 }); 1903 } 1904 1905 /** 1906 * @deprecated Since 2.0 with out any replacement and will be removed in 3.0 1907 */ 1908 @Deprecated 1909 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker result) throws IOException { 1910 if (this.coprocEnvironments.isEmpty()) { 1911 return result; 1912 } 1913 return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, DeleteTracker>( 1914 regionObserverGetter, result) { 1915 @Override 1916 public DeleteTracker call(RegionObserver observer) throws IOException { 1917 return observer.postInstantiateDeleteTracker(this, getResult()); 1918 } 1919 }); 1920 } 1921 1922 ///////////////////////////////////////////////////////////////////////////////////////////////// 1923 // BulkLoadObserver hooks 1924 ///////////////////////////////////////////////////////////////////////////////////////////////// 1925 public void prePrepareBulkLoad(User user) throws IOException { 1926 execOperation(coprocEnvironments.isEmpty() ? null : 1927 new BulkLoadObserverOperation(user) { 1928 @Override protected void call(BulkLoadObserver observer) throws IOException { 1929 observer.prePrepareBulkLoad(this); 1930 } 1931 }); 1932 } 1933 1934 public void preCleanupBulkLoad(User user) throws IOException { 1935 execOperation(coprocEnvironments.isEmpty() ? null : 1936 new BulkLoadObserverOperation(user) { 1937 @Override protected void call(BulkLoadObserver observer) throws IOException { 1938 observer.preCleanupBulkLoad(this); 1939 } 1940 }); 1941 } 1942}